This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 0043613a [ISSUES #481 ] Support requestTaskReconfiguration (#482)
0043613a is described below

commit 0043613af62af2a155d3cc5cc451c88ab0668c9b
Author: zhoubo <[email protected]>
AuthorDate: Fri Apr 21 10:39:23 2023 +0800

    [ISSUES #481 ] Support requestTaskReconfiguration (#482)
    
    * worker source task supports retaskConfigs;
    optimize the replicator task number calculation
    
    * modify requestTaskReconfigIntervalMs default check interval
    
    * fix worker testcase
---
 .../replicator/ReplicatorSourceConnector.java      | 46 ++++++++++++++--------
 .../rocketmq/replicator/ReplicatorSourceTask.java  |  1 -
 .../connect/runtime/connectorwrapper/Worker.java   | 12 +-----
 .../runtime/connectorwrapper/WorkerTest.java       | 10 -----
 4 files changed, 32 insertions(+), 37 deletions(-)

diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 9d0b2142..04b4ca8a 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -65,7 +65,7 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
     private final Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
     private KeyValue connectorConfig;
     private DefaultMQAdminExt srcMQAdminExt;
-    private List<MessageQueue> curMessageQueues = new LinkedList<>();
+    private volatile Set<MessageQueue> curMessageQueues = new HashSet<>();
     private long requestTaskReconfigIntervalMs;
     private ScheduledExecutorService requestTaskReconfigExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
         @Override
@@ -100,8 +100,14 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
         }
     }
 
