This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new df5234b  [ISSUE #116] replicator adapt to the new connect api (#117)
df5234b is described below

commit df5234b7fae0d80aee01fce03b4528b82ab4a70a
Author: Slideee <[email protected]>
AuthorDate: Wed May 11 16:12:24 2022 +0800

    [ISSUE #116] replicator adapt to the new connect api (#117)
    
    * [ISSUE #116] replicator adapt to the new connect api
    
    * [ISSUE #116] Code optimization, throwing exceptions, distinguishing between init and validate methods
    
    * [ISSUE #116] use maxTasks parameter to control task parallelism
    
    * [ISSUE #116] update topic divide logic
    
    * [ISSUE #116] update divide logic, add divide testcase
    
    Co-authored-by: yechun <[email protected]>
---
 connectors/rocketmq-replicator/pom.xml             |   2 +-
 .../apache/rocketmq/replicator/MetaSourceTask.java |  72 +++----
 .../rocketmq/replicator/RmqMetaReplicator.java     |  40 ++--
 .../rocketmq/replicator/RmqSourceReplicator.java   |  43 ++--
 .../apache/rocketmq/replicator/RmqSourceTask.java  | 102 +++++-----
 .../apache/rocketmq/replicator/common/Utils.java   |  25 ++-
 .../rocketmq/replicator/config/ConfigDefine.java   |   2 -
 .../replicator/config/RmqConnectorConfig.java      |   9 +-
 .../replicator/config/TaskDivideConfig.java        |  13 +-
 .../rocketmq/replicator/schema/SchemaEnum.java     |   6 +
 .../strategy/DivideTaskByConsistentHash.java       |   8 +-
 .../replicator/strategy/DivideTaskByQueue.java     |   9 +-
 .../replicator/strategy/DivideTaskByTopic.java     |   6 +-
 .../replicator/strategy/TaskDivideStrategy.java    |   2 +-
 .../replicator/DefaultTaskDivideStrategyTest.java  | 217 +++++++++++++++++++++
 .../replicator/RmqSourceReplicatorTest.java        |   2 +-
 16 files changed, 395 insertions(+), 163 deletions(-)

diff --git a/connectors/rocketmq-replicator/pom.xml b/connectors/rocketmq-replicator/pom.xml
index 43d3559..36df030 100644
--- a/connectors/rocketmq-replicator/pom.xml
+++ b/connectors/rocketmq-replicator/pom.xml
@@ -74,7 +74,7 @@
         <dependency>
             <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-connector</artifactId>
-            <version>0.1.1</version>
+            <version>0.1.2</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
index 67fc89f..35d4714 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -19,17 +19,14 @@ package org.apache.rocketmq.replicator;
 
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
-import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -43,6 +40,7 @@ import org.apache.rocketmq.replicator.config.ConfigUtil;
 import org.apache.rocketmq.replicator.config.TaskConfig;
 import org.apache.rocketmq.replicator.offset.OffsetSyncStore;
 import org.apache.rocketmq.replicator.schema.FieldName;
+import org.apache.rocketmq.replicator.schema.SchemaEnum;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,8 +62,18 @@ public class MetaSourceTask extends SourceTask {
     }
 
     @Override
-    public void start(KeyValue config) {
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
         ConfigUtil.load(config, this.config);
+    }
+
+    @Override
+    public void start(SourceTaskContext sourceTaskContext) {
+        super.start(sourceTaskContext);
 
         try {
             this.srcMQAdminExt = Utils.startMQAdminTool(this.config);
@@ -78,22 +86,25 @@ public class MetaSourceTask extends SourceTask {
         this.started = true;
     }
 
-    @Override public void stop() {
+    @Override
+    public void stop() {
         if (started) {
             started = false;
         }
         srcMQAdminExt.shutdown();
     }
 
-    @Override public void pause() {
+    @Override
+    public void pause() {
 
     }
 
-    @Override public void resume() {
+    @Override
+    public void resume() {
 
     }
 
-    @Override public Collection<SourceDataEntry> poll() {
+    @Override public List<ConnectRecord> poll() {
         log.debug("polling...");
         List<String> groups = 
JSONObject.parseArray(this.config.getTaskGroupList(), String.class);
 
@@ -106,7 +117,7 @@ public class MetaSourceTask extends SourceTask {
             }
             return Collections.emptyList();
         }
-        List<SourceDataEntry> res = new ArrayList<>();
+        List<ConnectRecord> res = new ArrayList<>();
         for (String group : groups) {
             ConsumeStats stats;
             try {
@@ -117,38 +128,19 @@ public class MetaSourceTask extends SourceTask {
             }
 
             for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable : 
stats.getOffsetTable().entrySet()) {
-
                 MessageQueue mq = offsetTable.getKey();
                 long srcOffset = offsetTable.getValue().getConsumerOffset();
                 long targetOffset = this.store.convertTargetOffset(mq, group, 
srcOffset);
 
+                List<Field> fields = new ArrayList<Field>();
+                Schema schema = new Schema(SchemaEnum.OFFSET.name(), 
FieldType.INT64, fields);
+                schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(), 
SchemaBuilder.string().build()));
+
                 JSONObject jsonObject = new JSONObject();
-                jsonObject.put(RmqConstants.NEXT_POSITION, srcOffset);
-
-                Schema schema = new Schema();
-                schema.setDataSource(this.config.getSourceRocketmq());
-                schema.setName(mq.getTopic());
-                schema.setFields(new ArrayList<>());
-                schema.getFields().add(new Field(0,
-                    FieldName.OFFSET.getKey(), FieldType.INT64));
-
-                DataEntryBuilder dataEntryBuilder = new 
DataEntryBuilder(schema);
-                dataEntryBuilder.timestamp(System.currentTimeMillis())
-                    .queue(this.config.getStoreTopic())
-                    .entryType(EntryType.UPDATE);
-                dataEntryBuilder.putFiled(FieldName.OFFSET.getKey(), 
targetOffset);
-
-                SourceDataEntry sourceDataEntry = 
dataEntryBuilder.buildSourceDataEntry(
-                    ByteBuffer.wrap(RmqConstants.getPartition(
-                        mq.getTopic(),
-                        mq.getBrokerName(),
-                        
String.valueOf(mq.getQueueId())).getBytes(StandardCharsets.UTF_8)),
-                    
ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
-                );
-                String targetTopic = new 
StringBuilder().append(group).append("-").append(mq.getTopic())
-                    .append("-").append(mq.getQueueId()).toString();
-                sourceDataEntry.setQueueName(targetTopic);
-                res.add(sourceDataEntry);
+                jsonObject.put(FieldName.OFFSET.getKey(), targetOffset);
+                ConnectRecord connectRecord = new 
ConnectRecord(Utils.offsetKey(mq.getTopic(), mq.getBrokerName(), 
String.valueOf(mq.getQueueId())),
+                    Utils.offsetValue(srcOffset), System.currentTimeMillis(), 
schema, jsonObject.toJSONString());
+                res.add(connectRecord);
             }
         }
         return res;
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index 787f536..e401b28 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -17,8 +17,9 @@
 package org.apache.rocketmq.replicator;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -45,6 +46,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigDefine;
 import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
@@ -94,20 +96,32 @@ public class RmqMetaReplicator extends SourceConnector {
         executor = Executors.newSingleThreadScheduledExecutor(new 
BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
-    @Override public String verifyAndSetConfig(KeyValue config) {
-        log.info("verifyAndSetConfig...");
+    @Override
+    public void validate(KeyValue config) {
+        // Check they need key.
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                log.error("RmqMetaReplicator check need key error , request 
config key: " + requestKey);
+                throw new RuntimeException("RmqMetaReplicator check need key 
error.");
+            }
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
         try {
-            replicatorConfig.validate(config);
+            replicatorConfig.init(config);
         } catch (IllegalArgumentException e) {
-            return e.getMessage();
+            log.error("RmqMetaReplicator validate config error.", e);
+            throw new IllegalArgumentException("RmqMetaReplicator validate 
config error.");
         }
-        this.prepare();
         this.configValid = true;
-        return "";
+        this.prepare();
     }
 
     @Override
-    public void start() {
+    public void start(ConnectorContext componentContext) {
+        super.start(componentContext);
         log.info("starting...");
         executor.scheduleAtFixedRate(this::refreshConsumerGroups, 
replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), 
TimeUnit.SECONDS);
         executor.scheduleAtFixedRate(this::syncSubConfig, 
replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), 
TimeUnit.SECONDS);
@@ -133,7 +147,7 @@ public class RmqMetaReplicator extends SourceConnector {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         log.debug("preparing taskConfig...");
         if (!configValid) {
             return new ArrayList<>();
@@ -146,7 +160,7 @@ public class RmqMetaReplicator extends SourceConnector {
             e.printStackTrace();
         }
 
-        return Utils.groupPartitions(new ArrayList<>(this.knownGroups), 
this.replicatorConfig.getTaskParallelism(), replicatorConfig);
+        return Utils.groupPartitions(new ArrayList<>(this.knownGroups), 
replicatorConfig, maxTasks);
     }
 
     private void prepare() {
@@ -183,7 +197,7 @@ public class RmqMetaReplicator extends SourceConnector {
             if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
                 log.info("reconfig consumer groups, new Groups: {} , dead 
groups: {}, previous groups: {}", newGroups, deadGroups, knownGroups);
                 knownGroups = groups;
-                context.requestTaskReconfiguration();
+                connectorContext.requestTaskReconfiguration();
             }
         } catch (Exception e) {
             log.error("refresh consumer groups failed.", e);
@@ -213,7 +227,7 @@ public class RmqMetaReplicator extends SourceConnector {
     }
 
     private void ensureSubConfig(Collection<String> targetBrokers,
-            SubscriptionGroupConfig subConfig) throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
+        SubscriptionGroupConfig subConfig) throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
         for (String addr : targetBrokers) {
             this.targetMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
subConfig);
         }
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 75adc7c..e4966d1 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -17,8 +17,9 @@
 package org.apache.rocketmq.replicator;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -89,26 +90,19 @@ public class RmqSourceReplicator extends SourceConnector {
     }
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
-
-        // Check the need key.
+    public void validate(KeyValue config) {
+        // Check they need key.
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                log.error("RmqSourceReplicator check need key error , request 
config key: " + requestKey);
+                throw new RuntimeException("RmqSourceReplicator check need key 
error.");
             }
         }
-
-        try {
-            this.replicatorConfig.validate(config);
-        } catch (IllegalArgumentException e) {
-            return e.getMessage();
-        }
-        this.configValid = true;
-        return "";
     }
 
     @Override
-    public void start() {
+    public void start(ConnectorContext componentContext) {
+        super.start(componentContext);
         try {
             startMQAdminTools();
         } catch (MQClientException e) {
@@ -126,7 +120,6 @@ public class RmqSourceReplicator extends SourceConnector {
             boolean first = true;
             Map<String, Set<TaskTopicInfo>> origin = null;
 
-
             @Override public void run() {
 
                 buildRoute();
@@ -135,7 +128,7 @@ public class RmqSourceReplicator extends SourceConnector {
                     first = false;
                 }
                 if (!compare(origin, topicRouteMap)) {
-                    context.requestTaskReconfiguration();
+                    connectorContext.requestTaskReconfiguration();
                     origin = new HashMap<>(topicRouteMap);
                 }
             }
@@ -164,6 +157,17 @@ public class RmqSourceReplicator extends SourceConnector {
         return true;
     }
 
+    @Override
+    public void init(KeyValue config) {
+        try {
+            this.replicatorConfig.init(config);
+        } catch (IllegalArgumentException e) {
+            log.error("RmqSourceReplicator init config error.", e);
+            throw new IllegalArgumentException("RmqSourceReplicator init 
config error.");
+        }
+        this.configValid = true;
+    }
+
     @Override
     public void stop() {
         executor.shutdown();
@@ -188,7 +192,7 @@ public class RmqSourceReplicator extends SourceConnector {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {
             return new ArrayList<KeyValue>();
         }
@@ -208,12 +212,11 @@ public class RmqSourceReplicator extends SourceConnector {
             this.replicatorConfig.getStoreTopic(),
             this.replicatorConfig.getConverter(),
             DataType.COMMON_MESSAGE.ordinal(),
-            this.replicatorConfig.getTaskParallelism(),
             this.replicatorConfig.isSrcAclEnable(),
             this.replicatorConfig.getSrcAccessKey(),
             this.replicatorConfig.getSrcSecretKey()
         );
-        return 
this.replicatorConfig.getTaskDivideStrategy().divide(this.topicRouteMap, tdc);
+        return 
this.replicatorConfig.getTaskDivideStrategy().divide(this.topicRouteMap, tdc, 
maxTasks);
     }
 
     public void buildRoute() {
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index ca7edb4..73cebfd 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -19,17 +19,16 @@ package org.apache.rocketmq.replicator;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.PositionStorageReader;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
-import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,12 +47,11 @@ import org.apache.rocketmq.replicator.config.DataType;
 import org.apache.rocketmq.replicator.config.TaskConfig;
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 import org.apache.rocketmq.replicator.schema.FieldName;
+import org.apache.rocketmq.replicator.schema.SchemaEnum;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-
 public class RmqSourceTask extends SourceTask {
 
     private static final Logger log = 
LoggerFactory.getLogger(RmqSourceTask.class);
@@ -74,7 +72,7 @@ public class RmqSourceTask extends SourceTask {
     }
 
     @Override
-    public Collection<SourceDataEntry> poll() {
+    public List<ConnectRecord> poll() {
 
         if (this.config.getDataType() == DataType.COMMON_MESSAGE.ordinal()) {
             return pollCommonMessage();
@@ -88,8 +86,8 @@ public class RmqSourceTask extends SourceTask {
     }
 
     @Override
-    public void start(KeyValue config) {
-        ConfigUtil.load(config, this.config);
+    public void start(SourceTaskContext sourceTaskContext) {
+        super.start(sourceTaskContext);
         RPCHook rpcHook = null;
         if (this.config.isSrcAclEnable()) {
             rpcHook = new AclClientRPCHook(new 
SessionCredentials(this.config.getSrcAccessKey(), 
this.config.getSrcSecretKey()));
@@ -117,8 +115,8 @@ public class RmqSourceTask extends SourceTask {
                     }
                 }
             }
-            PositionStorageReader positionStorageReader = 
this.context.positionStorageReader();
-            mqOffsetMap.putAll(getPositionMapWithCheck(topicListFilter, 
positionStorageReader, this.TIMEOUT, TimeUnit.MILLISECONDS));
+            OffsetStorageReader offsetStorageReader = 
this.sourceTaskContext.offsetStorageReader();
+            mqOffsetMap.putAll(getPositionMapWithCheck(topicListFilter, 
offsetStorageReader, this.TIMEOUT, TimeUnit.MILLISECONDS));
             started = true;
         } catch (Exception e) {
             log.error("Consumer of task {} start failed.", this.taskId, e);
@@ -127,6 +125,16 @@ public class RmqSourceTask extends SourceTask {
         log.info("RocketMQ source task started");
     }
 
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        ConfigUtil.load(config, this.config);
+    }
+
     @Override
     public void stop() {
 
@@ -148,9 +156,9 @@ public class RmqSourceTask extends SourceTask {
 
     }
 
-    private Collection<SourceDataEntry> pollCommonMessage() {
+    private List<ConnectRecord> pollCommonMessage() {
 
-        List<SourceDataEntry> res = new ArrayList<>();
+        List<ConnectRecord> res = new ArrayList<>();
         if (started) {
             try {
                 for (TaskTopicInfo taskTopicConfig : 
this.mqOffsetMap.keySet()) {
@@ -159,32 +167,17 @@ public class RmqSourceTask extends SourceTask {
                     switch (pullResult.getPullStatus()) {
                         case FOUND: {
                             this.mqOffsetMap.put(taskTopicConfig, 
pullResult.getNextBeginOffset());
-                            JSONObject jsonObject = new JSONObject();
-                            jsonObject.put(RmqConstants.NEXT_POSITION, 
pullResult.getNextBeginOffset());
                             List<MessageExt> msgs = 
pullResult.getMsgFoundList();
-                            Schema schema = new Schema();
-                            
schema.setDataSource(this.config.getSourceRocketmq());
-                            schema.setName(taskTopicConfig.getTopic());
-                            schema.setFields(new ArrayList<>());
-                            schema.getFields().add(new Field(0,
-                                FieldName.COMMON_MESSAGE.getKey(), 
FieldType.STRING));
-
+                            List<Field> fields = new ArrayList<>();
+                            Schema schema = new 
Schema(SchemaEnum.MESSAGE.name(), FieldType.STRING, fields);
+                            schema.getFields().add(new Field(0, 
FieldName.COMMON_MESSAGE.getKey(), SchemaBuilder.string().build()));
                             for (MessageExt msg : msgs) {
-                                DataEntryBuilder dataEntryBuilder = new 
DataEntryBuilder(schema);
-                                
dataEntryBuilder.timestamp(System.currentTimeMillis())
-                                    
.queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
-                                
dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new 
String(msg.getBody()));
-                                SourceDataEntry sourceDataEntry = 
dataEntryBuilder.buildSourceDataEntry(
-                                    ByteBuffer.wrap(RmqConstants.getPartition(
-                                        taskTopicConfig.getTopic(),
-                                        taskTopicConfig.getBrokerName(),
-                                        
String.valueOf(taskTopicConfig.getQueueId())).getBytes(StandardCharsets.UTF_8)),
-                                    
ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
-                                );
-                                
sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
-                                res.add(sourceDataEntry);
+                                JSONObject jsonObject = new JSONObject();
+                                
jsonObject.put(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
+                                ConnectRecord connectRecord = new 
ConnectRecord(Utils.offsetKey(taskTopicConfig.getTopic(), 
taskTopicConfig.getBrokerName(), String.valueOf(msg.getQueueId())),
+                                    
Utils.offsetValue(pullResult.getNextBeginOffset()), System.currentTimeMillis(), 
schema, jsonObject.toJSONString());
+                                res.add(connectRecord);
                             }
-
                             break;
                         }
                         default:
@@ -202,21 +195,21 @@ public class RmqSourceTask extends SourceTask {
         return res;
     }
 
-    private Collection<SourceDataEntry> pollTopicConfig() {
+    private List<ConnectRecord> pollTopicConfig() {
         DefaultMQAdminExt srcMQAdminExt;
         return new ArrayList<>();
     }
 
-    private Collection<SourceDataEntry> pollBrokerConfig() {
+    private List<ConnectRecord> pollBrokerConfig() {
         return new ArrayList<>();
     }
 
-    private Collection<SourceDataEntry> pollSubConfig() {
+    private List<ConnectRecord> pollSubConfig() {
         return new ArrayList<>();
     }
 
     public Map<TaskTopicInfo, Long> 
getPositionMapWithCheck(List<TaskTopicInfo> taskList,
-        PositionStorageReader positionStorageReader, long timeout, TimeUnit 
unit) {
+        OffsetStorageReader positionStorageReader, long timeout, TimeUnit 
unit) {
         unit = unit == null ? TimeUnit.MILLISECONDS : unit;
 
         Map<TaskTopicInfo, Long> positionMap = getPositionMap(taskList, 
positionStorageReader);
@@ -243,25 +236,23 @@ public class RmqSourceTask extends SourceTask {
             }
 
             waitTime = msecs - (System.currentTimeMillis() - startTime);
-        } while (!waitPositionReady && waitTime > 0L);
+        }
+        while (!waitPositionReady && waitTime > 0L);
 
         return positionMap;
     }
 
     public Map<TaskTopicInfo, Long> getPositionMap(List<TaskTopicInfo> 
taskList,
-        PositionStorageReader positionStorageReader) {
+        OffsetStorageReader offsetStorageReader) {
         Map<TaskTopicInfo, Long> positionMap = new HashMap<>();
         for (TaskTopicInfo tti : taskList) {
-            ByteBuffer positionInfo = positionStorageReader.getPosition(
-                ByteBuffer.wrap(RmqConstants.getPartition(
-                    tti.getTopic(),
-                    tti.getBrokerName(),
-                    
String.valueOf(tti.getQueueId())).getBytes(StandardCharsets.UTF_8)));
-
-            if (null != positionInfo && positionInfo.array().length > 0) {
-                String positionJson = new String(positionInfo.array(), 
StandardCharsets.UTF_8);
-                JSONObject jsonObject = JSONObject.parseObject(positionJson);
-                positionMap.put(tti, 
jsonObject.getLong(RmqConstants.NEXT_POSITION));
+            RecordOffset positionInfo = 
offsetStorageReader.readOffset(Utils.offsetKey(tti.getTopic(), 
tti.getBrokerName(),
+                String.valueOf(tti.getQueueId())));
+            if (positionInfo != null && null != positionInfo.getOffset()) {
+                Map<String, ?> offset = positionInfo.getOffset();
+                Object lastRecordedOffset = 
offset.get(RmqConstants.NEXT_POSITION);
+                long skipLeft = 
Long.parseLong(String.valueOf(lastRecordedOffset));
+                positionMap.put(tti, skipLeft);
             } else {
                 positionMap.put(tti, 0L);
             }
@@ -269,5 +260,6 @@ public class RmqSourceTask extends SourceTask {
 
         return positionMap;
     }
+
 }
 
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 1be3d54..ac2efc0 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -18,10 +18,14 @@ package org.apache.rocketmq.replicator.common;
 
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.internal.DefaultKeyValue;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -34,6 +38,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.RmqConstants;
 import org.apache.rocketmq.replicator.config.DataType;
 import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
 import org.apache.rocketmq.replicator.config.TaskConfig;
@@ -117,10 +122,12 @@ public class Utils {
         }
     }
 
-    public static List<KeyValue> groupPartitions(List<String> elements, int 
numGroups, RmqConnectorConfig tdc) {
-        if (numGroups <= 0)
+    public static List<KeyValue> groupPartitions(List<String> elements, 
RmqConnectorConfig tdc,
+        int maxTasks) {
+        if (maxTasks <= 0)
             throw new IllegalArgumentException("Number of groups must be 
positive.");
 
+        int numGroups = Math.min(elements.size(), maxTasks);
         List<KeyValue> result = new ArrayList<>(numGroups);
 
         // Each group has either n+1 or n raw partitions
@@ -200,4 +207,18 @@ public class Utils {
 
         return targetMQAdminExt;
     }
+
+    public static RecordPartition offsetKey(String topic, String broker, 
String queueId) {
+        Map<String, String> map = new HashMap<>();
+        map.put(RmqConstants.TOPIC_NAME, topic);
+        map.put(RmqConstants.BROKER_NAME, broker);
+        map.put(RmqConstants.QUEUE_ID, queueId);
+        return new RecordPartition(map);
+    }
+
+    public static RecordOffset offsetValue(Long pos) {
+        Map<String, String> map = new HashMap<>();
+        map.put(RmqConstants.NEXT_POSITION, String.valueOf(pos));
+        return new RecordOffset(map);
+    }
 }
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index d3d5205..fea2cfb 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -45,8 +45,6 @@ public class ConfigDefine {
 
     public static final String CONN_SOURCE_RECORD_CONVERTER = 
"source-record-converter";
 
-    public static final String CONN_TASK_PARALLELISM = "task-parallelism";
-
     public static final String CONN_TOPIC_RENAME_FMT = "topic.rename.format";
 
     public static final String REFRESH_INTERVAL = "refresh.interval";
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
index b88bd69..40edee0 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
@@ -25,8 +25,6 @@ import 
org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
 import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
 
 public class RmqConnectorConfig {
-
-    private int taskParallelism;
     private Set<String> whiteList;
     private String srcNamesrvs;
     private String targetNamesrvs;
@@ -48,8 +46,7 @@ public class RmqConnectorConfig {
     public RmqConnectorConfig() {
     }
 
-    public void validate(KeyValue config) {
-        this.taskParallelism = 
config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);
+    public void init(KeyValue config) {
 
         int strategy = config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY, 
DivideStrategyEnum.BY_QUEUE.ordinal());
         if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
@@ -98,10 +95,6 @@ public class RmqConnectorConfig {
         }
     }
 
-    public int getTaskParallelism() {
-        return this.taskParallelism;
-    }
-
     public Set<String> getWhiteList() {
         return this.whiteList;
     }
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
index 097255b..16a74ed 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
@@ -28,8 +28,6 @@ public class TaskDivideConfig {
 
     private int dataType;
 
-    private int taskParallelism;
-
     private boolean srcAclEnable = false;
 
     private String srcAccessKey;
@@ -37,13 +35,12 @@ public class TaskDivideConfig {
     private String srcSecretKey;
 
     public TaskDivideConfig(String sourceNamesrvAddr, String srcCluster, 
String storeTopic, String srcRecordConverter,
-        int dataType, int taskParallelism, boolean srcAclEnable, String 
srcAccessKey, String srcSecretKey) {
+        int dataType, boolean srcAclEnable, String srcAccessKey, String 
srcSecretKey) {
         this.sourceNamesrvAddr = sourceNamesrvAddr;
         this.srcCluster = srcCluster;
         this.storeTopic = storeTopic;
         this.srcRecordConverter = srcRecordConverter;
         this.dataType = dataType;
-        this.taskParallelism = taskParallelism;
         this.srcAclEnable = srcAclEnable;
         this.srcAccessKey = srcAccessKey;
         this.srcSecretKey = srcSecretKey;
@@ -89,14 +86,6 @@ public class TaskDivideConfig {
         this.dataType = dataType;
     }
 
-    public int getTaskParallelism() {
-        return taskParallelism;
-    }
-
-    public void setTaskParallelism(int taskParallelism) {
-        this.taskParallelism = taskParallelism;
-    }
-
     public boolean isSrcAclEnable() {
         return srcAclEnable;
     }
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/schema/SchemaEnum.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/schema/SchemaEnum.java
new file mode 100644
index 0000000..b137742
--- /dev/null
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/schema/SchemaEnum.java
@@ -0,0 +1,6 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator.schema;
+
+/**
+ * Names the schema kinds declared for the replicator.
+ *
+ * <p>NOTE(review): the usage sites are not visible from this file. Presumably
+ * {@link #MESSAGE} tags replicated message records and {@link #OFFSET} tags
+ * offset records; confirm against the source tasks before relying on this.
+ * The declaration order is kept as originally written so ordinals stay stable.
+ */
+public enum SchemaEnum {
+    MESSAGE,
+    OFFSET
+}
\ No newline at end of file
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
index 7c2e27c..b5529aa 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
@@ -33,15 +33,15 @@ import 
org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public class DivideTaskByConsistentHash extends TaskDivideStrategy {
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> 
topicMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, 
TaskDivideConfig tdc, int maxTasks) {
 
         List<KeyValue> config = new ArrayList<>();
-        int parallelism = tdc.getTaskParallelism();
         Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<>();
         int id = -1;
 
         Collection<ClientNode> cidNodes = new ArrayList<>();
-        for (int i = 0; i < parallelism; i++) {
+        for (int i = 0; i < maxTasks; i++) {
             cidNodes.add(new ClientNode(i, Integer.toString(i)));
             queueTopicList.put(i, new ArrayList<>());
         }
@@ -57,7 +57,7 @@ public class DivideTaskByConsistentHash extends 
TaskDivideStrategy {
             }
         }
 
-        for (int i = 0; i < parallelism; i++) {
+        for (int i = 0; i < maxTasks; i++) {
             KeyValue keyValue = new DefaultKeyValue();
             keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), 
tdc.getStoreTopic());
             keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), 
tdc.getSourceNamesrvAddr());
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index bbfa580..db2a03b 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -31,10 +31,15 @@ import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public class DivideTaskByQueue extends TaskDivideStrategy {
 
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> 
topicRouteMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> 
topicRouteMap, TaskDivideConfig tdc, int maxTasks) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-        int parallelism = tdc.getTaskParallelism();
+        int queueNum = 0;
+        for (String t : topicRouteMap.keySet()) {
+            queueNum += topicRouteMap.get(t).size();
+        }
+        int parallelism = Math.min(queueNum, maxTasks);
         Map<Integer, List<TaskTopicInfo>> queueTopicList = new 
HashMap<Integer, List<TaskTopicInfo>>();
         int id = -1;
         for (String t : topicRouteMap.keySet()) {
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
index 0d13a5e..010cb90 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
@@ -28,10 +28,12 @@ import java.util.Map;
 
 public class DivideTaskByTopic extends TaskDivideStrategy {
 
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> 
topicRouteMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> 
topicRouteMap, TaskDivideConfig tdc,
+        int maxTasks) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(topicRouteMap.keySet().size(), maxTasks);
         int id = -1;
         Map<Integer, List<TaskTopicInfo>> taskTopicList = new HashMap<Integer, 
List<TaskTopicInfo>>();
         for (Map.Entry<String, Set<TaskTopicInfo>> entry : 
topicRouteMap.entrySet()) {
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index 89ed060..778dbee 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -25,5 +25,5 @@ import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public abstract class TaskDivideStrategy {
 
-    public abstract List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> 
topicMap, TaskDivideConfig tdc);
+    public abstract List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> 
topicMap, TaskDivideConfig tdc, int maxTasks);
 }
diff --git 
a/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java
 
b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java
new file mode 100644
index 0000000..60dfe0e
--- /dev/null
+++ 
b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigDefine;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByConsistentHash;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Verifies how many task configurations each divide strategy (and
+ * {@link Utils#groupPartitions}) produces for a given {@code maxTasks} value.
+ *
+ * <p>Fixtures are built by the private helpers at the bottom of the class using plain
+ * {@link HashMap}/{@link HashSet}/{@link ArrayList} instances rather than double-brace
+ * initialization, which would create an anonymous subclass per collection.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultTaskDivideStrategyTest {
+
+    /**
+     * Five single-queue topics, strategy value "0": the result is capped by
+     * {@code maxTasks} (4) and by the number of topics (5 when {@code maxTasks} is 6).
+     */
+    @Test
+    public void testDivideTaskByTopic() {
+        // NOTE(review): "0" is presumably the by-topic strategy ordinal; confirm against DivideStrategyEnum.
+        RmqConnectorConfig config = configWithStrategy("0");
+        Map<String, Set<TaskTopicInfo>> topicRouteMap = singleQueueTopics(5);
+        TaskDivideConfig tdc = newTaskDivideConfig();
+
+        List<KeyValue> taskConfigs = config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 4);
+        assertThat(taskConfigs.size()).isEqualTo(4);
+
+        List<KeyValue> taskConfigs1 = config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 6);
+        assertThat(taskConfigs1.size()).isEqualTo(5);
+    }
+
+    /**
+     * Two topics with 3 + 2 queues, strategy value "1": the result is capped by
+     * {@code maxTasks} (4) and by the total number of queues (5 when {@code maxTasks} is 6).
+     */
+    @Test
+    public void testDivideTaskByQueue() {
+        // NOTE(review): "1" is presumably the by-queue strategy ordinal; confirm against DivideStrategyEnum.
+        RmqConnectorConfig config = configWithStrategy("1");
+        Map<String, Set<TaskTopicInfo>> topicRouteMap = new HashMap<>();
+        topicRouteMap.put("T_TEST1", queuesOf("T_TEST1", 3));
+        topicRouteMap.put("T_TEST2", queuesOf("T_TEST2", 2));
+        TaskDivideConfig tdc = newTaskDivideConfig();
+
+        List<KeyValue> taskConfigs = config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 4);
+        assertThat(taskConfigs.size()).isEqualTo(4);
+
+        List<KeyValue> taskConfigs1 = config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 6);
+        assertThat(taskConfigs1.size()).isEqualTo(5);
+    }
+
+    /**
+     * Five single-queue topics with the consistent-hash strategy: the number of task
+     * configurations equals {@code maxTasks}, even when that exceeds the number of topics.
+     */
+    @Test
+    public void testDivideTaskByHash() {
+        Map<String, Set<TaskTopicInfo>> topicRouteMap = singleQueueTopics(5);
+        TaskDivideConfig tdc = newTaskDivideConfig();
+        DivideTaskByConsistentHash hash = new DivideTaskByConsistentHash();
+
+        List<KeyValue> taskConfigs = hash.divide(topicRouteMap, tdc, 3);
+        assertThat(taskConfigs.size()).isEqualTo(3);
+
+        List<KeyValue> taskConfigs1 = hash.divide(topicRouteMap, tdc, 6);
+        assertThat(taskConfigs1.size()).isEqualTo(6);
+    }
+
+    /**
+     * Five group names passed to {@link Utils#groupPartitions}: the result is capped by
+     * the number of elements (5 when {@code maxTasks} is 6) and by {@code maxTasks} (4).
+     */
+    @Test
+    public void testDivideTaskByGroup() {
+        RmqConnectorConfig config = initConfig(new DefaultKeyValue());
+
+        // A fresh list is handed to every call, as the original test did with defensive copies.
+        List<KeyValue> taskConfigs = Utils.groupPartitions(groupNames(5), config, 6);
+        assertThat(taskConfigs.size()).isEqualTo(5);
+
+        List<KeyValue> taskConfigs1 = Utils.groupPartitions(groupNames(5), config, 4);
+        assertThat(taskConfigs1.size()).isEqualTo(4);
+    }
+
+    /** Creates a connector config and initializes it from {@code kv}. */
+    private static RmqConnectorConfig initConfig(KeyValue kv) {
+        RmqConnectorConfig config = new RmqConnectorConfig();
+        config.init(kv);
+        return config;
+    }
+
+    /** Creates a connector config whose only explicit setting is the divide strategy value. */
+    private static RmqConnectorConfig configWithStrategy(String strategy) {
+        KeyValue kv = new DefaultKeyValue();
+        kv.put(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY, strategy);
+        return initConfig(kv);
+    }
+
+    /** Creates the divide config shared by all strategy tests (ACL disabled, empty keys). */
+    private static TaskDivideConfig newTaskDivideConfig() {
+        return new TaskDivideConfig(
+            "127.0.0.1:9876",
+            "cluster_source",
+            "store_topic",
+            "",
+            DataType.COMMON_MESSAGE.ordinal(),
+            false,
+            "",
+            ""
+        );
+    }
+
+    /** Returns {@code queueCount} entries for {@code topic}, with queue ids 0..queueCount-1. */
+    private static Set<TaskTopicInfo> queuesOf(String topic, int queueCount) {
+        Set<TaskTopicInfo> queues = new HashSet<>();
+        for (int queueId = 0; queueId < queueCount; queueId++) {
+            queues.add(new TaskTopicInfo(topic, "source_cluster", queueId, topic));
+        }
+        return queues;
+    }
+
+    /** Returns a route map of topics T_TEST1..T_TEST{topicCount}, each with a single queue 0. */
+    private static Map<String, Set<TaskTopicInfo>> singleQueueTopics(int topicCount) {
+        Map<String, Set<TaskTopicInfo>> routes = new HashMap<>();
+        for (int i = 1; i <= topicCount; i++) {
+            String topic = "T_TEST" + i;
+            routes.put(topic, queuesOf(topic, 1));
+        }
+        return routes;
+    }
+
+    /** Returns a new mutable list G_TEST1..G_TEST{count}. */
+    private static List<String> groupNames(int count) {
+        List<String> names = new ArrayList<>(count);
+        for (int i = 1; i <= count; i++) {
+            names.add("G_TEST" + i);
+        }
+        return names;
+    }
+}
diff --git 
a/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
 
b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
index 795c386..b8fc0a3 100644
--- 
a/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
+++ 
b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
@@ -56,7 +56,7 @@ public class RmqSourceReplicatorTest {
         RmqConnectorConfig config = new RmqConnectorConfig();
         KeyValue kv = new DefaultKeyValue();
         kv.put(ConfigDefine.CONN_TOPIC_RENAME_FMT, "${topic}.replica");
-        config.validate(kv);
+        config.init(kv);
 
         Field field = 
RmqSourceReplicator.class.getDeclaredField("replicatorConfig");
         FieldSetter.setField(rmqSourceReplicator, field, config);

Reply via email to