This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 5340b269 [ISSUE #458] Add request reconfig for replicator (#459)
5340b269 is described below

commit 5340b269cc89ceb9b0b6f54aded3f68bfdf90820
Author: Jack Tsai <[email protected]>
AuthorDate: Thu Apr 13 16:51:55 2023 +0800

    [ISSUE #458] Add request reconfig for replicator (#459)
    
    Co-authored-by: tsaitsung-han.tht <[email protected]>
---
 .../replicator/ReplicatorSourceConnector.java      | 63 +++++++++++++++++-----
 .../rocketmq/replicator/ReplicatorSourceTask.java  |  3 +-
 .../config/ReplicatorConnectorConfig.java          |  1 +
 3 files changed, 52 insertions(+), 15 deletions(-)

diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 723a9c52..9d0b2142 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -30,7 +30,13 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,6 +65,14 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
     private final Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
     private KeyValue connectorConfig;
     private DefaultMQAdminExt srcMQAdminExt;
+    private List<MessageQueue> curMessageQueues = new LinkedList<>();
+    private long requestTaskReconfigIntervalMs;
+    private ScheduledExecutorService requestTaskReconfigExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "Replicator_task_reconfig");
+        }
+    });
 
     private synchronized void initAdmin() throws MQClientException {
         if (srcMQAdminExt == null) {
@@ -127,32 +141,42 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
         return result;
     }
 
+    private void requestTaskReconfig() {
+        Map<String, String> topicTagMap = 
ReplicatorConnectorConfig.getSrcTopicTagMap(
+                
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
+                
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
+        if (MapUtils.isEmpty(topicTagMap)) {
+            throw new ConnectException("Source topics & tags config cannot be 
null, please check the config info");
+        }
+        List<String> topics = new LinkedList<>(topicTagMap.keySet());
+        List<MessageQueue> updatedMessageQueues = fetchMessageQueues(topics);
+        if (!CollectionUtils.isEqualCollection(curMessageQueues, 
updatedMessageQueues)) {
+            connectorContext.requestTaskReconfiguration();
+        }
+    }
+
     @Override
     public List<KeyValue> taskConfigs(int maxTasks) {
         try {
             initAdmin();
         } catch (Exception e) {
             log.error("init admin client error", e);
-            throw new InitMQClientException("Replicator source connecto init 
mqAdminClient error.", e);
+            throw new InitMQClientException("Replicator source connector init 
mqAdminClient error.", e);
         }
         // normal topic
-        String topicTags = 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS);
-        String srcInstanceId = 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID);
-        Map<String, String> topicTagMap = 
ReplicatorConnectorConfig.getSrcTopicTagMap(srcInstanceId, topicTags);
-        Set<String> topics = topicTagMap.keySet();
-        if (CollectionUtils.isEmpty(topics)) {
-            throw new ConnectException("sink connector topics config can be 
null, please check sink connector config info");
-        }
-        List<String> topicList = new LinkedList<>();
-        for (String topic : topics) {
-            topicList.add(topic);
+        Map<String, String> topicTagMap = 
ReplicatorConnectorConfig.getSrcTopicTagMap(
+                
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
+                
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
+        if (MapUtils.isEmpty(topicTagMap)) {
+            throw new ConnectException("Source topics & tags config cannot be 
null, please check the config info");
         }
+        List<String> topics = new LinkedList<>(topicTagMap.keySet());
         // todo rebalance 使用原生的;runtime & connector 都保存offset;
         // get queue
-        List<MessageQueue> messageQueues = fetchMessageQueues(topicList);
-        log.info("messageQueue : " + messageQueues.size() + " " + 
messageQueues);
+        curMessageQueues = fetchMessageQueues(topics);
+        log.info("messageQueue : " + curMessageQueues.size() + " " + 
curMessageQueues);
         // divide
-        List<List<MessageQueue>> normalDivided = divide(messageQueues, 
maxTasks);
+        List<List<MessageQueue>> normalDivided = divide(curMessageQueues, 
maxTasks);
         log.info("normalDivided : " + normalDivided + " " + normalDivided);
 
         List<KeyValue> configs = new ArrayList<>();
@@ -248,6 +272,17 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
     @Override
     public void start(KeyValue keyValue) {
         this.connectorConfig = keyValue;
+        requestTaskReconfigIntervalMs = 
connectorConfig.getLong(ReplicatorConnectorConfig.REQUEST_TASK_RECONFIG_INTERVAL_MS,
 30 * 1000);
+        requestTaskReconfigExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    requestTaskReconfig();
+                } catch (Throwable e) {
+                    log.error("Request task reconfig error", e);
+                }
+            }
+        }, requestTaskReconfigIntervalMs, requestTaskReconfigIntervalMs, 
TimeUnit.MILLISECONDS);
     }
 
     @Override
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index b6db7067..33c429ab 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
@@ -289,7 +290,7 @@ public class ReplicatorSourceTask extends SourceTask {
         for (MessageQueue mq : allQueues) {
             String topic = mq.getTopic();
             String tag = 
ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), 
connectorConfig.getSrcTopicTags()).get(topic);
-            pullConsumer.setSubExpressionForAssign(topic, tag);
+//            pullConsumer.setSubExpressionForAssign(topic, tag);
         }
 
         try {
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index 2db84b69..6afdd756 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -159,6 +159,7 @@ public class ReplicatorConnectorConfig {
     public final static String SYNC_TPS = "sync.tps";
     public final static String MAX_TASK = "max.task";
     public final static String COMMIT_OFFSET_INTERVALS_MS = 
"commit.offset.interval.ms";
+    public final static String REQUEST_TASK_RECONFIG_INTERVAL_MS = 
"request.task.reconfig.ms";
 
     public String getTaskId() {
         return taskId;

Reply via email to