This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new b30952eb [ISSUE #456] replicator support checkpoint
b30952eb is described below

commit b30952eba1aa3e4516f26f34ccab46ed150ed64b
Author: zhoubo <[email protected]>
AuthorDate: Wed Apr 5 10:38:19 2023 +0800

    [ISSUE #456] replicator support checkpoint
---
 .../replicator/ReplicatorCheckpointConnector.java  | 179 +++++++++++++
 .../replicator/ReplicatorCheckpointTask.java       | 297 +++++++++++++++++++++
 .../config/ReplicatorConnectorConfig.java          | 226 +++++++++++++++-
 .../rocketmq/replicator/utils/ReplicatorUtils.java |  81 +++++-
 4 files changed, 764 insertions(+), 19 deletions(-)

diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
new file mode 100644
index 00000000..c1dbdc65
--- /dev/null
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.InitMQClientException;
+import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public class ReplicatorCheckpointConnector extends SourceConnector {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
+    private KeyValue config;
+
+    private DefaultMQAdminExt targetMqAdminExt;
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> configs = new ArrayList<>();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(ReplicatorConnectorConfig.SRC_CLOUD, 
config.getString(ReplicatorConnectorConfig.SRC_CLOUD));
+        keyValue.put(ReplicatorConnectorConfig.SRC_REGION, 
config.getString(ReplicatorConnectorConfig.SRC_REGION));
+        keyValue.put(ReplicatorConnectorConfig.SRC_CLUSTER, 
config.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
+        if (null != 
config.getString(ReplicatorConnectorConfig.SRC_INSTANCEID)) {
+            keyValue.put(ReplicatorConnectorConfig.SRC_INSTANCEID, 
config.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
+        }
+        keyValue.put(ReplicatorConnectorConfig.SRC_ENDPOINT, 
config.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
+        keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, 
config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
+        keyValue.put(ReplicatorConnectorConfig.SRC_ACCESS_KEY, 
config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY, ""));
+        keyValue.put(ReplicatorConnectorConfig.SRC_SECRET_KEY, 
config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY, ""));
+        keyValue.put(ReplicatorConnectorConfig.SRC_TOPICS, 
config.getString(ReplicatorConnectorConfig.SRC_TOPICS));
+        keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD, 
config.getString(ReplicatorConnectorConfig.DEST_CLOUD));
+        keyValue.put(ReplicatorConnectorConfig.DEST_REGION, 
config.getString(ReplicatorConnectorConfig.DEST_REGION));
+        keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER, 
config.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
+        if (null != 
config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)) {
+            keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID, 
config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
+        }
+        keyValue.put(ReplicatorConnectorConfig.DEST_ENDPOINT, 
config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
+        if (null != config.getString(ReplicatorConnectorConfig.DEST_TOPIC)) {
+            keyValue.put(ReplicatorConnectorConfig.DEST_TOPIC, 
config.getString(ReplicatorConnectorConfig.DEST_TOPIC));
+        }
+        keyValue.put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, 
config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false"));
+        keyValue.put(ReplicatorConnectorConfig.DEST_ACCESS_KEY, 
config.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY, ""));
+        keyValue.put(ReplicatorConnectorConfig.DEST_SECRET_KEY, 
config.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, ""));
+        keyValue.put(ReplicatorConnectorConfig.SYNC_GIDS, 
config.getString(ReplicatorConnectorConfig.SYNC_GIDS));
+        configs.add(keyValue);
+        return configs;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ReplicatorCheckpointTask.class;
+    }
+
+
+    private Set<String> neededParamKeys = new HashSet<String>() {
+        {
+            add(ReplicatorConnectorConfig.SRC_CLOUD);
+            add(ReplicatorConnectorConfig.SRC_REGION);
+            add(ReplicatorConnectorConfig.SRC_CLUSTER);
+            add(ReplicatorConnectorConfig.SRC_ENDPOINT);
+            add(ReplicatorConnectorConfig.SRC_TOPICS);
+            add(ReplicatorConnectorConfig.DEST_CLOUD);
+            add(ReplicatorConnectorConfig.DEST_REGION);
+            add(ReplicatorConnectorConfig.DEST_CLUSTER);
+            add(ReplicatorConnectorConfig.DEST_ENDPOINT);
+            add(ReplicatorConnectorConfig.SRC_CLOUD);
+            add(ReplicatorConnectorConfig.SYNC_GIDS);
+            add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
+            add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
+        }
+    };
+
+
+    @Override
+    public void validate(KeyValue config) {
+        if (config.getInt(ConnectorConfig.MAX_TASK, 1) > 1) {
+            log.warn("ReplicatorCheckpointConnector no need to set max-task, 
only used 1.");
+        }
+        // checkpoint just need only one task.
+        config.put(ConnectorConfig.MAX_TASK, 1);
+        
ReplicatorUtils.checkNeedParams(ReplicatorCheckpointConnector.class.getName(), 
config, neededParamKeys);
+
+    }
+
+    @Override
+    public void start(KeyValue keyValue) {
+        this.config = keyValue;
+        try {
+            buildAndStartTargetMQAdmin();
+            createCheckpointTopic();
+        } catch (MQClientException e) {
+            throw new InitMQClientException("Replicator checkpoint connector 
init mqAdminClient error.", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        closeAdmin();
+    }
+
+    private void buildAndStartTargetMQAdmin() throws MQClientException {
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        String destAclEnable = 
config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false");
+        if (destAclEnable.equalsIgnoreCase("true")) {
+            String destAccessKey = 
config.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY);
+            String destSecretKey = 
config.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY);
+            if (StringUtils.isNotEmpty(destAccessKey) && 
StringUtils.isNotEmpty(destSecretKey)) {
+                rpcHook = new AclClientRPCHook(new 
SessionCredentials(destAccessKey, destSecretKey));
+            } else {
+                rpcHook = new AclClientRPCHook(new SessionCredentials());
+            }
+        }
+        if (null == targetMqAdminExt) {
+            targetMqAdminExt = new DefaultMQAdminExt(rpcHook);
+            
targetMqAdminExt.setNamesrvAddr(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
+            
targetMqAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + "-" + 
UUID.randomUUID().toString());
+            
targetMqAdminExt.setInstanceName("ReplicatorCheckpointConnector_InstanceName_" 
+ UUID.randomUUID().toString());
+            targetMqAdminExt.start();
+        }
+    }
+    private synchronized void closeAdmin() {
+        if (targetMqAdminExt != null) {
+            try {
+                targetMqAdminExt.shutdown();
+            } catch (Throwable e) {
+                log.error("close Admin error,", e);
+            }
+        }
+    }
+
+    private void createCheckpointTopic() {
+        //create target checkpoint topic, todo compact topic
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setReadQueueNums(8);
+        topicConfig.setWriteQueueNums(8);
+        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | 
PermName.PERM_WRITE;
+        topicConfig.setPerm(perm);
+        
topicConfig.setTopicName(config.getString(ReplicatorConnectorConfig.CHECKPOINT_TOPIC,
 ReplicatorConnectorConfig.DEFAULT_CHECKPOINT_TOPIC));
+        ReplicatorUtils.createTopic(targetMqAdminExt, topicConfig);
+    }
+}
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
new file mode 100644
index 00000000..945018cd
--- /dev/null
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.InitMQClientException;
+import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.rocketmq.replicator.utils.ReplicatorUtils.CONSUMER_GROUP_KEY;
+import static 
org.apache.rocketmq.replicator.utils.ReplicatorUtils.DOWNSTREAM_LASTTIMESTAMP_KEY;
+import static 
org.apache.rocketmq.replicator.utils.ReplicatorUtils.METADATA_KEY;
+import static org.apache.rocketmq.replicator.utils.ReplicatorUtils.TOPIC_KEY;
+import static 
org.apache.rocketmq.replicator.utils.ReplicatorUtils.UPSTREAM_LASTTIMESTAMP_KEY;
+
+
+public class ReplicatorCheckpointTask extends SourceTask {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
+    private ReplicatorConnectorConfig connectorConfig = new 
ReplicatorConnectorConfig();
+    private long lastCheckPointTimestamp = System.currentTimeMillis();
+    private DefaultMQAdminExt srcMqAdminExt;
+    private DefaultMQAdminExt targetMqAdminExt;
+
+
+    public static final Schema VALUE_SCHEMA_V0 = SchemaBuilder.struct()
+            .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
+            .field(TOPIC_KEY, SchemaBuilder.string().build())
+            .field(UPSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
+            .field(DOWNSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
+            .field(METADATA_KEY, SchemaBuilder.string().build())
+            .build();
+
+
+    public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
+            .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
+            .field(TOPIC_KEY, SchemaBuilder.string().build())
+            .build();
+
+
+    private void buildMqAdmin() throws MQClientException {
+        buildAndStartSrcMQAdmin();
+        buildAndStartTargetMQAdmin();
+    }
+
+    private void buildAndStartSrcMQAdmin() throws MQClientException {
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            if (StringUtils.isNotEmpty(connectorConfig.getSrcAccessKey()) && 
StringUtils.isNotEmpty(connectorConfig.getSrcSecretKey())) {
+                String srcAccessKey = connectorConfig.getSrcAccessKey();
+                String srcSecretKey = connectorConfig.getSrcSecretKey();
+                rpcHook = new AclClientRPCHook(new 
SessionCredentials(srcAccessKey, srcSecretKey));
+            } else {
+                rpcHook = new AclClientRPCHook(new SessionCredentials());
+            }
+        }
+        srcMqAdminExt = new DefaultMQAdminExt(rpcHook);
+        srcMqAdminExt.setNamesrvAddr(connectorConfig.getSrcEndpoint());
+        srcMqAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + 
"-" + UUID.randomUUID().toString());
+        srcMqAdminExt.setInstanceName(connectorConfig.generateSourceString() + 
"-" + UUID.randomUUID().toString());
+        try {
+            srcMqAdminExt.start();
+        } catch (Exception e) {
+            log.error("ReplicatorCheckpoint init src mqadmin error,", e);
+            throw e;
+        }
+    }
+
+    private void buildAndStartTargetMQAdmin() throws MQClientException {
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isDestAclEnable()) {
+            if (StringUtils.isNotEmpty(connectorConfig.getDestAccessKey()) && 
StringUtils.isNotEmpty(connectorConfig.getDestSecretKey())) {
+                String destAccessKey = connectorConfig.getDestAccessKey();
+                String destSecretKey = connectorConfig.getDestSecretKey();
+                rpcHook = new AclClientRPCHook(new 
SessionCredentials(destAccessKey, destSecretKey));
+            } else {
+                rpcHook = new AclClientRPCHook(new SessionCredentials());
+            }
+        }
+        targetMqAdminExt = new DefaultMQAdminExt(rpcHook);
+        targetMqAdminExt.setNamesrvAddr(connectorConfig.getDestEndpoint());
+        
targetMqAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + "-" + 
UUID.randomUUID().toString());
+        
targetMqAdminExt.setInstanceName(connectorConfig.generateDestinationString() + 
"-" + UUID.randomUUID().toString());
+        try {
+            targetMqAdminExt.start();
+        } catch (Exception e) {
+            log.error("ReplicatorCheckpoint init target mqadmin error,", e);
+            throw e;
+        }
+    }
+
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        if (lastCheckPointTimestamp + 
connectorConfig.getCheckpointIntervalMs() > System.currentTimeMillis()) {
+            log.info("sleep " + lastCheckPointTimestamp + ", " + 
connectorConfig.getCheckpointIntervalMs() + ", " + System.currentTimeMillis());
+            Thread.sleep(connectorConfig.getCheckpointIntervalMs() - 
System.currentTimeMillis() + lastCheckPointTimestamp + 100);
+            return null;
+        }
+        log.info("not sleep " + lastCheckPointTimestamp + ", " + 
connectorConfig.getCheckpointIntervalMs() + ", " + System.currentTimeMillis());
+        List<ConnectRecord> connectRecords = new LinkedList<>();
+        // pull consumergroup's consumer commit offset
+        String syncGids = connectorConfig.getSyncGids();
+        if (StringUtils.isEmpty(syncGids)) {
+            lastCheckPointTimestamp = System.currentTimeMillis();
+            return null;
+        }
+        Set<String> srcTopics = 
connectorConfig.getSrcTopics(connectorConfig.getSrcTopics());
+        try {
+            String[] syncGidArr = syncGids.split(connectorConfig.GID_SPLITTER);
+            for (String consumerGroup : syncGidArr) {
+                for (String srcTopic : srcTopics) {
+                    String srcTopicWithInstanceId = 
ReplicatorUtils.buildTopicWithNamespace(srcTopic, 
connectorConfig.getSrcInstanceId());
+                    String srcConsumerGroupWithInstanceId = 
ReplicatorUtils.buildConsumergroupWithNamespace(consumerGroup, 
connectorConfig.getSrcInstanceId());
+                    try {
+                        ConsumeStats srcConsumeStats = 
srcMqAdminExt.examineConsumeStats(srcConsumerGroupWithInstanceId, 
srcTopicWithInstanceId);
+                        long minSrcLasttimestamp = 
getMinSrcLasttimestamp(srcConsumeStats);
+
+                        String targetTopic = connectorConfig.getDestTopic();
+                        String targetTopicWithInstanceId;
+                        if (StringUtils.isEmpty(targetTopic) || 
StringUtils.isBlank(targetTopic)) {
+                            targetTopicWithInstanceId = 
ReplicatorUtils.buildTopicWithNamespace(srcTopic, 
connectorConfig.getDestInstanceId());
+                        } else {
+                            targetTopicWithInstanceId = 
ReplicatorUtils.buildTopicWithNamespace(targetTopic, 
connectorConfig.getDestInstanceId());
+                        }
+                        ConsumeStats targetConsumeStats = 
targetMqAdminExt.examineConsumeStats(consumerGroup, targetTopicWithInstanceId);
+                        long minDestLasttimestamp = 
getMinSrcLasttimestamp(targetConsumeStats);
+
+                        RecordPartition recordPartition = 
ReplicatorUtils.convertToRecordPartition(srcTopic, consumerGroup);
+                        RecordOffset recordOffset = 
ReplicatorUtils.convertToRecordOffset(0L);
+                        ConnectRecord connectRecord = new 
ConnectRecord(recordPartition, recordOffset, System.currentTimeMillis());
+                        Struct keyStruct = 
buildCheckpointKey(srcTopicWithInstanceId, srcConsumerGroupWithInstanceId);
+                        Struct valueStruct = 
buildCheckpointPlayload(srcTopicWithInstanceId, srcConsumerGroupWithInstanceId, 
minSrcLasttimestamp, minDestLasttimestamp);
+                        connectRecord.setKeySchema(KEY_SCHEMA);
+                        connectRecord.setKey(keyStruct);
+                        connectRecord.setSchema(VALUE_SCHEMA_V0);
+                        connectRecord.setData(valueStruct);
+                        connectRecord.addExtension(TOPIC_KEY, 
connectorConfig.getCheckpointTopic());
+                        connectRecords.add(connectRecord);
+                    } catch (Exception e) {
+                        log.error("examineConsumeStats gid : " + consumerGroup 
+ ", topic : " + srcTopic +  " error", e);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("get syncGids committed offset error, syncGids : " + 
syncGids, e);
+        }
+        //
+        lastCheckPointTimestamp = System.currentTimeMillis();
+        return connectRecords;
+    }
+
+    @NotNull
+    private static Struct buildCheckpointPlayload(String 
srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId, long 
minSrcLasttimestamp, long minDestLasttimestamp) {
+        Struct struct = new Struct(VALUE_SCHEMA_V0);
+        struct.put(CONSUMER_GROUP_KEY, srcConsumerGroupWithInstanceId);
+        struct.put(TOPIC_KEY, srcTopicWithInstanceId);
+        struct.put(UPSTREAM_LASTTIMESTAMP_KEY, minSrcLasttimestamp);
+        struct.put(DOWNSTREAM_LASTTIMESTAMP_KEY, minDestLasttimestamp);
+        struct.put(METADATA_KEY, String.valueOf(System.currentTimeMillis()));
+        return struct;
+    }
+
+    private static Struct buildCheckpointKey(String srcTopicWithInstanceId, 
String srcConsumerGroupWithInstanceId) {
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.put(CONSUMER_GROUP_KEY, srcConsumerGroupWithInstanceId);
+        struct.put(TOPIC_KEY, srcTopicWithInstanceId);
+        return struct;
+    }
+
+    private long getMinSrcLasttimestamp(ConsumeStats consumeStats) throws 
RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        long minSrcLasttimestamp = -1;
+        if (null != consumeStats) {
+            Map<MessageQueue, OffsetWrapper> offsetTable = 
consumeStats.getOffsetTable();
+            if (null != offsetTable) {
+                for (Map.Entry<MessageQueue, OffsetWrapper> entry : 
offsetTable.entrySet()) {
+                    OffsetWrapper offsetWrapper = entry.getValue();
+                    if (null == offsetWrapper) {
+                        continue;
+                    }
+                    long lastTimestamp = offsetWrapper.getLastTimestamp();
+                    if (minSrcLasttimestamp == -1) {
+                        minSrcLasttimestamp = lastTimestamp;
+                    } else if (minSrcLasttimestamp > lastTimestamp) {
+                        minSrcLasttimestamp = lastTimestamp;
+                    }
+                }
+            }
+        }
+        return minSrcLasttimestamp;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void start(KeyValue config) {
+        log.info("ReplicatorCheckpointTask init " + config);
+        log.info("sourceTaskContextConfigs : " + sourceTaskContext.configs());
+        fillConnectorConfig(config);
+        // init mqadmin client
+        try {
+            buildMqAdmin();
+        } catch (Exception e) {
+            cleanResource();
+            log.error("buildMqAdmin error,", e);
+            throw new InitMQClientException("Replicator checkpoint task init 
mqAdminClient error.", e);
+        }
+    }
+
+    private void fillConnectorConfig(KeyValue config) {
+        // build connectConfig
+        
connectorConfig.setSrcCloud(config.getString(connectorConfig.SRC_CLOUD));
+        
connectorConfig.setSrcRegion(config.getString(connectorConfig.SRC_REGION));
+        
connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER));
+        
connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID));
+        
connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT));
+        
connectorConfig.setSrcTopics(config.getString(connectorConfig.SRC_TOPICS));
+        
connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD));
+        
connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION));
+        
connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER));
+        
connectorConfig.setDestInstanceId(config.getString(connectorConfig.DEST_INSTANCEID));
+        
connectorConfig.setDestEndpoint(config.getString(connectorConfig.DEST_ENDPOINT));
+        
connectorConfig.setDestTopic(config.getString(connectorConfig.DEST_TOPIC));
+        
connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE,
 "true")));