-    private List<MessageQueue> fetchMessageQueues(List<String> topicList) {
-        List<MessageQueue> messageQueues = new LinkedList<>();
+    private synchronized void closeScheduleTask() {
+        if (requestTaskReconfigExecutorService != null) {
+            requestTaskReconfigExecutorService.shutdown();
+        }
+    }
+
+    private Set<MessageQueue> fetchMessageQueues(List<String> topicList) {
+        Set<MessageQueue> messageQueues = new HashSet<>();
         try {
             for (String topic : topicList) {
                 TopicRouteData topicRouteData = 
srcMQAdminExt.examineTopicRouteInfo(topic);
@@ -141,7 +147,7 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
         return result;
     }
 
-    private void requestTaskReconfig() {
+    private void checkAndRequestTaskReconfig() {
         Map<String, String> topicTagMap = 
ReplicatorConnectorConfig.getSrcTopicTagMap(
                 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
                 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
@@ -149,20 +155,20 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
             throw new ConnectException("Source topics & tags config cannot be 
null, please check the config info");
         }
         List<String> topics = new LinkedList<>(topicTagMap.keySet());
-        List<MessageQueue> updatedMessageQueues = fetchMessageQueues(topics);
+        Set<MessageQueue> updatedMessageQueues = fetchMessageQueues(topics);
+
+        if (CollectionUtils.isEmpty(curMessageQueues)) {
+            curMessageQueues = updatedMessageQueues;
+            return;
+        }
         if (!CollectionUtils.isEqualCollection(curMessageQueues, 
updatedMessageQueues)) {
+            curMessageQueues = updatedMessageQueues;
             connectorContext.requestTaskReconfiguration();
         }
     }
 
     @Override
     public List<KeyValue> taskConfigs(int maxTasks) {
-        try {
-            initAdmin();
-        } catch (Exception e) {
-            log.error("init admin client error", e);
-            throw new InitMQClientException("Replicator source connector init 
mqAdminClient error.", e);
-        }
         // normal topic
         Map<String, String> topicTagMap = 
ReplicatorConnectorConfig.getSrcTopicTagMap(
                 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
@@ -174,13 +180,15 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
         // todo rebalance 使用原生的;runtime & connector 都保存offset;
         // get queue
         curMessageQueues = fetchMessageQueues(topics);
+        int taskNum;
+        taskNum = Math.min(curMessageQueues.size(), maxTasks);
         log.info("messageQueue : " + curMessageQueues.size() + " " + 
curMessageQueues);
         // divide
-        List<List<MessageQueue>> normalDivided = divide(curMessageQueues, 
maxTasks);
+        List<List<MessageQueue>> normalDivided = divide(new 
ArrayList<>(curMessageQueues), taskNum);
         log.info("normalDivided : " + normalDivided + " " + normalDivided);
 
         List<KeyValue> configs = new ArrayList<>();
-        for (int i = 0; i < maxTasks; i++) {
+        for (int i = 0; i < taskNum; i++) {
             KeyValue keyValue = new DefaultKeyValue();
             keyValue.put(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES, 
JSON.toJSONString(normalDivided.get(i)));
 
@@ -223,7 +231,6 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
                 return 
buildCompareString(o1).compareTo(buildCompareString(o2));
             }
         });
-        closeAdmin();
         return configs;
     }
 
@@ -272,22 +279,29 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
     @Override
     public void start(KeyValue keyValue) {
         this.connectorConfig = keyValue;
+        try {
+            initAdmin();
+        } catch (Exception e) {
+            log.error("init admin client error", e);
+            throw new InitMQClientException("Replicator source connector init 
mqAdminClient error.", e);
+        }
         requestTaskReconfigIntervalMs = 
connectorConfig.getLong(ReplicatorConnectorConfig.REQUEST_TASK_RECONFIG_INTERVAL_MS,
 30 * 1000);
         requestTaskReconfigExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
-                    requestTaskReconfig();
+                    checkAndRequestTaskReconfig();
                 } catch (Throwable e) {
                     log.error("Request task reconfig error", e);
                 }
             }
-        }, requestTaskReconfigIntervalMs, requestTaskReconfigIntervalMs, 
TimeUnit.MILLISECONDS);
+        }, 60 * 1000, requestTaskReconfigIntervalMs, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void stop() {
         closeAdmin();
+        closeScheduleTask();
     }
 
 }
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index 1cad855b..af9352a8 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -34,7 +34,6 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index f6ff52cf..496e0276 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -105,10 +105,7 @@ public class Worker {
     private final ConfigManagementService configManagementService;
     private final ConnectMetrics connectMetrics;
     Map<String, List<ConnectKeyValue>> latestTaskConfigs = new HashMap<>();
-    /**
-     * Current running connectors.
-     */
-    private Set<WorkerConnector> workingConnectors = new ConcurrentSet<>();
+
     /**
      * Current tasks state.
      */
@@ -516,12 +513,7 @@ public class Worker {
 
 
     public Set<WorkerConnector> getWorkingConnectors() {
-        return workingConnectors;
-    }
-
-    public void setWorkingConnectors(
-            Set<WorkerConnector> workingConnectors) {
-        this.workingConnectors = workingConnectors;
+        return new HashSet<>(connectors.values());
     }
 
     /**
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 5570be21..9b7e8a4d 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -134,16 +134,6 @@ public class WorkerTest {
         stateManagementService = new LocalStateManagementServiceImpl();
         stateManagementService.initialize(connectConfig, new TestConverter());
         worker = new Worker(connectConfig, positionManagementService, 
configManagementService, plugin, connectController, stateManagementService);
-
-        Set<WorkerConnector> workingConnectors = new HashSet<>();
-        for (int i = 0; i < 3; i++) {
-            ConnectKeyValue connectKeyValue = new ConnectKeyValue();
-            connectKeyValue.getProperties().put("key1", "TEST-CONN-" + i + 
"1");
-            connectKeyValue.getProperties().put("key2", "TEST-CONN-" + i + 
"2");
-            workingConnectors.add(new WorkerConnector("TEST-CONN-" + i, new 
TestConnector(), connectKeyValue, connectorContext, null, null));
-        }
-        worker.setWorkingConnectors(workingConnectors);
-        assertThat(worker.getWorkingConnectors().size()).isEqualTo(3);
         TransformChain<ConnectRecord> transformChain = new 
TransformChain<ConnectRecord>(new DefaultKeyValue(), plugin);
         Set<Runnable> runnables = new HashSet<>();
 

Reply via email to