odbozhou commented on code in PR #324:
URL: https://github.com/apache/rocketmq-connect/pull/324#discussion_r977245298


##########
connectors/rocketmq-replicator/pom.xml:
##########
@@ -138,6 +138,18 @@
             <artifactId>fastjson</artifactId>
             <version>${fastjson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>31.1-jre</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>

Review Comment:
   Connector implementations should not depend on the Runtime module.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatTask.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.StartTaskException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.replicator.stats.ReplicatorTaskStats;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.*;
+
+
+/**
+ * @author osgoo
+ * @date 2022/6/16
+ */
+public class ReplicatorHeartbeatTask extends SourceTask {
+    private Logger log = 
LoggerFactory.getLogger(ReplicatorHeartbeatTask.class);
+    private ReplicatorConnectorConfig connectorConfig = new 
ReplicatorConnectorConfig();
+    private DefaultMQProducer producer;
+    private final String consumeGroup = "ReplicatorHeartbeatTask";
+    private DefaultMQPushConsumer consumer;
+    private volatile long producerLastSendOk = System.currentTimeMillis();
+    private volatile long consumerLastConsumeOk = System.currentTimeMillis();
+    private final long PRODUCER_SEND_ERROR_MAX_LASTING = 15000;
+    private final long CONSUMER_CONSUME_ERROR_MAX_LASTING = 15000;
+    private final long HEALTHCHECK_PERIOD_MS = 1000;
+    private ScheduledExecutorService executorService;
+
+    private void reBuildProducer() throws Exception {
+        if (producer != null) {
+            producer.shutdown();
+        }
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        producer = new DefaultMQProducer(rpcHook);
+        producer.setNamesrvAddr(connectorConfig.getSrcEndpoint());
+        producer.setProducerGroup(consumeGroup);
+        producer.setInstanceName(connectorConfig.generateSourceString() + "-" 
+ UUID.randomUUID().toString());
+        producer.start();
+    }
+
+    private void createAndUpdatePullConsumerGroup(String clusterName, String 
subscriptionGroupName) throws MQClientException, InterruptedException, 
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
+        DefaultMQAdminExt srcMQAdminExt;
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isDestAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        srcMQAdminExt.setNamesrvAddr(connectorConfig.getDestEndpoint());
+        srcMQAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + 
"-" + UUID.randomUUID().toString());
+        
srcMQAdminExt.setInstanceName(connectorConfig.generateDestinationString() + "-" 
+ UUID.randomUUID().toString());
+
+        log.info("initAdminThread : " + Thread.currentThread().getName());
+        srcMQAdminExt.start();
+
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(subscriptionGroupName);
+        Set<String> masterSet =
+                CommandUtil.fetchMasterAddrByClusterName(srcMQAdminExt, 
clusterName);
+        for (String addr : masterSet) {
+            try {
+                srcMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
subscriptionGroupConfig);
+                log.info("create subscription group to %s success.%n", addr);
+            } catch (Exception e) {
+                e.printStackTrace();
+                Thread.sleep(1000 * 1);
+            }
+        }
+    }
+
+    private void reBuildConsumer() throws Exception {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+        RPCHook rpcHook = null;
+        if (connectorConfig.isDestAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        consumer = new DefaultMQPushConsumer(rpcHook);
+        consumer.setNamesrvAddr(connectorConfig.getDestEndpoint());
+        consumer.setInstanceName(connectorConfig.generateDestinationString() + 
"-" + UUID.randomUUID().toString());
+        consumer.setConsumerGroup(consumeGroup);
+        consumer.subscribe(connectorConfig.getHeartbeatTopic(), "*");
+        consumer.setMessageModel(MessageModel.CLUSTERING);
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+                // check message & calculate rt
+                MessageExt messageExt = list.get(0);
+                long bornTimestamp = messageExt.getBornTimestamp();
+                long storeTimestamp = messageExt.getStoreTimestamp();
+                long consumeTimestamp = System.currentTimeMillis();
+                long rt = consumeTimestamp - bornTimestamp;
+                
ReplicatorTaskStats.incItemValue(ReplicatorTaskStats.REPLICATOR_HEARTBEAT_DELAY_MS,
 connectorConfig.getConnectorId(), (int)rt, 1);
+                log.info(messageExt.getUserProperty("src") + " -->  " + 
messageExt.getUserProperty("dest") + " RT " + bornTimestamp + "," + 
storeTimestamp + "," + consumeTimestamp);
+                consumerLastConsumeOk = consumeTimestamp;
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+    }
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        try {
+            // healthcheck producer & consumer
+            healthCheck();
+            Thread.sleep(HEALTHCHECK_PERIOD_MS);
+        } catch (Exception e) {
+            log.error("ReplicatorHeartbeatTask healthCheck exception,", e);
+        }
+        return null;
+    }
+
+    private void healthCheck() throws Exception {
+        if (producerLastSendOk + PRODUCER_SEND_ERROR_MAX_LASTING < 
System.currentTimeMillis()) {
+            log.error(" rebuild producer, ReplicatorHeartbeatTask healthCheck 
producer send error > " + PRODUCER_SEND_ERROR_MAX_LASTING);
+            reBuildProducer();
+        }
+        if (consumerLastConsumeOk + CONSUMER_CONSUME_ERROR_MAX_LASTING < 
System.currentTimeMillis()) {
+            log.error(" rebuild consumer, ReplicatorHeartbeatTask healthCheck 
consumer consume error > " + CONSUMER_CONSUME_ERROR_MAX_LASTING);
+            reBuildConsumer();
+        }
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void start(KeyValue config) {
+        log.info("ReplicatorHeartbeatTask init " + config);
+        log.info(" sourceTaskContextConfigs : " + sourceTaskContext.configs());
+        // build connectConfig
+        
connectorConfig.setTaskId(sourceTaskContext.getTaskName().substring(sourceTaskContext.getConnectorName().length()));
+        connectorConfig.setConnectorId(sourceTaskContext.getConnectorName());
+        
connectorConfig.setSrcCloud(config.getString(connectorConfig.SRC_CLOUD));
+        
connectorConfig.setSrcRegion(config.getString(connectorConfig.SRC_REGION));
+        
connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER));
+        
connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID));
+        
connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT));
+        
connectorConfig.setSrcTopicTags(config.getString(connectorConfig.SRC_TOPICTAGS));
+        
connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD));
+        
connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION));
+        
connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER));
+        
connectorConfig.setDestInstanceId(config.getString(connectorConfig.DEST_INSTANCEID));
+        
connectorConfig.setDestEndpoint(config.getString(connectorConfig.DEST_ENDPOINT));
+        
connectorConfig.setDestTopic(config.getString(connectorConfig.DEST_TOPIC));
+
+        
connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE,
 "true")));
+        
connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
 "true")));
+
+        
connectorConfig.setHeartbeatIntervalMs(config.getInt(connectorConfig.HEARTBEAT_INTERVALS_MS,
 connectorConfig.getHeartbeatIntervalMs()));
+        
connectorConfig.setHeartbeatTopic(config.getString(connectorConfig.HEARTBEAT_TOPIC,
 connectorConfig.DEFAULT_HEARTBEAT_TOPIC));
+
+        log.info("ReplicatorHeartbeatTask connectorConfig : " + 
connectorConfig);
+
+        try {
+            ReplicatorTaskStats.init();
+            // init consumer group
+            String destClusterName = connectorConfig.getDestCluster();
+            createAndUpdatePullConsumerGroup(destClusterName, consumeGroup);
+            // build producer
+            reBuildProducer();
+            // build consumer
+            reBuildConsumer();
+            // start schedule task send msg to src heartbeat topic;
+            executorService = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactory() {
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, 
ReplicatorHeartbeatTask.class.getName() + "-producer");
+                }
+            });
+            executorService.scheduleAtFixedRate(new Runnable() {
+                @Override

Review Comment:
   Could this be implemented directly in the task's poll method, on the task 
thread? If a separate thread is started, it is completely outside the control 
of the Runtime: for example, when the task is paused or resumed, the separate 
thread will not be paused or resumed with it.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatTask.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.StartTaskException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.replicator.stats.ReplicatorTaskStats;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.*;
+
+
+/**
+ * @author osgoo
+ * @date 2022/6/16
+ */
+public class ReplicatorHeartbeatTask extends SourceTask {
+    private Logger log = 
LoggerFactory.getLogger(ReplicatorHeartbeatTask.class);
+    private ReplicatorConnectorConfig connectorConfig = new 
ReplicatorConnectorConfig();
+    private DefaultMQProducer producer;
+    private final String consumeGroup = "ReplicatorHeartbeatTask";
+    private DefaultMQPushConsumer consumer;
+    private volatile long producerLastSendOk = System.currentTimeMillis();
+    private volatile long consumerLastConsumeOk = System.currentTimeMillis();
+    private final long PRODUCER_SEND_ERROR_MAX_LASTING = 15000;
+    private final long CONSUMER_CONSUME_ERROR_MAX_LASTING = 15000;
+    private final long HEALTHCHECK_PERIOD_MS = 1000;
+    private ScheduledExecutorService executorService;
+
+    private void reBuildProducer() throws Exception {
+        if (producer != null) {
+            producer.shutdown();
+        }
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        producer = new DefaultMQProducer(rpcHook);
+        producer.setNamesrvAddr(connectorConfig.getSrcEndpoint());
+        producer.setProducerGroup(consumeGroup);
+        producer.setInstanceName(connectorConfig.generateSourceString() + "-" 
+ UUID.randomUUID().toString());
+        producer.start();
+    }
+
+    private void createAndUpdatePullConsumerGroup(String clusterName, String 
subscriptionGroupName) throws MQClientException, InterruptedException, 
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
+        DefaultMQAdminExt srcMQAdminExt;
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isDestAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        srcMQAdminExt.setNamesrvAddr(connectorConfig.getDestEndpoint());
+        srcMQAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + 
"-" + UUID.randomUUID().toString());
+        
srcMQAdminExt.setInstanceName(connectorConfig.generateDestinationString() + "-" 
+ UUID.randomUUID().toString());
+
+        log.info("initAdminThread : " + Thread.currentThread().getName());
+        srcMQAdminExt.start();
+
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(subscriptionGroupName);
+        Set<String> masterSet =
+                CommandUtil.fetchMasterAddrByClusterName(srcMQAdminExt, 
clusterName);
+        for (String addr : masterSet) {
+            try {
+                srcMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
subscriptionGroupConfig);
+                log.info("create subscription group to %s success.%n", addr);
+            } catch (Exception e) {
+                e.printStackTrace();
+                Thread.sleep(1000 * 1);
+            }
+        }
+    }
+
+    private void reBuildConsumer() throws Exception {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+        RPCHook rpcHook = null;
+        if (connectorConfig.isDestAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        consumer = new DefaultMQPushConsumer(rpcHook);
+        consumer.setNamesrvAddr(connectorConfig.getDestEndpoint());
+        consumer.setInstanceName(connectorConfig.generateDestinationString() + 
"-" + UUID.randomUUID().toString());
+        consumer.setConsumerGroup(consumeGroup);
+        consumer.subscribe(connectorConfig.getHeartbeatTopic(), "*");
+        consumer.setMessageModel(MessageModel.CLUSTERING);
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+                // check message & calculate rt
+                MessageExt messageExt = list.get(0);
+                long bornTimestamp = messageExt.getBornTimestamp();
+                long storeTimestamp = messageExt.getStoreTimestamp();
+                long consumeTimestamp = System.currentTimeMillis();
+                long rt = consumeTimestamp - bornTimestamp;
+                
ReplicatorTaskStats.incItemValue(ReplicatorTaskStats.REPLICATOR_HEARTBEAT_DELAY_MS,
 connectorConfig.getConnectorId(), (int)rt, 1);
+                log.info(messageExt.getUserProperty("src") + " -->  " + 
messageExt.getUserProperty("dest") + " RT " + bornTimestamp + "," + 
storeTimestamp + "," + consumeTimestamp);
+                consumerLastConsumeOk = consumeTimestamp;
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+    }
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        try {
+            // healthcheck producer & consumer
+            healthCheck();
+            Thread.sleep(HEALTHCHECK_PERIOD_MS);
+        } catch (Exception e) {
+            log.error("ReplicatorHeartbeatTask healthCheck exception,", e);
+        }
+        return null;
+    }
+
+    private void healthCheck() throws Exception {
+        if (producerLastSendOk + PRODUCER_SEND_ERROR_MAX_LASTING < 
System.currentTimeMillis()) {
+            log.error(" rebuild producer, ReplicatorHeartbeatTask healthCheck 
producer send error > " + PRODUCER_SEND_ERROR_MAX_LASTING);
+            reBuildProducer();
+        }
+        if (consumerLastConsumeOk + CONSUMER_CONSUME_ERROR_MAX_LASTING < 
System.currentTimeMillis()) {
+            log.error(" rebuild consumer, ReplicatorHeartbeatTask healthCheck 
consumer consume error > " + CONSUMER_CONSUME_ERROR_MAX_LASTING);
+            reBuildConsumer();
+        }
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void start(KeyValue config) {
+        log.info("ReplicatorHeartbeatTask init " + config);
+        log.info(" sourceTaskContextConfigs : " + sourceTaskContext.configs());
+        // build connectConfig
+        
connectorConfig.setTaskId(sourceTaskContext.getTaskName().substring(sourceTaskContext.getConnectorName().length()));

Review Comment:
   Get the taskName directly instead of taking a substring of it.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java:
##########
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.util.concurrent.RateLimiter;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.replicator.config.ConsumeFromWhere;
+import org.apache.rocketmq.replicator.config.FailoverStrategy;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.StartTaskException;
+import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
+import org.apache.rocketmq.replicator.stats.ReplicatorTaskStats;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+
+
+/**
+ * @author osgoo
+ * @date 2022/6/16
+ */
+public class ReplicatorSourceTask extends SourceTask {
+    private static final Logger log = 
LoggerFactory.getLogger(ReplicatorSourceTask.class);
+    private ReplicatorConnectorConfig connectorConfig = new 
ReplicatorConnectorConfig();
+    private DefaultMQAdminExt srcMQAdminExt;
+    private ScheduledExecutorService metricsMonitorExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "Replicator_lag_metrics");
+        }
+    });
+    private Map<String, List<String>> metricsItem2KeyMap = new HashMap<>();
+    private final long period = 60 * 1000;
+    private DefaultLitePullConsumer pullConsumer;
+    private AtomicLong noMessageCounter = new AtomicLong();
+    private Random random = new Random();
+    private final int printLogThreshold = 100000;
+    private Converter recordConverter;
+    private RateLimiter rateLimiter;
+//    private QueueOffsetManager manager;
+    private List<MessageQueue> normalQueues = new ArrayList<>();
+
+    private AtomicLong circleReplicateCounter = new AtomicLong();
+    private final String REPLICATOR_SRC_TOPIC_PROPERTY_KEY = 
"REPLICATOR-source-topic";
+    // msg born timestamp on src
+    private final String REPLICATOR_BORN_SOURCE_TIMESTAMP = 
"REPLICATOR-BORN-SOURCE-TIMESTAMP";
+    // msg born from where
+    private final String REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION = 
"REPLICATOR-BORN-SOURCE";
+    // msg born from which topic
+    private final String REPLICATOR_BORE_INSTANCEID_TOPIC = 
"REPLICATOR-BORN-TOPIC";
+    // src messageid  equals MessageConst.PROPERTY_EXTEND_UNIQ_INFO
+    private static final String REPLICATOR_SRC_MESSAGE_ID = "EXTEND_UNIQ_INFO";
+    // src dupinof  equals MessageConst.DUP_INFO
+    private static final String REPLICATOR_DUP_INFO = "DUP_INFO";
+
+    // following sys reserved properties
+    public static final String PROPERTY_TIMER_DELAY_SEC = "TIMER_DELAY_SEC";
+    public static final String PROPERTY_TIMER_DELIVER_MS = "TIMER_DELIVER_MS";
+    public static final String PROPERTY_TIMER_IN_MS = "TIMER_IN_MS";
+    public static final String PROPERTY_TIMER_OUT_MS = "TIMER_OUT_MS";
+    public static final String PROPERTY_TIMER_ENQUEUE_MS = "TIMER_ENQUEUE_MS";
+    public static final String PROPERTY_TIMER_DEQUEUE_MS = "TIMER_DEQUEUE_MS";
+    public static final String PROPERTY_TIMER_ROLL_TIMES = "TIMER_ROLL_TIMES";
+    public static final String PROPERTY_TIMER_DEL_UNIQKEY = 
"TIMER_DEL_UNIQKEY";
+    public static final String PROPERTY_TIMER_DELAY_LEVEL = 
"TIMER_DELAY_LEVEL";
+    public static final String PROPERTY_POP_CK = "POP_CK";
+    public static final String PROPERTY_POP_CK_OFFSET = "POP_CK_OFFSET";
+    public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
+    public static final String PROPERTY_VTOA_TUNNEL_ID = "VTOA_TUNNEL_ID";
+    private static final Set<String> MQ_SYS_KEYS = new HashSet<String>() {
+        {
+            add(MessageConst.PROPERTY_MIN_OFFSET);
+            add(MessageConst.PROPERTY_TRACE_SWITCH);
+            add(MessageConst.PROPERTY_MAX_OFFSET);
+            add(MessageConst.PROPERTY_MSG_REGION);
+            add(MessageConst.PROPERTY_REAL_TOPIC);
+            add(MessageConst.PROPERTY_REAL_QUEUE_ID);
+            add(MessageConst.PROPERTY_PRODUCER_GROUP);
+            add(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+            add(REPLICATOR_DUP_INFO);
+            add(REPLICATOR_SRC_MESSAGE_ID);
+            add(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
+            add(MessageConst.PROPERTY_TAGS);
+            add(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            //
+            add(MessageConst.PROPERTY_REAL_QUEUE_ID);
+            add(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+            add(MessageConst.PROPERTY_BUYER_ID);
+            add(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+            add(MessageConst.PROPERTY_TRANSFER_FLAG);
+            add(MessageConst.PROPERTY_CORRECTION_FLAG);
+            add(MessageConst.PROPERTY_MQ2_FLAG);
+            add(MessageConst.PROPERTY_RECONSUME_TIME);
+            add(MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
+            add(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
+            add(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
+            add(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
+            add(MessageConst.PROPERTY_INSTANCE_ID);
+            add(PROPERTY_TIMER_DELAY_SEC);
+            add(PROPERTY_TIMER_DELIVER_MS);
+            add(PROPERTY_TIMER_IN_MS);
+            add(PROPERTY_TIMER_OUT_MS);
+            add(PROPERTY_TIMER_ENQUEUE_MS);
+            add(PROPERTY_TIMER_DEQUEUE_MS);
+            add(PROPERTY_TIMER_ROLL_TIMES);
+            add(PROPERTY_TIMER_DEL_UNIQKEY);
+            add(PROPERTY_TIMER_DELAY_LEVEL);
+            add(PROPERTY_POP_CK);
+            add(PROPERTY_POP_CK_OFFSET);
+            add(PROPERTY_FIRST_POP_TIME);
+            add(PROPERTY_VTOA_TUNNEL_ID);
+        }
+    };
+
+    private void buildMqAdminClient() throws MQClientException {
+        if (srcMQAdminExt != null) {
+            srcMQAdminExt.shutdown();
+        }
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        srcMQAdminExt.setNamesrvAddr(connectorConfig.getSrcEndpoint());
+        srcMQAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + 
"-" + UUID.randomUUID().toString());
+        srcMQAdminExt.setInstanceName(connectorConfig.generateSourceString() + 
"-" + UUID.randomUUID().toString());
+
+        log.info("initAdminThread : " + Thread.currentThread().getName());
+        srcMQAdminExt.start();
+    }
+
+    private void createAndUpdatePullConsumerGroup(String clusterName, String 
subscriptionGroupName) throws InterruptedException, MQBrokerException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(subscriptionGroupName);
+        Set<String> masterSet =
+                CommandUtil.fetchMasterAddrByClusterName(srcMQAdminExt, 
clusterName);
+        for (String addr : masterSet) {
+            try {
+                srcMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
subscriptionGroupConfig);
+                log.info("create subscription group to {} success.", addr);
+            } catch (Exception e) {
+                log.error(" create subscription error,", e);
+                Thread.sleep(1000 * 1);
+            }
+        }
+    }
+
+    private void buildConverter() throws Exception {
+        final String converterClazzName = connectorConfig.getSourceConverter();
+        if (StringUtils.isNotEmpty(converterClazzName)) {
+            try {
+                Class converterClazz = Class.forName(converterClazzName);
+                recordConverter = (Converter) converterClazz.newInstance();
+                log.info("init recordConverter success.");
+            } catch (Exception e) {
+                log.error("init converter[" + converterClazzName + "] error,", 
e);
+                throw e;
+            }
+        }
+    }
+
+    private synchronized void buildConsumer() {
+        if (pullConsumer != null) {
+            return;
+        }
+        String consumerGroup = 
connectorConfig.generateTaskIdWithIndexAsConsumerGroup();
+        log.info("prepare to use " + consumerGroup + " as consumerGroup start 
consumer.");
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+//        pullConsumer = new DefaultMQPullConsumer(consumerGroup, rpcHook);
+        pullConsumer = new DefaultLitePullConsumer(consumerGroup, rpcHook);
+        String namesrvAddr = connectorConfig.getSrcEndpoint();
+        pullConsumer.setNamesrvAddr(namesrvAddr);
+        pullConsumer.setInstanceName(connectorConfig.generateSourceString() + 
"-" + UUID.randomUUID().toString());
+        pullConsumer.setAutoCommit(false);
+    }
+
+    private void subscribeTopicAndStartConsumer() throws MQClientException {
+        ConsumeFromWhere consumeFromWhere = 
connectorConfig.getConsumeFromWhere();
+        
pullConsumer.setConsumeFromWhere(org.apache.rocketmq.common.consumer.ConsumeFromWhere.valueOf(consumeFromWhere.name()));
+        log.info("litePullConsumer use " + consumeFromWhere.name());
+        long consumeFromTimestamp = System.currentTimeMillis();
+        if (consumeFromWhere == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
+            consumeFromTimestamp = connectorConfig.getConsumeFromTimestamp();
+            String timestamp = 
UtilAll.timeMillisToHumanString3(consumeFromTimestamp);
+            pullConsumer.setConsumeTimestamp(timestamp);
+            log.info("litePullConsumer consume start at " + timestamp);
+        }
+
+        // init normal queues
+        String normalQueueStrs = connectorConfig.getDividedNormalQueues();
+        List<MessageQueue> allQueues;
+        allQueues = parseMessageQueues(normalQueueStrs);
+        normalQueues.addAll(allQueues);
+        log.info("allQueues : " + allQueues);
+        for (MessageQueue mq : allQueues) {
+            log.info("mq : " + mq.getBrokerName() + mq.getQueueId() + " " + 
mq.hashCode() + mq.getClass());
+        }
+
+        for (MessageQueue mq : allQueues) {
+            String topic = mq.getTopic();
+            String tag = 
connectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), 
connectorConfig.getSrcTopicTags()).get(topic);
+//            pullConsumer.setSubExpressionForAssign(topic, tag);
+        }
+
+        try {
+            pullConsumer.start();
+            pullConsumer.assign(allQueues);
+        } catch (MQClientException e) {
+            log.error("litePullConsumer start error", e);
+            throw e;
+        }
+    }
+
+    private List<MessageQueue> parseMessageQueues(String queueStrs) {
+        log.info("prepare to parse queueStr 2 obj : " + queueStrs);
+        List<MessageQueue> allQueues = new ArrayList<>();
+        List<MessageQueue> array = JSON.parseArray(queueStrs, 
MessageQueue.class);
+        for (int i = 0;i < array.size();i++) {
+            MessageQueue mq = array.get(i);
+            allQueues.add(mq);
+        }
+        return allQueues;
+    }
+
+    private void execScheduleTask() {
+        metricsMonitorExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                replicateLagMetric();
+                commitOffsetSchedule();
+            }
+        }, period, period, TimeUnit.MILLISECONDS);
+    }
+
+    private void commitOffsetSchedule() {
+        try {
+            pullConsumer.commitSync();
+        } catch (Exception e) {
+            log.error("commit error ,", e);
+        } finally {
+            log.debug("commit finish");
+        }
+    }
+
+    private void replicateLagMetric() {
+        String consumerGroup = 
connectorConfig.generateTaskIdWithIndexAsConsumerGroup();
+        try {
+            ConsumeStats consumeStats = 
srcMQAdminExt.examineConsumeStats(consumerGroup);
+            AtomicLong normalDelayCount = new AtomicLong();
+            AtomicLong normalDelayMs = new AtomicLong();
+            Map<MessageQueue, OffsetWrapper> offsets = 
consumeStats.getOffsetTable();
+            offsets.forEach(new BiConsumer<MessageQueue, OffsetWrapper>() {
+                @Override
+                public void accept(MessageQueue messageQueue, OffsetWrapper 
offsetWrapper) {
+                    long delayMs = System.currentTimeMillis() - 
offsetWrapper.getLastTimestamp();
+                    long delayCount = offsetWrapper.getBrokerOffset() - 
offsetWrapper.getConsumerOffset();
+                    if (normalQueues.contains(messageQueue)) {
+                        normalDelayCount.addAndGet(delayCount);
+                        normalDelayMs.set(delayMs);
+                    } else {
+                        // unknown queues, just ignore;
+                    }
+                }
+            });
+            List<String> delayNumsKeys = new ArrayList<>();
+            List<String> delayMsKeys = new ArrayList<>();
+            String normalNumKey = connectorConfig.getConnectorId();
+            delayNumsKeys.add(normalNumKey);
+            
ReplicatorTaskStats.incItemValue(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_NUMS,
 normalNumKey, (int)normalDelayCount.get(), 1);
+            String normalMsKey = connectorConfig.getConnectorId();
+            delayMsKeys.add(normalMsKey);
+            
ReplicatorTaskStats.incItemValue(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_MS,
 normalMsKey, (int)normalDelayMs.get(), 1);
+
+            
metricsItem2KeyMap.put(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_NUMS, 
delayNumsKeys);
+            
metricsItem2KeyMap.put(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_MS, 
delayMsKeys);
+        } catch (RemotingException | MQClientException e) {
+            log.error(" occur remoting or mqclient exception, retry build 
mqadminclient,", e);
+            try {
+                buildMqAdminClient();
+            } catch (MQClientException mqClientException) {
+                log.error(" rebuild mqadminclient error,", e);
+            }
+        } catch (Exception e) {
+            log.error(" occur unknow exception,", e);
+        }
+    }
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        // use rocketmq-client-5.0.0-replicator-SNAPSHOT tmp version: set 
LITE_PULL_MESSAGE = 11 instead of 361
+        try {
+            List<MessageExt> messageExts = pullConsumer.poll();
+//            PullResult pullResult = pullConsumer.pull(mq, tag, 
pullRequest.getNextOffset(), maxNum);
+            if (null != messageExts && messageExts.size() > 0) {
+                List<ConnectRecord> connectRecords = new 
ArrayList<>(messageExts.size());
+                for (MessageExt msg : messageExts) {
+                    ConnectRecord connectRecord = convertToSinkDataEntry(msg);
+                    if (connectRecord != null) {
+                        connectRecords.add(connectRecord);
+                    }
+                }
+                // blocking until acquire
+                rateLimiter.tryAcquire(1);
+                return connectRecords;
+            } else {
+                if ((noMessageCounter.incrementAndGet() + random.nextInt(10)) 
% printLogThreshold == 0) {
+                    log.info("no new message");
+                }
+            }
+        } catch (Exception e) {
+            log.error("pull message error,", e);
+        }
+        return null;
+    }
+
+    private String swapTopic(String topic) {
+        // normal topic, dest topic use destTopic config
+        if (!topic.startsWith("%RETRY%") && !topic.startsWith("%DLQ%")) {
+            return 
ReplicatorUtils.buildTopicWithNamespace(connectorConfig.getDestTopic(), 
connectorConfig.getDestInstanceId());
+        }
+        log.error("topic : " + topic + " is retry or dlq.");
+        return null;
+    }
+
+    private ConnectRecord convertToSinkDataEntry(MessageExt message) {
+        String topic = message.getTopic();
+        Map<String, String> properties = message.getProperties();
+        log.debug("srcProperties : " + properties);
+        Schema schema;
+        Long timestamp;
+        ConnectRecord sinkDataEntry = null;
+        if (null == recordConverter) {

Review Comment:
   Connector should not need recordConverter. RecordConverter is mainly used 
for serialization and deserialization of data in ConnectRecord, as well as 
uniform packaging of value and schema. This logic is transparent to Connector, 
and Connector only needs to pay attention to the ConnectRecord object.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.replicator.config.*;
+import org.apache.rocketmq.replicator.exception.GetMetaDataException;
+import org.apache.rocketmq.replicator.exception.InitMQClientException;
+import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import java.util.*;
+
+import static 
org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID;
+import static 
org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
+import static 
org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;
+
+/**
+ * @author osgoo
+ * @date 2022/6/16
+ */
+public class ReplicatorSourceConnector extends SourceConnector {
+    private Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
+    private KeyValue connectorConfig;
+    private DefaultMQAdminExt srcMQAdminExt;
+
+    private synchronized void initAdmin() throws MQClientException {
+        if (srcMQAdminExt == null) {
+            RPCHook rpcHook = null;
+            String srcAclEnable = 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false");
+            if (srcAclEnable.equalsIgnoreCase("true")) {
+                String srcAccessKey = 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY);
+                String srcSecretKey = 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY);
+                rpcHook = new AclClientRPCHook(new 
SessionCredentials(srcAccessKey, srcSecretKey));
+            }
+            srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+            
srcMQAdminExt.setNamesrvAddr(connectorConfig.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
+            
srcMQAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + "-" + 
UUID.randomUUID().toString());
+            
srcMQAdminExt.setInstanceName("ReplicatorSourceConnector_InstanceName_" + 
UUID.randomUUID().toString());
+
+            log.info("initAdminThread : " + Thread.currentThread().getName());
+            srcMQAdminExt.start();
+        }
+        log.info("SOURCE: RocketMQ srcMQAdminExt started");
+    }
+
+    private synchronized void closeAdmin() {
+        if (srcMQAdminExt != null) {
+            srcMQAdminExt.shutdown();
+        }
+    }
+
+    private List<MessageQueue> fetchMessageQueues(List<String> topicList) {
+        List<MessageQueue> messageQueues = new LinkedList<>();
+        try {
+            for (String topic : topicList) {
+                TopicRouteData topicRouteData = 
srcMQAdminExt.examineTopicRouteInfo(topic);
+                for (QueueData qd : topicRouteData.getQueueDatas()) {
+                    for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                        MessageQueue messageQueue = new MessageQueue(topic, 
qd.getBrokerName(), i);
+                        messageQueues.add(messageQueue);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("fetch src topic route error,", e);
+            throw new GetMetaDataException("Replicator source connector fetch 
topic[" + topicList + "] error.", e);
+        }
+        return messageQueues;
+    }
+
+    private List<List<MessageQueue>> divide(List<MessageQueue> taskTopicInfos, 
int maxTasks) {
+        taskTopicInfos = ReplicatorUtils.sortList(taskTopicInfos, new 
Comparator<MessageQueue>() {
+            @Override
+            public int compare(MessageQueue o1, MessageQueue o2) {
+                return o1.compareTo(o2);
+            }
+        });
+        List<List<MessageQueue>> result = new ArrayList<>(maxTasks);
+        for (int i = 0;i < maxTasks;i++) {
+            List<MessageQueue> subTasks = new ArrayList<>();
+            result.add(subTasks);
+            log.info("add subTask");
+        }
+        for (int i = 0;i < taskTopicInfos.size();i++) {
+            int hash = i % maxTasks;
+            MessageQueue messageQueue = taskTopicInfos.get(i);
+            result.get(hash).add(messageQueue);
+            log.info("subtask add queue" + messageQueue);
+        }
+        return result;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        try {
+            initAdmin();
+        } catch (Exception e) {
+            log.error("init admin client error", e);
+            throw new InitMQClientException("Replicator source connecto init 
mqAdminClient error.", e);
+        }
+        // normal topic
+        String topicTags = 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS);
+        String srcInstanceId = 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID);
+        Map<String, String> topicTagMap = 
ReplicatorConnectorConfig.getSrcTopicTagMap(srcInstanceId, topicTags);
+        Set<String> topics = topicTagMap.keySet();
+        if (CollectionUtils.isEmpty(topics)) {
+            throw new ConnectException("sink connector topics config can be 
null, please check sink connector config info");

Review Comment:
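   sink connector? This class is the source connector, so the error message 
should refer to the source connector config instead.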



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java:
##########
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.util.concurrent.RateLimiter;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.replicator.config.ConsumeFromWhere;
+import org.apache.rocketmq.replicator.config.FailoverStrategy;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.StartTaskException;
+import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
+import org.apache.rocketmq.replicator.stats.ReplicatorTaskStats;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+
+
+/**
+ * @author osgoo
+ * @date 2022/6/16
+ */
+public class ReplicatorSourceTask extends SourceTask {
+    private static final Logger log = 
LoggerFactory.getLogger(ReplicatorSourceTask.class);
+    private ReplicatorConnectorConfig connectorConfig = new 
ReplicatorConnectorConfig();
+    private DefaultMQAdminExt srcMQAdminExt;
+    private ScheduledExecutorService metricsMonitorExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "Replicator_lag_metrics");
+        }
+    });
+    private Map<String, List<String>> metricsItem2KeyMap = new HashMap<>();
+    private final long period = 60 * 1000;
+    private DefaultLitePullConsumer pullConsumer;
+    private AtomicLong noMessageCounter = new AtomicLong();
+    private Random random = new Random();
+    private final int printLogThreshold = 100000;
+    private Converter recordConverter;
+    private RateLimiter rateLimiter;
+//    private QueueOffsetManager manager;
+    private List<MessageQueue> normalQueues = new ArrayList<>();
+
+    private AtomicLong circleReplicateCounter = new AtomicLong();
+    private final String REPLICATOR_SRC_TOPIC_PROPERTY_KEY = 
"REPLICATOR-source-topic";
+    // msg born timestamp on src
+    private final String REPLICATOR_BORN_SOURCE_TIMESTAMP = 
"REPLICATOR-BORN-SOURCE-TIMESTAMP";
+    // msg born from where
+    private final String REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION = 
"REPLICATOR-BORN-SOURCE";
+    // msg born from which topic
+    private final String REPLICATOR_BORE_INSTANCEID_TOPIC = 
"REPLICATOR-BORN-TOPIC";
+    // src messageid  equals MessageConst.PROPERTY_EXTEND_UNIQ_INFO
+    private static final String REPLICATOR_SRC_MESSAGE_ID = "EXTEND_UNIQ_INFO";
+    // src dupinof  equals MessageConst.DUP_INFO
+    private static final String REPLICATOR_DUP_INFO = "DUP_INFO";
+
+    // following sys reserved properties
+    public static final String PROPERTY_TIMER_DELAY_SEC = "TIMER_DELAY_SEC";
+    public static final String PROPERTY_TIMER_DELIVER_MS = "TIMER_DELIVER_MS";
+    public static final String PROPERTY_TIMER_IN_MS = "TIMER_IN_MS";
+    public static final String PROPERTY_TIMER_OUT_MS = "TIMER_OUT_MS";
+    public static final String PROPERTY_TIMER_ENQUEUE_MS = "TIMER_ENQUEUE_MS";
+    public static final String PROPERTY_TIMER_DEQUEUE_MS = "TIMER_DEQUEUE_MS";
+    public static final String PROPERTY_TIMER_ROLL_TIMES = "TIMER_ROLL_TIMES";
+    public static final String PROPERTY_TIMER_DEL_UNIQKEY = 
"TIMER_DEL_UNIQKEY";
+    public static final String PROPERTY_TIMER_DELAY_LEVEL = 
"TIMER_DELAY_LEVEL";
+    public static final String PROPERTY_POP_CK = "POP_CK";
+    public static final String PROPERTY_POP_CK_OFFSET = "POP_CK_OFFSET";
+    public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
+    public static final String PROPERTY_VTOA_TUNNEL_ID = "VTOA_TUNNEL_ID";
+    private static final Set<String> MQ_SYS_KEYS = new HashSet<String>() {
+        {
+            add(MessageConst.PROPERTY_MIN_OFFSET);
+            add(MessageConst.PROPERTY_TRACE_SWITCH);
+            add(MessageConst.PROPERTY_MAX_OFFSET);
+            add(MessageConst.PROPERTY_MSG_REGION);
+            add(MessageConst.PROPERTY_REAL_TOPIC);
+            add(MessageConst.PROPERTY_REAL_QUEUE_ID);
+            add(MessageConst.PROPERTY_PRODUCER_GROUP);
+            add(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+            add(REPLICATOR_DUP_INFO);
+            add(REPLICATOR_SRC_MESSAGE_ID);
+            add(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
+            add(MessageConst.PROPERTY_TAGS);
+            add(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            //
+            add(MessageConst.PROPERTY_REAL_QUEUE_ID);
+            add(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+            add(MessageConst.PROPERTY_BUYER_ID);
+            add(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+            add(MessageConst.PROPERTY_TRANSFER_FLAG);
+            add(MessageConst.PROPERTY_CORRECTION_FLAG);
+            add(MessageConst.PROPERTY_MQ2_FLAG);
+            add(MessageConst.PROPERTY_RECONSUME_TIME);
+            add(MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
+            add(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
+            add(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
+            add(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
+            add(MessageConst.PROPERTY_INSTANCE_ID);
+            add(PROPERTY_TIMER_DELAY_SEC);
+            add(PROPERTY_TIMER_DELIVER_MS);
+            add(PROPERTY_TIMER_IN_MS);
+            add(PROPERTY_TIMER_OUT_MS);
+            add(PROPERTY_TIMER_ENQUEUE_MS);
+            add(PROPERTY_TIMER_DEQUEUE_MS);
+            add(PROPERTY_TIMER_ROLL_TIMES);
+            add(PROPERTY_TIMER_DEL_UNIQKEY);
+            add(PROPERTY_TIMER_DELAY_LEVEL);
+            add(PROPERTY_POP_CK);
+            add(PROPERTY_POP_CK_OFFSET);
+            add(PROPERTY_FIRST_POP_TIME);
+            add(PROPERTY_VTOA_TUNNEL_ID);
+        }
+    };
+
+    private void buildMqAdminClient() throws MQClientException {
+        if (srcMQAdminExt != null) {
+            srcMQAdminExt.shutdown();
+        }
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        srcMQAdminExt.setNamesrvAddr(connectorConfig.getSrcEndpoint());
+        srcMQAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + 
"-" + UUID.randomUUID().toString());
+        srcMQAdminExt.setInstanceName(connectorConfig.generateSourceString() + 
"-" + UUID.randomUUID().toString());
+
+        log.info("initAdminThread : " + Thread.currentThread().getName());
+        srcMQAdminExt.start();
+    }
+
+    private void createAndUpdatePullConsumerGroup(String clusterName, String 
subscriptionGroupName) throws InterruptedException, MQBrokerException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(subscriptionGroupName);
+        Set<String> masterSet =
+                CommandUtil.fetchMasterAddrByClusterName(srcMQAdminExt, 
clusterName);
+        for (String addr : masterSet) {
+            try {
+                srcMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
subscriptionGroupConfig);
+                log.info("create subscription group to {} success.", addr);
+            } catch (Exception e) {
+                log.error(" create subscription error,", e);
+                Thread.sleep(1000 * 1);
+            }
+        }
+    }
+
+    private void buildConverter() throws Exception {
+        final String converterClazzName = connectorConfig.getSourceConverter();
+        if (StringUtils.isNotEmpty(converterClazzName)) {
+            try {
+                Class converterClazz = Class.forName(converterClazzName);
+                recordConverter = (Converter) converterClazz.newInstance();
+                log.info("init recordConverter success.");
+            } catch (Exception e) {
+                log.error("init converter[" + converterClazzName + "] error,", 
e);
+                throw e;
+            }
+        }
+    }
+
+    private synchronized void buildConsumer() {
+        if (pullConsumer != null) {
+            return;
+        }
+        String consumerGroup = 
connectorConfig.generateTaskIdWithIndexAsConsumerGroup();
+        log.info("prepare to use " + consumerGroup + " as consumerGroup start 
consumer.");
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+//        pullConsumer = new DefaultMQPullConsumer(consumerGroup, rpcHook);
+        pullConsumer = new DefaultLitePullConsumer(consumerGroup, rpcHook);
+        String namesrvAddr = connectorConfig.getSrcEndpoint();
+        pullConsumer.setNamesrvAddr(namesrvAddr);
+        pullConsumer.setInstanceName(connectorConfig.generateSourceString() + 
"-" + UUID.randomUUID().toString());
+        pullConsumer.setAutoCommit(false);
+    }
+
+    private void subscribeTopicAndStartConsumer() throws MQClientException {
+        ConsumeFromWhere consumeFromWhere = 
connectorConfig.getConsumeFromWhere();
+        
pullConsumer.setConsumeFromWhere(org.apache.rocketmq.common.consumer.ConsumeFromWhere.valueOf(consumeFromWhere.name()));
+        log.info("litePullConsumer use " + consumeFromWhere.name());
+        long consumeFromTimestamp = System.currentTimeMillis();
+        if (consumeFromWhere == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
+            consumeFromTimestamp = connectorConfig.getConsumeFromTimestamp();
+            String timestamp = 
UtilAll.timeMillisToHumanString3(consumeFromTimestamp);
+            pullConsumer.setConsumeTimestamp(timestamp);
+            log.info("litePullConsumer consume start at " + timestamp);
+        }
+
+        // init normal queues
+        String normalQueueStrs = connectorConfig.getDividedNormalQueues();
+        List<MessageQueue> allQueues;
+        allQueues = parseMessageQueues(normalQueueStrs);
+        normalQueues.addAll(allQueues);
+        log.info("allQueues : " + allQueues);
+        for (MessageQueue mq : allQueues) {
+            log.info("mq : " + mq.getBrokerName() + mq.getQueueId() + " " + 
mq.hashCode() + mq.getClass());
+        }
+
+        for (MessageQueue mq : allQueues) {
+            String topic = mq.getTopic();
+            String tag = 
connectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), 
connectorConfig.getSrcTopicTags()).get(topic);
+//            pullConsumer.setSubExpressionForAssign(topic, tag);
+        }
+
+        try {
+            pullConsumer.start();
+            pullConsumer.assign(allQueues);
+        } catch (MQClientException e) {
+            log.error("litePullConsumer start error", e);
+            throw e;
+        }
+    }
+
+    private List<MessageQueue> parseMessageQueues(String queueStrs) {
+        log.info("prepare to parse queueStr 2 obj : " + queueStrs);
+        List<MessageQueue> allQueues = new ArrayList<>();
+        List<MessageQueue> array = JSON.parseArray(queueStrs, 
MessageQueue.class);
+        for (int i = 0;i < array.size();i++) {
+            MessageQueue mq = array.get(i);
+            allQueues.add(mq);
+        }
+        return allQueues;
+    }
+
+    private void execScheduleTask() {
+        metricsMonitorExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                replicateLagMetric();
+                commitOffsetSchedule();
+            }
+        }, period, period, TimeUnit.MILLISECONDS);
+    }
+
+    private void commitOffsetSchedule() {
+        try {
+            pullConsumer.commitSync();

Review Comment:
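   If the offset is committed on a fixed schedule, data may be lost. For 
example, if the WorkerSourceTask fails to send the records but the Connector's 
scheduled task has already committed the consumer offset, those records will 
not be pulled again.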



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java:
##########
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.util.concurrent.RateLimiter;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.replicator.config.ConsumeFromWhere;
+import org.apache.rocketmq.replicator.config.FailoverStrategy;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.StartTaskException;
+import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
+import org.apache.rocketmq.replicator.stats.ReplicatorTaskStats;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+
+
+/**
+ * @author osgoo
+ * @date 2022/6/16
+ */
+public class ReplicatorSourceTask extends SourceTask {
+    private static final Logger log = 
LoggerFactory.getLogger(ReplicatorSourceTask.class);
+    private ReplicatorConnectorConfig connectorConfig = new 
ReplicatorConnectorConfig();
+    private DefaultMQAdminExt srcMQAdminExt;
+    private ScheduledExecutorService metricsMonitorExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "Replicator_lag_metrics");
+        }
+    });
+    private Map<String, List<String>> metricsItem2KeyMap = new HashMap<>();
+    private final long period = 60 * 1000;
+    private DefaultLitePullConsumer pullConsumer;
+    private AtomicLong noMessageCounter = new AtomicLong();
+    private Random random = new Random();
+    private final int printLogThreshold = 100000;
+    private Converter recordConverter;
+    private RateLimiter rateLimiter;
+//    private QueueOffsetManager manager;
+    private List<MessageQueue> normalQueues = new ArrayList<>();
+
+    private AtomicLong circleReplicateCounter = new AtomicLong();
+    private final String REPLICATOR_SRC_TOPIC_PROPERTY_KEY = 
"REPLICATOR-source-topic";
+    // msg born timestamp on src
+    private final String REPLICATOR_BORN_SOURCE_TIMESTAMP = 
"REPLICATOR-BORN-SOURCE-TIMESTAMP";
+    // msg born from where
+    private final String REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION = 
"REPLICATOR-BORN-SOURCE";
+    // msg born from which topic
+    private final String REPLICATOR_BORE_INSTANCEID_TOPIC = 
"REPLICATOR-BORN-TOPIC";
+    // src messageid  equals MessageConst.PROPERTY_EXTEND_UNIQ_INFO
+    private static final String REPLICATOR_SRC_MESSAGE_ID = "EXTEND_UNIQ_INFO";
+    // src dupinof  equals MessageConst.DUP_INFO
+    private static final String REPLICATOR_DUP_INFO = "DUP_INFO";
+
+    // following sys reserved properties
+    public static final String PROPERTY_TIMER_DELAY_SEC = "TIMER_DELAY_SEC";
+    public static final String PROPERTY_TIMER_DELIVER_MS = "TIMER_DELIVER_MS";
+    public static final String PROPERTY_TIMER_IN_MS = "TIMER_IN_MS";
+    public static final String PROPERTY_TIMER_OUT_MS = "TIMER_OUT_MS";
+    public static final String PROPERTY_TIMER_ENQUEUE_MS = "TIMER_ENQUEUE_MS";
+    public static final String PROPERTY_TIMER_DEQUEUE_MS = "TIMER_DEQUEUE_MS";
+    public static final String PROPERTY_TIMER_ROLL_TIMES = "TIMER_ROLL_TIMES";
+    public static final String PROPERTY_TIMER_DEL_UNIQKEY = 
"TIMER_DEL_UNIQKEY";
+    public static final String PROPERTY_TIMER_DELAY_LEVEL = 
"TIMER_DELAY_LEVEL";
+    public static final String PROPERTY_POP_CK = "POP_CK";
+    public static final String PROPERTY_POP_CK_OFFSET = "POP_CK_OFFSET";
+    public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
+    public static final String PROPERTY_VTOA_TUNNEL_ID = "VTOA_TUNNEL_ID";
+    private static final Set<String> MQ_SYS_KEYS = new HashSet<String>() {
+        {
+            add(MessageConst.PROPERTY_MIN_OFFSET);
+            add(MessageConst.PROPERTY_TRACE_SWITCH);
+            add(MessageConst.PROPERTY_MAX_OFFSET);
+            add(MessageConst.PROPERTY_MSG_REGION);
+            add(MessageConst.PROPERTY_REAL_TOPIC);
+            add(MessageConst.PROPERTY_REAL_QUEUE_ID);
+            add(MessageConst.PROPERTY_PRODUCER_GROUP);
+            add(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+            add(REPLICATOR_DUP_INFO);
+            add(REPLICATOR_SRC_MESSAGE_ID);
+            add(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
+            add(MessageConst.PROPERTY_TAGS);
+            add(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            //
+            add(MessageConst.PROPERTY_REAL_QUEUE_ID);
+            add(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+            add(MessageConst.PROPERTY_BUYER_ID);
+            add(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+            add(MessageConst.PROPERTY_TRANSFER_FLAG);
+            add(MessageConst.PROPERTY_CORRECTION_FLAG);
+            add(MessageConst.PROPERTY_MQ2_FLAG);
+            add(MessageConst.PROPERTY_RECONSUME_TIME);
+            add(MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
+            add(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
+            add(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
+            add(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
+            add(MessageConst.PROPERTY_INSTANCE_ID);
+            add(PROPERTY_TIMER_DELAY_SEC);
+            add(PROPERTY_TIMER_DELIVER_MS);
+            add(PROPERTY_TIMER_IN_MS);
+            add(PROPERTY_TIMER_OUT_MS);
+            add(PROPERTY_TIMER_ENQUEUE_MS);
+            add(PROPERTY_TIMER_DEQUEUE_MS);
+            add(PROPERTY_TIMER_ROLL_TIMES);
+            add(PROPERTY_TIMER_DEL_UNIQKEY);
+            add(PROPERTY_TIMER_DELAY_LEVEL);
+            add(PROPERTY_POP_CK);
+            add(PROPERTY_POP_CK_OFFSET);
+            add(PROPERTY_FIRST_POP_TIME);
+            add(PROPERTY_VTOA_TUNNEL_ID);
+        }
+    };
+
+    private void buildMqAdminClient() throws MQClientException {
+        if (srcMQAdminExt != null) {
+            srcMQAdminExt.shutdown();
+        }
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+        srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        srcMQAdminExt.setNamesrvAddr(connectorConfig.getSrcEndpoint());
+        srcMQAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + 
"-" + UUID.randomUUID().toString());
+        srcMQAdminExt.setInstanceName(connectorConfig.generateSourceString() + 
"-" + UUID.randomUUID().toString());
+
+        log.info("initAdminThread : " + Thread.currentThread().getName());
+        srcMQAdminExt.start();
+    }
+
+    private void createAndUpdatePullConsumerGroup(String clusterName, String 
subscriptionGroupName) throws InterruptedException, MQBrokerException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(subscriptionGroupName);
+        Set<String> masterSet =
+                CommandUtil.fetchMasterAddrByClusterName(srcMQAdminExt, 
clusterName);
+        for (String addr : masterSet) {
+            try {
+                srcMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
subscriptionGroupConfig);
+                log.info("create subscription group to {} success.", addr);
+            } catch (Exception e) {
+                log.error(" create subscription error,", e);
+                Thread.sleep(1000 * 1);
+            }
+        }
+    }
+
+    private void buildConverter() throws Exception {
+        final String converterClazzName = connectorConfig.getSourceConverter();
+        if (StringUtils.isNotEmpty(converterClazzName)) {
+            try {
+                Class converterClazz = Class.forName(converterClazzName);
+                recordConverter = (Converter) converterClazz.newInstance();
+                log.info("init recordConverter success.");
+            } catch (Exception e) {
+                log.error("init converter[" + converterClazzName + "] error,", 
e);
+                throw e;
+            }
+        }
+    }
+
+    private synchronized void buildConsumer() {
+        if (pullConsumer != null) {
+            return;
+        }
+        String consumerGroup = 
connectorConfig.generateTaskIdWithIndexAsConsumerGroup();
+        log.info("prepare to use " + consumerGroup + " as consumerGroup start 
consumer.");
+        // use /home/admin/onskey white ak as default
+        RPCHook rpcHook = null;
+        if (connectorConfig.isSrcAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials());
+        }
+//        pullConsumer = new DefaultMQPullConsumer(consumerGroup, rpcHook);
+        pullConsumer = new DefaultLitePullConsumer(consumerGroup, rpcHook);
+        String namesrvAddr = connectorConfig.getSrcEndpoint();
+        pullConsumer.setNamesrvAddr(namesrvAddr);
+        pullConsumer.setInstanceName(connectorConfig.generateSourceString() + 
"-" + UUID.randomUUID().toString());
+        pullConsumer.setAutoCommit(false);
+    }
+
+    private void subscribeTopicAndStartConsumer() throws MQClientException {
+        ConsumeFromWhere consumeFromWhere = 
connectorConfig.getConsumeFromWhere();
+        
pullConsumer.setConsumeFromWhere(org.apache.rocketmq.common.consumer.ConsumeFromWhere.valueOf(consumeFromWhere.name()));
+        log.info("litePullConsumer use " + consumeFromWhere.name());
+        long consumeFromTimestamp = System.currentTimeMillis();
+        if (consumeFromWhere == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
+            consumeFromTimestamp = connectorConfig.getConsumeFromTimestamp();
+            String timestamp = 
UtilAll.timeMillisToHumanString3(consumeFromTimestamp);
+            pullConsumer.setConsumeTimestamp(timestamp);
+            log.info("litePullConsumer consume start at " + timestamp);
+        }
+
+        // init normal queues
+        String normalQueueStrs = connectorConfig.getDividedNormalQueues();
+        List<MessageQueue> allQueues;
+        allQueues = parseMessageQueues(normalQueueStrs);
+        normalQueues.addAll(allQueues);
+        log.info("allQueues : " + allQueues);
+        for (MessageQueue mq : allQueues) {
+            log.info("mq : " + mq.getBrokerName() + mq.getQueueId() + " " + 
mq.hashCode() + mq.getClass());
+        }
+
+        for (MessageQueue mq : allQueues) {
+            String topic = mq.getTopic();
+            String tag = 
connectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), 
connectorConfig.getSrcTopicTags()).get(topic);
+//            pullConsumer.setSubExpressionForAssign(topic, tag);
+        }
+
+        try {
+            pullConsumer.start();
+            pullConsumer.assign(allQueues);
+        } catch (MQClientException e) {
+            log.error("litePullConsumer start error", e);
+            throw e;
+        }
+    }
+
+    private List<MessageQueue> parseMessageQueues(String queueStrs) {
+        log.info("prepare to parse queueStr 2 obj : " + queueStrs);
+        List<MessageQueue> allQueues = new ArrayList<>();
+        List<MessageQueue> array = JSON.parseArray(queueStrs, 
MessageQueue.class);
+        for (int i = 0;i < array.size();i++) {
+            MessageQueue mq = array.get(i);
+            allQueues.add(mq);
+        }
+        return allQueues;
+    }
+
+    private void execScheduleTask() {
+        metricsMonitorExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                replicateLagMetric();
+                commitOffsetSchedule();
+            }
+        }, period, period, TimeUnit.MILLISECONDS);
+    }
+
+    private void commitOffsetSchedule() {
+        try {
+            pullConsumer.commitSync();

Review Comment:
   io.openmessaging.connector.api.component.task.source.SourceTask#commit()
   Should the commit method be implemented? The runtime calls this method back
once the records from the SourceTask have been processed successfully.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to