+        
connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
 "true")));
+        
connectorConfig.setCheckpointIntervalMs(config.getInt(connectorConfig.CHECKPOINT_INTERVAL_MS,
 connectorConfig.getCheckpointIntervalMs()));
+        
connectorConfig.setSyncGids(config.getString(connectorConfig.SYNC_GIDS));
+        
connectorConfig.setCheckpointTopic(config.getString(connectorConfig.CHECKPOINT_TOPIC,
 connectorConfig.DEFAULT_CHECKPOINT_TOPIC));
+        log.info("ReplicatorCheckpointTask connectorConfig : " + 
connectorConfig);
+    }
+
+    private void cleanResource() {
+        try {
+            if (srcMqAdminExt != null) {
+                srcMqAdminExt.shutdown();
+            }
+            if (targetMqAdminExt != null) {
+                targetMqAdminExt.shutdown();
+            }
+        } catch (Exception e) {
+            log.error("clean resource error,", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        cleanResource();
+    }
+}
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index 2a926bc2..ab789c69 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -13,7 +13,9 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
+
 package org.apache.rocketmq.replicator.config;
 
 import com.google.common.base.Splitter;
@@ -22,26 +24,26 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-
-/**
- * @author osgoo
- * @date 2022/6/16
- */
 public class ReplicatorConnectorConfig {
     private Log log = LogFactory.getLog(ReplicatorConnectorConfig.class);
     // replicator task id
     private String taskId;
     // connector id
     private String connectorId;
+    // replicator task id with index for max-task, format replicatorTaskId-i
+//    private String replicatorTaskIdWithIndex;
     // src & dest
     private String srcCloud;
     private String srcRegion;
     private String srcCluster;
     private String srcInstanceId;
     private String srcTopicTags; // format 
topic-1,tag-a;topic-2,tag-b;topic-3,tag-c
+    private String srcTopics; // format topic-1;topic-2,tag-b;topic-3,tag-c
     private String srcEndpoint;
     private boolean srcAclEnable;
     private boolean autoCreateInnerConsumergroup;
@@ -56,6 +58,10 @@ public class ReplicatorConnectorConfig {
     private boolean destAclEnable;
     private String destAccessKey;
     private String destSecretKey;
+    // filter properties
+    private String filterProperties;
+    private Map<String, String> filterPropertiesMap = new HashMap<>();
+    private String filterPropertiesOperator = "and";
     // consume from where
     private ConsumeFromWhere consumeFromWhere = 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
     // consume from timestamp
@@ -63,14 +69,33 @@ public class ReplicatorConnectorConfig {
     // sourcetask replicate to mq failover strategy
     private FailoverStrategy failoverStrategy = FailoverStrategy.DISMISS;
     private boolean enableHeartbeat = true;
-
+    private boolean enableCheckpoint = true;
+    private boolean enableRetrySync = true;
+    // srcRetryGids used for retry sync,
+    private String srcRetryGids;
+    // destRetryTopic used for store messages from %RETRY%srcRetryGids, can 
set normal topic or retry topic
+    private String destRetryTopic;
+    private boolean enableDlqSync = true;
+    // srcDlqGids used for dlq sync
+    private String srcDlqGids;
+    // destDlqTopic used for store messages from DLQ, can set normal topic
+    private String destDlqTopic;
+    // specify needed to sync %RETRY%GID, GID format is instanceId%Gid
+    // if not set, sync all online gids
+    private String syncGids;
     private String dividedNormalQueues;
+    private String dividedRetryQueues;
+    private String dividedDlqQueues;
     // tps limit
     private int syncTps = 1000;
+    private int maxTask = 2;
     //
     private int heartbeatIntervalMs = 1 * 1000;
+    private int checkpointIntervalMs = 10 * 1000;
     private String heartbeatTopic;
+    private String checkpointTopic;
     public final static String DEFAULT_HEARTBEAT_TOPIC = 
"replicator_heartbeat";
+    public final static String DEFAULT_CHECKPOINT_TOPIC = 
"replicator_checkpoint";
     public final static String TOPIC_TAG_SPLITTER = ",";
     public final static String TOPIC_SPLITTER = ";";
     public final static String TASK_NAME_SPLITTER = "_";
@@ -79,6 +104,7 @@ public class ReplicatorConnectorConfig {
     public final static String GID_SPLITTER = ",";
     public final static String ADMIN_GROUP = "replicator_admin_group";
     public final static int DEFAULT_SYNC_TPS = 1000;
+    public final static int DEFAULT_MAX_TASK = 2;
 
     // poll config
     private int eachQueueBufferSize = 1000;
@@ -87,11 +113,14 @@ public class ReplicatorConnectorConfig {
     // converter
     private String sourceConverter;
 
+//    public final static String REPLICATOR_TASK_ID_WITH_INDEX = 
"replicator-subtask-id";
     public final static String SRC_CLOUD = "src.cloud";
     public final static String SRC_REGION = "src.region";
     public final static String SRC_CLUSTER = "src.cluster";
     public final static String SRC_INSTANCEID = "src.instanceid";
     public final static String SRC_TOPICTAGS = "src.topictags";
+
+    public final static String SRC_TOPICS = "src.topics";
     public final static String SRC_ENDPOINT = "src.endpoint";
     public final static String SRC_ACL_ENABLE = "src.acl.enable";
     public final static String SRC_ACCESS_KEY = "src.access.key";
@@ -106,16 +135,31 @@ public class ReplicatorConnectorConfig {
     public final static String AUTO_CREATE_INNER_CONSUMERGROUP = 
"auto.create.inner.consumergroup";
     public final static String DEST_ACCESS_KEY = "dest.access.key";
     public final static String DEST_SECRET_KEY = "dest.secret.key";
+    public final static String FILTER_PROPERTIES = "filter.properties";// 
pro1,pro2 or  pro1=val1,prop2=val2
+    public final static String FILTER_PROPERTIES_OPERATOR = 
"filter.properties.operator";// and or
     public final static String FAILOVER_STRATEGY = "failover.strategy";
     public final static String ENABLE_HEARTBEAT = "enable.heartbeat";
     public final static String ENABLE_CHECKPOINT = "enable.checkpoint";
+    public final static String ENABLE_RETRY_SYNC = "enable.retrysync";
+    public final static String SRC_RETRY_GIDS = "src.retrygids";
+    public final static String DEST_RETRY_TOPIC = "dest.retrytopic";
+    public final static String ENABLE_DLQ_SYNC = "enable.dlqsync";
+    public final static String SRC_DLQ_GIDS = "dest.dlqgids";
+    public final static String DEST_DLQ_TOPIC = "dest.dqltopic";
+    public final static String SYNC_GIDS = "sync.gids";
     public final static String HEARTBEAT_INTERVALS_MS = 
"heartbeat.interval.ms";
+    public final static String CHECKPOINT_INTERVAL_MS = 
"checkpoint.interval.ms";
+    public final static String CHECKPOINT_TOPIC = "checkpoint.topic";
     public final static String HEARTBEAT_TOPIC = "heartbeat.topic";
+    public final static String EACH_QUEUE_BUFFER_SIZE = 
"each.queue.buffer.size";
     public final static String CONSUME_FROM_WHERE = "consumefromwhere";
     public final static String CONSUME_FROM_TIMESTAMP = "consumefromtimestamp";
     public final static String DIVIDE_STRATEGY = "divide.strategy";
     public final static String DIVIDED_NORMAL_QUEUES = "divided.normalqueues";
+    public final static String DIVIDED_RETRY_QUEUES = "divided.retryqueues";
+    public final static String DIVIDED_DLQ_QUEUES = "divided.dlqqueues";
     public final static String SYNC_TPS = "sync.tps";
+    public final static String MAX_TASK = "max.task";
 
     public String getTaskId() {
         return taskId;
@@ -180,6 +224,8 @@ public class ReplicatorConnectorConfig {
             if (topicAndTag.size() == 1) {
                 if (StringUtils.isBlank(srcInstanceId)) {
                     topicTagMap.put(topicAndTag.get(0), "*");
+                } else if (topicTagPair.startsWith("%RETRY%") || 
topicTagPair.startsWith("%DLQ%")) {
+                    topicTagMap.put(topicAndTag.get(0), "*");
                 } else {
                     topicTagMap.put(srcInstanceId + "%" + topicAndTag.get(0), 
"*");
                 }
@@ -194,6 +240,15 @@ public class ReplicatorConnectorConfig {
         return topicTagMap;
     }
 
+
+    public static Set<String> getSrcTopics(String srcTopics) {
+        if (StringUtils.isEmpty(srcTopics) || StringUtils.isBlank(srcTopics)) {
+            return null;
+        }
+        List<String> topicList = 
Splitter.on(TOPIC_SPLITTER).omitEmptyStrings().trimResults().splitToList(srcTopics);
+        return new HashSet(topicList);
+    }
+
     public String getSrcTopicTags() {
         return srcTopicTags;
     }
@@ -202,6 +257,14 @@ public class ReplicatorConnectorConfig {
         this.srcTopicTags = srcTopicTags;
     }
 
+    public String getSrcTopics() {
+        return srcTopics;
+    }
+
+    public void setSrcTopics(String srcTopics) {
+        this.srcTopics = srcTopics;
+    }
+
     public String getSrcEndpoint() {
         return srcEndpoint;
     }
@@ -258,6 +321,39 @@ public class ReplicatorConnectorConfig {
         this.destEndpoint = destEndpoint;
     }
 
+    public String getFilterProperties() {
+        return filterProperties;
+    }
+
+    public Map<String, String> getFilterPropertiesMap() {
+        return this.filterPropertiesMap;
+    }
+
+    public void setFilterProperties(String filterProperties) {
+        this.filterProperties = filterProperties;
+        if (StringUtils.isNotBlank(filterProperties)) {
+            String[] kvs = filterProperties.split(",");
+            for (String kv : kvs) {
+                String[] items = kv.split("=");
+                if (items.length == 1) {
+                    filterPropertiesMap.put(items[0], null);
+                } else if (items.length == 2) {
+                    filterPropertiesMap.put(items[0], items[1]);
+                } else {
+                    log.error("parse filterProperties error");
+                }
+            }
+        }
+    }
+
+    public String getFilterPropertiesOperator() {
+        return filterPropertiesOperator;
+    }
+
+    public void setFilterPropertiesOperator(String filterPropertiesOperator) {
+        this.filterPropertiesOperator = filterPropertiesOperator;
+    }
+
     public FailoverStrategy getFailoverStrategy() {
         return failoverStrategy;
     }
@@ -274,6 +370,69 @@ public class ReplicatorConnectorConfig {
         this.enableHeartbeat = enableHeartbeat;
     }
 
+    public boolean isEnableCheckpoint() {
+        return enableCheckpoint;
+    }
+
+    public void setEnableCheckpoint(boolean enableCheckpoint) {
+        this.enableCheckpoint = enableCheckpoint;
+    }
+
+    public boolean isEnableRetrySync() {
+        return enableRetrySync;
+    }
+
+    public void setEnableRetrySync(boolean enableRetrySync) {
+        this.enableRetrySync = enableRetrySync;
+    }
+
+    public String getSrcRetryGids() {
+        return srcRetryGids;
+    }
+
+    public void setSrcRetryGids(String srcRetryGids) {
+        this.srcRetryGids = srcRetryGids;
+    }
+
+    public String getDestRetryTopic() {
+        return destRetryTopic;
+    }
+
+    public void setDestRetryTopic(String destRetryTopic) {
+        this.destRetryTopic = destRetryTopic;
+    }
+
+    public boolean isEnableDlqSync() {
+        return enableDlqSync;
+    }
+
+    public void setEnableDlqSync(boolean enableDlqSync) {
+        this.enableDlqSync = enableDlqSync;
+    }
+
+    public String getSrcDlqGids() {
+        return srcDlqGids;
+    }
+
+    public void setSrcDlqGids(String srcDlqGids) {
+        this.srcDlqGids = srcDlqGids;
+    }
+
+    public String getDestDlqTopic() {
+        return destDlqTopic;
+    }
+
+    public void setDestDlqTopic(String destDlqTopic) {
+        this.destDlqTopic = destDlqTopic;
+    }
+
+    public String getSyncGids() {
+        return syncGids;
+    }
+
+    public void setSyncGids(String syncGids) {
+        this.syncGids = syncGids;
+    }
 
     public String getDividedNormalQueues() {
         return dividedNormalQueues;
@@ -283,6 +442,22 @@ public class ReplicatorConnectorConfig {
         this.dividedNormalQueues = dividedNormalQueues;
     }
 
+    public String getDividedRetryQueues() {
+        return dividedRetryQueues;
+    }
+
+    public void setDividedRetryQueues(String dividedRetryQueues) {
+        this.dividedRetryQueues = dividedRetryQueues;
+    }
+
+    public String getDividedDlqQueues() {
+        return dividedDlqQueues;
+    }
+
+    public void setDividedDlqQueues(String dividedDlqQueues) {
+        this.dividedDlqQueues = dividedDlqQueues;
+    }
+
     public int getHeartbeatIntervalMs() {
         return heartbeatIntervalMs;
     }
@@ -291,6 +466,14 @@ public class ReplicatorConnectorConfig {
         this.heartbeatIntervalMs = heartbeatIntervalMs;
     }
 
+    public int getCheckpointIntervalMs() {
+        return checkpointIntervalMs;
+    }
+
+    public void setCheckpointIntervalMs(int checkpointIntervalMs) {
+        this.checkpointIntervalMs = checkpointIntervalMs;
+    }
+
     public String getHeartbeatTopic() {
         return heartbeatTopic;
     }
@@ -299,6 +482,14 @@ public class ReplicatorConnectorConfig {
         this.heartbeatTopic = heartbeatTopic;
     }
 
+    public String getCheckpointTopic() {
+        return checkpointTopic;
+    }
+
+    public void setCheckpointTopic(String checkpointTopic) {
+        this.checkpointTopic = checkpointTopic;
+    }
+
     public int getEachQueueBufferSize() {
         return eachQueueBufferSize;
     }
@@ -337,6 +528,14 @@ public class ReplicatorConnectorConfig {
         this.syncTps = syncTps;
     }
 
+    public int getMaxTask() {
+        return maxTask;
+    }
+
+    public void setMaxTask(int maxTask) {
+        this.maxTask = maxTask;
+    }
+
     public int getPullMaxNum() {
         return pullMaxNum;
     }
@@ -345,9 +544,10 @@ public class ReplicatorConnectorConfig {
         this.pullMaxNum = pullMaxNum;
     }
 
+    public static final String SYSTEM_CONSUMER_PREFIX = 
"CID_RMQ_SYS_REPLICATOR_";
     public String generateTaskIdWithIndexAsConsumerGroup() {
         // todo need use replicatorTaskIdWithIndex for consumerGroup ???
-        return "SYS_ROCKETMQ_REPLICATOR_" + connectorId;
+        return SYSTEM_CONSUMER_PREFIX + connectorId;
 //        return "SYS_ROCKETMQ_REPLICATOR_" + connectorId + "_" + taskId;
     }
 
@@ -454,10 +654,22 @@ public class ReplicatorConnectorConfig {
                 ", consumeFromTimestamp=" + consumeFromTimestamp +
                 ", failoverStrategy=" + failoverStrategy +
                 ", enableHeartbeat=" + enableHeartbeat +
+                ", enableCheckpoint=" + enableCheckpoint +
+                ", enableRetrySync=" + enableRetrySync +
+                ", srcRetryGids='" + srcRetryGids + '\'' +
+                ", destRetryTopic='" + destRetryTopic + '\'' +
+                ", enableDlqSync=" + enableDlqSync +
+                ", srcDlqGids='" + srcDlqGids + '\'' +
+                ", destDlqTopic='" + destDlqTopic + '\'' +
+                ", syncGids='" + syncGids + '\'' +
                 ", dividedNormalQueues='" + dividedNormalQueues + '\'' +
+                ", dividedRetryQueues='" + dividedRetryQueues + '\'' +
+                ", dividedDlqQueues='" + dividedDlqQueues + '\'' +
                 ", syncTps=" + syncTps +
                 ", heartbeatIntervalMs=" + heartbeatIntervalMs +
+                ", checkpointIntervalMs=" + checkpointIntervalMs +
                 ", heartbeatTopic='" + heartbeatTopic + '\'' +
+                ", checkpointTopic='" + checkpointTopic + '\'' +
                 ", eachQueueBufferSize=" + eachQueueBufferSize +
                 ", pullMaxNum=" + pullMaxNum +
                 ", sourceConverter='" + sourceConverter + '\'' +
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
index 1c7e516c..fd28a9b7 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
@@ -13,29 +13,54 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
+
 package org.apache.rocketmq.replicator.utils;
 
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.replicator.exception.ParamInvalidException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import static 
org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
 
-/**
- * @author osgoo
- * @date 2022/7/21
- */
 public class ReplicatorUtils {
-    private static Log log = LogFactory.getLog(ReplicatorUtils.class);
-    public static String buildTopicWithNamespace(String rawTopic, String 
instanceId) {
-        if (StringUtils.isBlank(instanceId) || 
StringUtils.isEmpty(instanceId)) {
-            return rawTopic;
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
+
+    public static final String TOPIC_KEY = "topic";
+    public static final String CONSUMER_GROUP_KEY = "consumerGroup";
+    public static final String UPSTREAM_LASTTIMESTAMP_KEY = 
"upstreamLastTimestamp";
+    public static final String DOWNSTREAM_LASTTIMESTAMP_KEY = 
"downstreamLastTimestamp";
+    public static final String METADATA_KEY = "metadata";
+
+    public static String buildTopicWithNamespace(String topic, String 
instanceId) {
+        if (StringUtils.isBlank(instanceId)) {
+            return topic;
+        }
+        return instanceId + "%" + topic;
+    }
+
+    public static String buildConsumergroupWithNamespace(String consumerGroup, 
String instanceId) {
+        if (StringUtils.isBlank(instanceId)) {
+            return consumerGroup;
         }
-        return instanceId + "%" + rawTopic;
+        return instanceId + "%" + consumerGroup;
     }
 
     public static void checkNeedParams(String connectorName, KeyValue config, 
Set<String> neededParamKeys) {
@@ -57,4 +82,36 @@ public class ReplicatorUtils {
         configs = Arrays.asList(sortedKv);
         return configs;
     }
+
+    public static RecordPartition convertToRecordPartition(String topic, 
String consumerGroup) {
+        Map<String, String> map = new HashMap<>();
+        map.put(CONSUMER_GROUP_KEY, consumerGroup);
+        map.put(TOPIC_KEY, topic);
+        RecordPartition recordPartition = new RecordPartition(map);
+        return recordPartition;
+    }
+
+    public static RecordOffset convertToRecordOffset(Long offset) {
+        Map<String, String> offsetMap = new HashMap<>();
+        offsetMap.put(QUEUE_OFFSET, offset + "");
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
+    }
+
+    public static void createTopic(DefaultMQAdminExt defaultMQAdminExt, 
TopicConfig topicConfig) {
+        try {
+            ClusterInfo clusterInfo = 
defaultMQAdminExt.examineBrokerClusterInfo();
+            Map<String, Set<String>> clusterAddrTable = 
clusterInfo.getClusterAddrTable();
+            Set<String> clusterNameSet = clusterAddrTable.keySet();
+            for (String clusterName : clusterNameSet) {
+                Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, 
topicConfig);
+                    log.info("CreateTopic cluster {}, addr {}, masterSet {}, 
create topic {} success. ", clusterName, addr, masterSet, topicConfig);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Create topic [" + 
topicConfig.getTopicName() + "] failed", e);
+        }
+    }
 }


Reply via email to