This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new df5234b [ISSUE #116] replicator adapt to the new connect api (#117)
df5234b is described below
commit df5234b7fae0d80aee01fce03b4528b82ab4a70a
Author: Slideee <[email protected]>
AuthorDate: Wed May 11 16:12:24 2022 +0800
[ISSUE #116] replicator adapt to the new connect api (#117)
* [ISSUE #116] replicator adapt to the new connect api
* [ISSUE #116] Code optimization, throwing exceptions, distinguishing
between init and validate methods
* [ISSUE #116] use maxTasks parameter to control task parallelism
* [ISSUE #116] update topic divide logic
* [ISSUE #116] update divide logic, add divide testcase
Co-authored-by: yechun <[email protected]>
---
connectors/rocketmq-replicator/pom.xml | 2 +-
.../apache/rocketmq/replicator/MetaSourceTask.java | 72 +++----
.../rocketmq/replicator/RmqMetaReplicator.java | 40 ++--
.../rocketmq/replicator/RmqSourceReplicator.java | 43 ++--
.../apache/rocketmq/replicator/RmqSourceTask.java | 102 +++++-----
.../apache/rocketmq/replicator/common/Utils.java | 25 ++-
.../rocketmq/replicator/config/ConfigDefine.java | 2 -
.../replicator/config/RmqConnectorConfig.java | 9 +-
.../replicator/config/TaskDivideConfig.java | 13 +-
.../rocketmq/replicator/schema/SchemaEnum.java | 6 +
.../strategy/DivideTaskByConsistentHash.java | 8 +-
.../replicator/strategy/DivideTaskByQueue.java | 9 +-
.../replicator/strategy/DivideTaskByTopic.java | 6 +-
.../replicator/strategy/TaskDivideStrategy.java | 2 +-
.../replicator/DefaultTaskDivideStrategyTest.java | 217 +++++++++++++++++++++
.../replicator/RmqSourceReplicatorTest.java | 2 +-
16 files changed, 395 insertions(+), 163 deletions(-)
diff --git a/connectors/rocketmq-replicator/pom.xml
b/connectors/rocketmq-replicator/pom.xml
index 43d3559..36df030 100644
--- a/connectors/rocketmq-replicator/pom.xml
+++ b/connectors/rocketmq-replicator/pom.xml
@@ -74,7 +74,7 @@
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
- <version>0.1.1</version>
+ <version>0.1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
index 67fc89f..35d4714 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -19,17 +19,14 @@ package org.apache.rocketmq.replicator;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
-import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.FieldType;
import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -43,6 +40,7 @@ import org.apache.rocketmq.replicator.config.ConfigUtil;
import org.apache.rocketmq.replicator.config.TaskConfig;
import org.apache.rocketmq.replicator.offset.OffsetSyncStore;
import org.apache.rocketmq.replicator.schema.FieldName;
+import org.apache.rocketmq.replicator.schema.SchemaEnum;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,8 +62,18 @@ public class MetaSourceTask extends SourceTask {
}
@Override
- public void start(KeyValue config) {
+ public void validate(KeyValue config) {
+
+ }
+
+ @Override
+ public void init(KeyValue config) {
ConfigUtil.load(config, this.config);
+ }
+
+ @Override
+ public void start(SourceTaskContext sourceTaskContext) {
+ super.start(sourceTaskContext);
try {
this.srcMQAdminExt = Utils.startMQAdminTool(this.config);
@@ -78,22 +86,25 @@ public class MetaSourceTask extends SourceTask {
this.started = true;
}
- @Override public void stop() {
+ @Override
+ public void stop() {
if (started) {
started = false;
}
srcMQAdminExt.shutdown();
}
- @Override public void pause() {
+ @Override
+ public void pause() {
}
- @Override public void resume() {
+ @Override
+ public void resume() {
}
- @Override public Collection<SourceDataEntry> poll() {
+ @Override public List<ConnectRecord> poll() {
log.debug("polling...");
List<String> groups =
JSONObject.parseArray(this.config.getTaskGroupList(), String.class);
@@ -106,7 +117,7 @@ public class MetaSourceTask extends SourceTask {
}
return Collections.emptyList();
}
- List<SourceDataEntry> res = new ArrayList<>();
+ List<ConnectRecord> res = new ArrayList<>();
for (String group : groups) {
ConsumeStats stats;
try {
@@ -117,38 +128,19 @@ public class MetaSourceTask extends SourceTask {
}
for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable :
stats.getOffsetTable().entrySet()) {
-
MessageQueue mq = offsetTable.getKey();
long srcOffset = offsetTable.getValue().getConsumerOffset();
long targetOffset = this.store.convertTargetOffset(mq, group,
srcOffset);
+ List<Field> fields = new ArrayList<Field>();
+ Schema schema = new Schema(SchemaEnum.OFFSET.name(),
FieldType.INT64, fields);
+ schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(),
SchemaBuilder.string().build()));
+
JSONObject jsonObject = new JSONObject();
- jsonObject.put(RmqConstants.NEXT_POSITION, srcOffset);
-
- Schema schema = new Schema();
- schema.setDataSource(this.config.getSourceRocketmq());
- schema.setName(mq.getTopic());
- schema.setFields(new ArrayList<>());
- schema.getFields().add(new Field(0,
- FieldName.OFFSET.getKey(), FieldType.INT64));
-
- DataEntryBuilder dataEntryBuilder = new
DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(this.config.getStoreTopic())
- .entryType(EntryType.UPDATE);
- dataEntryBuilder.putFiled(FieldName.OFFSET.getKey(),
targetOffset);
-
- SourceDataEntry sourceDataEntry =
dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(RmqConstants.getPartition(
- mq.getTopic(),
- mq.getBrokerName(),
-
String.valueOf(mq.getQueueId())).getBytes(StandardCharsets.UTF_8)),
-
ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
- );
- String targetTopic = new
StringBuilder().append(group).append("-").append(mq.getTopic())
- .append("-").append(mq.getQueueId()).toString();
- sourceDataEntry.setQueueName(targetTopic);
- res.add(sourceDataEntry);
+ jsonObject.put(FieldName.OFFSET.getKey(), targetOffset);
+ ConnectRecord connectRecord = new
ConnectRecord(Utils.offsetKey(mq.getTopic(), mq.getBrokerName(),
String.valueOf(mq.getQueueId())),
+ Utils.offsetValue(srcOffset), System.currentTimeMillis(),
schema, jsonObject.toJSONString());
+ res.add(connectRecord);
}
}
return res;
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index 787f536..e401b28 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -17,8 +17,9 @@
package org.apache.rocketmq.replicator;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -45,6 +46,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigDefine;
import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
@@ -94,20 +96,32 @@ public class RmqMetaReplicator extends SourceConnector {
executor = Executors.newSingleThreadScheduledExecutor(new
BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
}
- @Override public String verifyAndSetConfig(KeyValue config) {
- log.info("verifyAndSetConfig...");
+ @Override
+ public void validate(KeyValue config) {
+ // Check they need key.
+ for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
+ log.error("RmqMetaReplicator check need key error , request
config key: " + requestKey);
+ throw new RuntimeException("RmqMetaReplicator check need key
error.");
+ }
+ }
+ }
+
+ @Override
+ public void init(KeyValue config) {
try {
- replicatorConfig.validate(config);
+ replicatorConfig.init(config);
} catch (IllegalArgumentException e) {
- return e.getMessage();
+ log.error("RmqMetaReplicator validate config error.", e);
+ throw new IllegalArgumentException("RmqMetaReplicator validate
config error.");
}
- this.prepare();
this.configValid = true;
- return "";
+ this.prepare();
}
@Override
- public void start() {
+ public void start(ConnectorContext componentContext) {
+ super.start(componentContext);
log.info("starting...");
executor.scheduleAtFixedRate(this::refreshConsumerGroups,
replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(),
TimeUnit.SECONDS);
executor.scheduleAtFixedRate(this::syncSubConfig,
replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(),
TimeUnit.SECONDS);
@@ -133,7 +147,7 @@ public class RmqMetaReplicator extends SourceConnector {
}
@Override
- public List<KeyValue> taskConfigs() {
+ public List<KeyValue> taskConfigs(int maxTasks) {
log.debug("preparing taskConfig...");
if (!configValid) {
return new ArrayList<>();
@@ -146,7 +160,7 @@ public class RmqMetaReplicator extends SourceConnector {
e.printStackTrace();
}
- return Utils.groupPartitions(new ArrayList<>(this.knownGroups),
this.replicatorConfig.getTaskParallelism(), replicatorConfig);
+ return Utils.groupPartitions(new ArrayList<>(this.knownGroups),
replicatorConfig, maxTasks);
}
private void prepare() {
@@ -183,7 +197,7 @@ public class RmqMetaReplicator extends SourceConnector {
if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
log.info("reconfig consumer groups, new Groups: {} , dead
groups: {}, previous groups: {}", newGroups, deadGroups, knownGroups);
knownGroups = groups;
- context.requestTaskReconfiguration();
+ connectorContext.requestTaskReconfiguration();
}
} catch (Exception e) {
log.error("refresh consumer groups failed.", e);
@@ -213,7 +227,7 @@ public class RmqMetaReplicator extends SourceConnector {
}
private void ensureSubConfig(Collection<String> targetBrokers,
- SubscriptionGroupConfig subConfig) throws InterruptedException,
RemotingException, MQClientException, MQBrokerException {
+ SubscriptionGroupConfig subConfig) throws InterruptedException,
RemotingException, MQClientException, MQBrokerException {
for (String addr : targetBrokers) {
this.targetMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr,
subConfig);
}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 75adc7c..e4966d1 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -17,8 +17,9 @@
package org.apache.rocketmq.replicator;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -89,26 +90,19 @@ public class RmqSourceReplicator extends SourceConnector {
}
@Override
- public String verifyAndSetConfig(KeyValue config) {
-
- // Check the need key.
+ public void validate(KeyValue config) {
+ // Check they need key.
for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
- return "Request config key: " + requestKey;
+ log.error("RmqSourceReplicator check need key error , request
config key: " + requestKey);
+ throw new RuntimeException("RmqSourceReplicator check need key
error.");
}
}
-
- try {
- this.replicatorConfig.validate(config);
- } catch (IllegalArgumentException e) {
- return e.getMessage();
- }
- this.configValid = true;
- return "";
}
@Override
- public void start() {
+ public void start(ConnectorContext componentContext) {
+ super.start(componentContext);
try {
startMQAdminTools();
} catch (MQClientException e) {
@@ -126,7 +120,6 @@ public class RmqSourceReplicator extends SourceConnector {
boolean first = true;
Map<String, Set<TaskTopicInfo>> origin = null;
-
@Override public void run() {
buildRoute();
@@ -135,7 +128,7 @@ public class RmqSourceReplicator extends SourceConnector {
first = false;
}
if (!compare(origin, topicRouteMap)) {
- context.requestTaskReconfiguration();
+ connectorContext.requestTaskReconfiguration();
origin = new HashMap<>(topicRouteMap);
}
}
@@ -164,6 +157,17 @@ public class RmqSourceReplicator extends SourceConnector {
return true;
}
+ @Override
+ public void init(KeyValue config) {
+ try {
+ this.replicatorConfig.init(config);
+ } catch (IllegalArgumentException e) {
+ log.error("RmqSourceReplicator init config error.", e);
+ throw new IllegalArgumentException("RmqSourceReplicator init
config error.");
+ }
+ this.configValid = true;
+ }
+
@Override
public void stop() {
executor.shutdown();
@@ -188,7 +192,7 @@ public class RmqSourceReplicator extends SourceConnector {
}
@Override
- public List<KeyValue> taskConfigs() {
+ public List<KeyValue> taskConfigs(int maxTasks) {
if (!configValid) {
return new ArrayList<KeyValue>();
}
@@ -208,12 +212,11 @@ public class RmqSourceReplicator extends SourceConnector {
this.replicatorConfig.getStoreTopic(),
this.replicatorConfig.getConverter(),
DataType.COMMON_MESSAGE.ordinal(),
- this.replicatorConfig.getTaskParallelism(),
this.replicatorConfig.isSrcAclEnable(),
this.replicatorConfig.getSrcAccessKey(),
this.replicatorConfig.getSrcSecretKey()
);
- return
this.replicatorConfig.getTaskDivideStrategy().divide(this.topicRouteMap, tdc);
+ return
this.replicatorConfig.getTaskDivideStrategy().divide(this.topicRouteMap, tdc,
maxTasks);
}
public void buildRoute() {
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index ca7edb4..73cebfd 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -19,17 +19,16 @@ package org.apache.rocketmq.replicator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.PositionStorageReader;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
-import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,12 +47,11 @@ import org.apache.rocketmq.replicator.config.DataType;
import org.apache.rocketmq.replicator.config.TaskConfig;
import org.apache.rocketmq.replicator.config.TaskTopicInfo;
import org.apache.rocketmq.replicator.schema.FieldName;
+import org.apache.rocketmq.replicator.schema.SchemaEnum;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-
public class RmqSourceTask extends SourceTask {
private static final Logger log =
LoggerFactory.getLogger(RmqSourceTask.class);
@@ -74,7 +72,7 @@ public class RmqSourceTask extends SourceTask {
}
@Override
- public Collection<SourceDataEntry> poll() {
+ public List<ConnectRecord> poll() {
if (this.config.getDataType() == DataType.COMMON_MESSAGE.ordinal()) {
return pollCommonMessage();
@@ -88,8 +86,8 @@ public class RmqSourceTask extends SourceTask {
}
@Override
- public void start(KeyValue config) {
- ConfigUtil.load(config, this.config);
+ public void start(SourceTaskContext sourceTaskContext) {
+ super.start(sourceTaskContext);
RPCHook rpcHook = null;
if (this.config.isSrcAclEnable()) {
rpcHook = new AclClientRPCHook(new
SessionCredentials(this.config.getSrcAccessKey(),
this.config.getSrcSecretKey()));
@@ -117,8 +115,8 @@ public class RmqSourceTask extends SourceTask {
}
}
}
- PositionStorageReader positionStorageReader =
this.context.positionStorageReader();
- mqOffsetMap.putAll(getPositionMapWithCheck(topicListFilter,
positionStorageReader, this.TIMEOUT, TimeUnit.MILLISECONDS));
+ OffsetStorageReader offsetStorageReader =
this.sourceTaskContext.offsetStorageReader();
+ mqOffsetMap.putAll(getPositionMapWithCheck(topicListFilter,
offsetStorageReader, this.TIMEOUT, TimeUnit.MILLISECONDS));
started = true;
} catch (Exception e) {
log.error("Consumer of task {} start failed.", this.taskId, e);
@@ -127,6 +125,16 @@ public class RmqSourceTask extends SourceTask {
log.info("RocketMQ source task started");
}
+ @Override
+ public void validate(KeyValue config) {
+
+ }
+
+ @Override
+ public void init(KeyValue config) {
+ ConfigUtil.load(config, this.config);
+ }
+
@Override
public void stop() {
@@ -148,9 +156,9 @@ public class RmqSourceTask extends SourceTask {
}
- private Collection<SourceDataEntry> pollCommonMessage() {
+ private List<ConnectRecord> pollCommonMessage() {
- List<SourceDataEntry> res = new ArrayList<>();
+ List<ConnectRecord> res = new ArrayList<>();
if (started) {
try {
for (TaskTopicInfo taskTopicConfig :
this.mqOffsetMap.keySet()) {
@@ -159,32 +167,17 @@ public class RmqSourceTask extends SourceTask {
switch (pullResult.getPullStatus()) {
case FOUND: {
this.mqOffsetMap.put(taskTopicConfig,
pullResult.getNextBeginOffset());
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(RmqConstants.NEXT_POSITION,
pullResult.getNextBeginOffset());
List<MessageExt> msgs =
pullResult.getMsgFoundList();
- Schema schema = new Schema();
-
schema.setDataSource(this.config.getSourceRocketmq());
- schema.setName(taskTopicConfig.getTopic());
- schema.setFields(new ArrayList<>());
- schema.getFields().add(new Field(0,
- FieldName.COMMON_MESSAGE.getKey(),
FieldType.STRING));
-
+ List<Field> fields = new ArrayList<>();
+ Schema schema = new
Schema(SchemaEnum.MESSAGE.name(), FieldType.STRING, fields);
+ schema.getFields().add(new Field(0,
FieldName.COMMON_MESSAGE.getKey(), SchemaBuilder.string().build()));
for (MessageExt msg : msgs) {
- DataEntryBuilder dataEntryBuilder = new
DataEntryBuilder(schema);
-
dataEntryBuilder.timestamp(System.currentTimeMillis())
-
.queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
-
dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new
String(msg.getBody()));
- SourceDataEntry sourceDataEntry =
dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(RmqConstants.getPartition(
- taskTopicConfig.getTopic(),
- taskTopicConfig.getBrokerName(),
-
String.valueOf(taskTopicConfig.getQueueId())).getBytes(StandardCharsets.UTF_8)),
-
ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
- );
-
sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
- res.add(sourceDataEntry);
+ JSONObject jsonObject = new JSONObject();
+
jsonObject.put(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
+ ConnectRecord connectRecord = new
ConnectRecord(Utils.offsetKey(taskTopicConfig.getTopic(),
taskTopicConfig.getBrokerName(), String.valueOf(msg.getQueueId())),
+
Utils.offsetValue(pullResult.getNextBeginOffset()), System.currentTimeMillis(),
schema, jsonObject.toJSONString());
+ res.add(connectRecord);
}
-
break;
}
default:
@@ -202,21 +195,21 @@ public class RmqSourceTask extends SourceTask {
return res;
}
- private Collection<SourceDataEntry> pollTopicConfig() {
+ private List<ConnectRecord> pollTopicConfig() {
DefaultMQAdminExt srcMQAdminExt;
return new ArrayList<>();
}
- private Collection<SourceDataEntry> pollBrokerConfig() {
+ private List<ConnectRecord> pollBrokerConfig() {
return new ArrayList<>();
}
- private Collection<SourceDataEntry> pollSubConfig() {
+ private List<ConnectRecord> pollSubConfig() {
return new ArrayList<>();
}
public Map<TaskTopicInfo, Long>
getPositionMapWithCheck(List<TaskTopicInfo> taskList,
- PositionStorageReader positionStorageReader, long timeout, TimeUnit
unit) {
+ OffsetStorageReader positionStorageReader, long timeout, TimeUnit
unit) {
unit = unit == null ? TimeUnit.MILLISECONDS : unit;
Map<TaskTopicInfo, Long> positionMap = getPositionMap(taskList,
positionStorageReader);
@@ -243,25 +236,23 @@ public class RmqSourceTask extends SourceTask {
}
waitTime = msecs - (System.currentTimeMillis() - startTime);
- } while (!waitPositionReady && waitTime > 0L);
+ }
+ while (!waitPositionReady && waitTime > 0L);
return positionMap;
}
public Map<TaskTopicInfo, Long> getPositionMap(List<TaskTopicInfo>
taskList,
- PositionStorageReader positionStorageReader) {
+ OffsetStorageReader offsetStorageReader) {
Map<TaskTopicInfo, Long> positionMap = new HashMap<>();
for (TaskTopicInfo tti : taskList) {
- ByteBuffer positionInfo = positionStorageReader.getPosition(
- ByteBuffer.wrap(RmqConstants.getPartition(
- tti.getTopic(),
- tti.getBrokerName(),
-
String.valueOf(tti.getQueueId())).getBytes(StandardCharsets.UTF_8)));
-
- if (null != positionInfo && positionInfo.array().length > 0) {
- String positionJson = new String(positionInfo.array(),
StandardCharsets.UTF_8);
- JSONObject jsonObject = JSONObject.parseObject(positionJson);
- positionMap.put(tti,
jsonObject.getLong(RmqConstants.NEXT_POSITION));
+ RecordOffset positionInfo =
offsetStorageReader.readOffset(Utils.offsetKey(tti.getTopic(),
tti.getBrokerName(),
+ String.valueOf(tti.getQueueId())));
+ if (positionInfo != null && null != positionInfo.getOffset()) {
+ Map<String, ?> offset = positionInfo.getOffset();
+ Object lastRecordedOffset =
offset.get(RmqConstants.NEXT_POSITION);
+ long skipLeft =
Long.parseLong(String.valueOf(lastRecordedOffset));
+ positionMap.put(tti, skipLeft);
} else {
positionMap.put(tti, 0L);
}
@@ -269,5 +260,6 @@ public class RmqSourceTask extends SourceTask {
return positionMap;
}
+
}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 1be3d54..ac2efc0 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -18,10 +18,14 @@ package org.apache.rocketmq.replicator.common;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.internal.DefaultKeyValue;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -34,6 +38,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.RmqConstants;
import org.apache.rocketmq.replicator.config.DataType;
import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
import org.apache.rocketmq.replicator.config.TaskConfig;
@@ -117,10 +122,12 @@ public class Utils {
}
}
- public static List<KeyValue> groupPartitions(List<String> elements, int
numGroups, RmqConnectorConfig tdc) {
- if (numGroups <= 0)
+ public static List<KeyValue> groupPartitions(List<String> elements,
RmqConnectorConfig tdc,
+ int maxTasks) {
+ if (maxTasks <= 0)
throw new IllegalArgumentException("Number of groups must be
positive.");
+ int numGroups = Math.min(elements.size(), maxTasks);
List<KeyValue> result = new ArrayList<>(numGroups);
// Each group has either n+1 or n raw partitions
@@ -200,4 +207,18 @@ public class Utils {
return targetMQAdminExt;
}
+
+ public static RecordPartition offsetKey(String topic, String broker,
String queueId) {
+ Map<String, String> map = new HashMap<>();
+ map.put(RmqConstants.TOPIC_NAME, topic);
+ map.put(RmqConstants.BROKER_NAME, broker);
+ map.put(RmqConstants.QUEUE_ID, queueId);
+ return new RecordPartition(map);
+ }
+
+ public static RecordOffset offsetValue(Long pos) {
+ Map<String, String> map = new HashMap<>();
+ map.put(RmqConstants.NEXT_POSITION, String.valueOf(pos));
+ return new RecordOffset(map);
+ }
}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index d3d5205..fea2cfb 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -45,8 +45,6 @@ public class ConfigDefine {
public static final String CONN_SOURCE_RECORD_CONVERTER =
"source-record-converter";
- public static final String CONN_TASK_PARALLELISM = "task-parallelism";
-
public static final String CONN_TOPIC_RENAME_FMT = "topic.rename.format";
public static final String REFRESH_INTERVAL = "refresh.interval";
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
index b88bd69..40edee0 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
@@ -25,8 +25,6 @@ import
org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
public class RmqConnectorConfig {
-
- private int taskParallelism;
private Set<String> whiteList;
private String srcNamesrvs;
private String targetNamesrvs;
@@ -48,8 +46,7 @@ public class RmqConnectorConfig {
public RmqConnectorConfig() {
}
- public void validate(KeyValue config) {
- this.taskParallelism =
config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);
+ public void init(KeyValue config) {
int strategy = config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY,
DivideStrategyEnum.BY_QUEUE.ordinal());
if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
@@ -98,10 +95,6 @@ public class RmqConnectorConfig {
}
}
- public int getTaskParallelism() {
- return this.taskParallelism;
- }
-
public Set<String> getWhiteList() {
return this.whiteList;
}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
index 097255b..16a74ed 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
@@ -28,8 +28,6 @@ public class TaskDivideConfig {
private int dataType;
- private int taskParallelism;
-
private boolean srcAclEnable = false;
private String srcAccessKey;
@@ -37,13 +35,12 @@ public class TaskDivideConfig {
private String srcSecretKey;
public TaskDivideConfig(String sourceNamesrvAddr, String srcCluster,
String storeTopic, String srcRecordConverter,
- int dataType, int taskParallelism, boolean srcAclEnable, String
srcAccessKey, String srcSecretKey) {
+ int dataType, boolean srcAclEnable, String srcAccessKey, String
srcSecretKey) {
this.sourceNamesrvAddr = sourceNamesrvAddr;
this.srcCluster = srcCluster;
this.storeTopic = storeTopic;
this.srcRecordConverter = srcRecordConverter;
this.dataType = dataType;
- this.taskParallelism = taskParallelism;
this.srcAclEnable = srcAclEnable;
this.srcAccessKey = srcAccessKey;
this.srcSecretKey = srcSecretKey;
@@ -89,14 +86,6 @@ public class TaskDivideConfig {
this.dataType = dataType;
}
- public int getTaskParallelism() {
- return taskParallelism;
- }
-
- public void setTaskParallelism(int taskParallelism) {
- this.taskParallelism = taskParallelism;
- }
-
public boolean isSrcAclEnable() {
return srcAclEnable;
}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/schema/SchemaEnum.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/schema/SchemaEnum.java
new file mode 100644
index 0000000..b137742
--- /dev/null
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/schema/SchemaEnum.java
@@ -0,0 +1,6 @@
+package org.apache.rocketmq.replicator.schema;
+
+public enum SchemaEnum {
+ MESSAGE,
+ OFFSET,
+}
\ No newline at end of file
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
index 7c2e27c..b5529aa 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
@@ -33,15 +33,15 @@ import
org.apache.rocketmq.replicator.config.TaskDivideConfig;
import org.apache.rocketmq.replicator.config.TaskTopicInfo;
public class DivideTaskByConsistentHash extends TaskDivideStrategy {
- @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>>
topicMap, TaskDivideConfig tdc) {
+ @Override
+ public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap,
TaskDivideConfig tdc, int maxTasks) {
List<KeyValue> config = new ArrayList<>();
- int parallelism = tdc.getTaskParallelism();
Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<>();
int id = -1;
Collection<ClientNode> cidNodes = new ArrayList<>();
- for (int i = 0; i < parallelism; i++) {
+ for (int i = 0; i < maxTasks; i++) {
cidNodes.add(new ClientNode(i, Integer.toString(i)));
queueTopicList.put(i, new ArrayList<>());
}
@@ -57,7 +57,7 @@ public class DivideTaskByConsistentHash extends
TaskDivideStrategy {
}
}
- for (int i = 0; i < parallelism; i++) {
+ for (int i = 0; i < maxTasks; i++) {
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(),
tdc.getStoreTopic());
keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(),
tdc.getSourceNamesrvAddr());
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index bbfa580..db2a03b 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -31,10 +31,15 @@ import org.apache.rocketmq.replicator.config.TaskTopicInfo;
public class DivideTaskByQueue extends TaskDivideStrategy {
- @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>>
topicRouteMap, TaskDivideConfig tdc) {
+ @Override
+ public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>>
topicRouteMap, TaskDivideConfig tdc, int maxTasks) {
List<KeyValue> config = new ArrayList<KeyValue>();
- int parallelism = tdc.getTaskParallelism();
+ int queueNum = 0;
+ for (String t : topicRouteMap.keySet()) {
+ queueNum += topicRouteMap.get(t).size();
+ }
+ int parallelism = Math.min(queueNum, maxTasks);
Map<Integer, List<TaskTopicInfo>> queueTopicList = new
HashMap<Integer, List<TaskTopicInfo>>();
int id = -1;
for (String t : topicRouteMap.keySet()) {
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
index 0d13a5e..010cb90 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
@@ -28,10 +28,12 @@ import java.util.Map;
public class DivideTaskByTopic extends TaskDivideStrategy {
- @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>>
topicRouteMap, TaskDivideConfig tdc) {
+ @Override
+ public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>>
topicRouteMap, TaskDivideConfig tdc,
+ int maxTasks) {
List<KeyValue> config = new ArrayList<KeyValue>();
- int parallelism = tdc.getTaskParallelism();
+ int parallelism = Math.min(topicRouteMap.keySet().size(), maxTasks);
int id = -1;
Map<Integer, List<TaskTopicInfo>> taskTopicList = new HashMap<Integer,
List<TaskTopicInfo>>();
for (Map.Entry<String, Set<TaskTopicInfo>> entry :
topicRouteMap.entrySet()) {
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index 89ed060..778dbee 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -25,5 +25,5 @@ import org.apache.rocketmq.replicator.config.TaskTopicInfo;
public abstract class TaskDivideStrategy {
- public abstract List<KeyValue> divide(Map<String, Set<TaskTopicInfo>>
topicMap, TaskDivideConfig tdc);
+ public abstract List<KeyValue> divide(Map<String, Set<TaskTopicInfo>>
topicMap, TaskDivideConfig tdc, int maxTasks);
}
diff --git
a/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java
b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java
new file mode 100644
index 0000000..60dfe0e
--- /dev/null
+++
b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java
@@ -0,0 +1,217 @@
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigDefine;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByConsistentHash;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultTaskDivideStrategyTest {
+
+ @Test
+ public void testDivideTaskByTopic() {
+ RmqConnectorConfig config = new RmqConnectorConfig();
+ KeyValue kv = new DefaultKeyValue();
+ kv.put(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY, "0");
+ config.init(kv);
+ TaskTopicInfo taskTopicInfo1 = new TaskTopicInfo("T_TEST1",
"source_cluster", 0, "T_TEST1");
+ TaskTopicInfo taskTopicInfo2 = new TaskTopicInfo("T_TEST2",
"source_cluster", 0, "T_TEST2");
+ TaskTopicInfo taskTopicInfo3 = new TaskTopicInfo("T_TEST3",
"source_cluster", 0, "T_TEST3");
+ TaskTopicInfo taskTopicInfo4 = new TaskTopicInfo("T_TEST4",
"source_cluster", 0, "T_TEST4");
+ TaskTopicInfo taskTopicInfo5 = new TaskTopicInfo("T_TEST5",
"source_cluster", 0, "T_TEST5");
+ Set<TaskTopicInfo> msgQueue1 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo1);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue2 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo2);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue3 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo3);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue4 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo4);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue5 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo5);
+ }
+ };
+ Map<String, Set<TaskTopicInfo>> topicRouteMap = new HashMap<String,
Set<TaskTopicInfo>>() {
+ {
+ put("T_TEST1", msgQueue1);
+ put("T_TEST2", msgQueue2);
+ put("T_TEST3", msgQueue3);
+ put("T_TEST4", msgQueue4);
+ put("T_TEST5", msgQueue5);
+
+ }
+ };
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ "127.0.0.1:9876",
+ "cluster_source",
+ "store_topic",
+ "",
+ DataType.COMMON_MESSAGE.ordinal(),
+ false,
+ "",
+ ""
+ );
+ List<KeyValue> taskConfigs =
config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 4);
+ assertThat(taskConfigs.size()).isEqualTo(4);
+
+ List<KeyValue> taskConfigs1 =
config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 6);
+ assertThat(taskConfigs1.size()).isEqualTo(5);
+ }
+
+ @Test
+ public void testDivideTaskByQueue() {
+ RmqConnectorConfig config = new RmqConnectorConfig();
+ KeyValue kv = new DefaultKeyValue();
+ kv.put(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY, "1");
+ config.init(kv);
+ TaskTopicInfo taskTopicInfo1 = new TaskTopicInfo("T_TEST1",
"source_cluster", 0, "T_TEST1");
+ TaskTopicInfo taskTopicInfo2 = new TaskTopicInfo("T_TEST1",
"source_cluster", 1, "T_TEST1");
+ TaskTopicInfo taskTopicInfo3 = new TaskTopicInfo("T_TEST1",
"source_cluster", 2, "T_TEST1");
+ TaskTopicInfo taskTopicInfo4 = new TaskTopicInfo("T_TEST2",
"source_cluster", 0, "T_TEST2");
+ TaskTopicInfo taskTopicInfo5 = new TaskTopicInfo("T_TEST2",
"source_cluster", 1, "T_TEST2");
+ Set<TaskTopicInfo> msgQueue1 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo1);
+ add(taskTopicInfo2);
+ add(taskTopicInfo3);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue2 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo4);
+ add(taskTopicInfo5);
+ }
+ };
+ Map<String, Set<TaskTopicInfo>> topicRouteMap = new HashMap<String,
Set<TaskTopicInfo>>() {
+ {
+ put("T_TEST1", msgQueue1);
+ put("T_TEST2", msgQueue2);
+ }
+ };
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ "127.0.0.1:9876",
+ "cluster_source",
+ "store_topic",
+ "",
+ DataType.COMMON_MESSAGE.ordinal(),
+ false,
+ "",
+ ""
+ );
+ List<KeyValue> taskConfigs =
config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 4);
+ assertThat(taskConfigs.size()).isEqualTo(4);
+
+ List<KeyValue> taskConfigs1 =
config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 6);
+ assertThat(taskConfigs1.size()).isEqualTo(5);
+ }
+
+ @Test
+ public void testDivideTaskByHash() {
+ TaskTopicInfo taskTopicInfo1 = new TaskTopicInfo("T_TEST1",
"source_cluster", 0, "T_TEST1");
+ TaskTopicInfo taskTopicInfo2 = new TaskTopicInfo("T_TEST2",
"source_cluster", 0, "T_TEST2");
+ TaskTopicInfo taskTopicInfo3 = new TaskTopicInfo("T_TEST3",
"source_cluster", 0, "T_TEST3");
+ TaskTopicInfo taskTopicInfo4 = new TaskTopicInfo("T_TEST4",
"source_cluster", 0, "T_TEST4");
+ TaskTopicInfo taskTopicInfo5 = new TaskTopicInfo("T_TEST5",
"source_cluster", 0, "T_TEST5");
+ Set<TaskTopicInfo> msgQueue1 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo1);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue2 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo2);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue3 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo3);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue4 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo4);
+ }
+ };
+ Set<TaskTopicInfo> msgQueue5 = new HashSet<TaskTopicInfo>() {
+ {
+ add(taskTopicInfo5);
+ }
+ };
+ Map<String, Set<TaskTopicInfo>> topicRouteMap = new HashMap<String,
Set<TaskTopicInfo>>() {
+ {
+ put("T_TEST1", msgQueue1);
+ put("T_TEST2", msgQueue2);
+ put("T_TEST3", msgQueue3);
+ put("T_TEST4", msgQueue4);
+ put("T_TEST5", msgQueue5);
+
+ }
+ };
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ "127.0.0.1:9876",
+ "cluster_source",
+ "store_topic",
+ "",
+ DataType.COMMON_MESSAGE.ordinal(),
+ false,
+ "",
+ ""
+ );
+ DivideTaskByConsistentHash hash = new DivideTaskByConsistentHash();
+ List<KeyValue> taskConfigs = hash.divide(topicRouteMap, tdc, 3);
+ assertThat(taskConfigs.size()).isEqualTo(3);
+
+ List<KeyValue> taskConfigs1 = hash.divide(topicRouteMap, tdc, 6);
+ assertThat(taskConfigs1.size()).isEqualTo(6);
+ }
+
+ @Test
+ public void testDivideTaskByGroup() {
+ RmqConnectorConfig config = new RmqConnectorConfig();
+ KeyValue kv = new DefaultKeyValue();
+ config.init(kv);
+ List<String> knownGroups = new ArrayList<String>() {
+ {
+ add("G_TEST1");
+ add("G_TEST2");
+ add("G_TEST3");
+ add("G_TEST4");
+ add("G_TEST5");
+ }
+ };
+ List<KeyValue> taskConfigs = Utils.groupPartitions(new
ArrayList<>(knownGroups), config, 6);
+ assertThat(taskConfigs.size()).isEqualTo(5);
+
+ List<KeyValue> taskConfigs1 = Utils.groupPartitions(new
ArrayList<>(knownGroups), config, 4);
+ assertThat(taskConfigs1.size()).isEqualTo(4);
+ }
+}
diff --git
a/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
index 795c386..b8fc0a3 100644
---
a/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
+++
b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
@@ -56,7 +56,7 @@ public class RmqSourceReplicatorTest {
RmqConnectorConfig config = new RmqConnectorConfig();
KeyValue kv = new DefaultKeyValue();
kv.put(ConfigDefine.CONN_TOPIC_RENAME_FMT, "${topic}.replica");
- config.validate(kv);
+ config.init(kv);
Field field =
RmqSourceReplicator.class.getDeclaredField("replicatorConfig");
FieldSetter.setField(rmqSourceReplicator, field, config);