This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 0043613a [ISSUES #481 ] Support requestTaskReconfiguration (#482)
0043613a is described below
commit 0043613af62af2a155d3cc5cc451c88ab0668c9b
Author: zhoubo <[email protected]>
AuthorDate: Fri Apr 21 10:39:23 2023 +0800
[ISSUES #481 ] Support requestTaskReconfiguration (#482)
* worker source task support retaskConfigs
optimize replicator task num calculate
* modify requestTaskReconfigIntervalMs default check interval
* fix worker testcase
---
.../replicator/ReplicatorSourceConnector.java | 46 ++++++++++++++--------
.../rocketmq/replicator/ReplicatorSourceTask.java | 1 -
.../connect/runtime/connectorwrapper/Worker.java | 12 +-----
.../runtime/connectorwrapper/WorkerTest.java | 10 -----
4 files changed, 32 insertions(+), 37 deletions(-)
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 9d0b2142..04b4ca8a 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -65,7 +65,7 @@ public class ReplicatorSourceConnector extends
SourceConnector {
private final Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
private KeyValue connectorConfig;
private DefaultMQAdminExt srcMQAdminExt;
- private List<MessageQueue> curMessageQueues = new LinkedList<>();
+ private volatile Set<MessageQueue> curMessageQueues = new HashSet<>();
private long requestTaskReconfigIntervalMs;
private ScheduledExecutorService requestTaskReconfigExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
@@ -100,8 +100,14 @@ public class ReplicatorSourceConnector extends
SourceConnector {
}
}
- private List<MessageQueue> fetchMessageQueues(List<String> topicList) {
- List<MessageQueue> messageQueues = new LinkedList<>();
+ private synchronized void closeScheduleTask() {
+ if (requestTaskReconfigExecutorService != null) {
+ requestTaskReconfigExecutorService.shutdown();
+ }
+ }
+
+ private Set<MessageQueue> fetchMessageQueues(List<String> topicList) {
+ Set<MessageQueue> messageQueues = new HashSet<>();
try {
for (String topic : topicList) {
TopicRouteData topicRouteData =
srcMQAdminExt.examineTopicRouteInfo(topic);
@@ -141,7 +147,7 @@ public class ReplicatorSourceConnector extends
SourceConnector {
return result;
}
- private void requestTaskReconfig() {
+ private void checkAndRequestTaskReconfig() {
Map<String, String> topicTagMap =
ReplicatorConnectorConfig.getSrcTopicTagMap(
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
@@ -149,20 +155,20 @@ public class ReplicatorSourceConnector extends
SourceConnector {
throw new ConnectException("Source topics & tags config cannot be
null, please check the config info");
}
List<String> topics = new LinkedList<>(topicTagMap.keySet());
- List<MessageQueue> updatedMessageQueues = fetchMessageQueues(topics);
+ Set<MessageQueue> updatedMessageQueues = fetchMessageQueues(topics);
+
+ if (CollectionUtils.isEmpty(curMessageQueues)) {
+ curMessageQueues = updatedMessageQueues;
+ return;
+ }
if (!CollectionUtils.isEqualCollection(curMessageQueues,
updatedMessageQueues)) {
+ curMessageQueues = updatedMessageQueues;
connectorContext.requestTaskReconfiguration();
}
}
@Override
public List<KeyValue> taskConfigs(int maxTasks) {
- try {
- initAdmin();
- } catch (Exception e) {
- log.error("init admin client error", e);
- throw new InitMQClientException("Replicator source connector init
mqAdminClient error.", e);
- }
// normal topic
Map<String, String> topicTagMap =
ReplicatorConnectorConfig.getSrcTopicTagMap(
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
@@ -174,13 +180,15 @@ public class ReplicatorSourceConnector extends
SourceConnector {
// todo rebalance 使用原生的;runtime & connector 都保存offset;
// get queue
curMessageQueues = fetchMessageQueues(topics);
+ int taskNum;
+ taskNum = Math.min(curMessageQueues.size(), maxTasks);
log.info("messageQueue : " + curMessageQueues.size() + " " +
curMessageQueues);
// divide
- List<List<MessageQueue>> normalDivided = divide(curMessageQueues,
maxTasks);
+ List<List<MessageQueue>> normalDivided = divide(new
ArrayList<>(curMessageQueues), taskNum);
log.info("normalDivided : " + normalDivided + " " + normalDivided);
List<KeyValue> configs = new ArrayList<>();
- for (int i = 0; i < maxTasks; i++) {
+ for (int i = 0; i < taskNum; i++) {
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES,
JSON.toJSONString(normalDivided.get(i)));
@@ -223,7 +231,6 @@ public class ReplicatorSourceConnector extends
SourceConnector {
return
buildCompareString(o1).compareTo(buildCompareString(o2));
}
});
- closeAdmin();
return configs;
}
@@ -272,22 +279,29 @@ public class ReplicatorSourceConnector extends
SourceConnector {
@Override
public void start(KeyValue keyValue) {
this.connectorConfig = keyValue;
+ try {
+ initAdmin();
+ } catch (Exception e) {
+ log.error("init admin client error", e);
+ throw new InitMQClientException("Replicator source connector init
mqAdminClient error.", e);
+ }
requestTaskReconfigIntervalMs =
connectorConfig.getLong(ReplicatorConnectorConfig.REQUEST_TASK_RECONFIG_INTERVAL_MS,
30 * 1000);
requestTaskReconfigExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
- requestTaskReconfig();
+ checkAndRequestTaskReconfig();
} catch (Throwable e) {
log.error("Request task reconfig error", e);
}
}
- }, requestTaskReconfigIntervalMs, requestTaskReconfigIntervalMs,
TimeUnit.MILLISECONDS);
+ }, 60 * 1000, requestTaskReconfigIntervalMs, TimeUnit.MILLISECONDS);
}
@Override
public void stop() {
closeAdmin();
+ closeScheduleTask();
}
}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index 1cad855b..af9352a8 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -34,7 +34,6 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index f6ff52cf..496e0276 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -105,10 +105,7 @@ public class Worker {
private final ConfigManagementService configManagementService;
private final ConnectMetrics connectMetrics;
Map<String, List<ConnectKeyValue>> latestTaskConfigs = new HashMap<>();
- /**
- * Current running connectors.
- */
- private Set<WorkerConnector> workingConnectors = new ConcurrentSet<>();
+
/**
* Current tasks state.
*/
@@ -516,12 +513,7 @@ public class Worker {
public Set<WorkerConnector> getWorkingConnectors() {
- return workingConnectors;
- }
-
- public void setWorkingConnectors(
- Set<WorkerConnector> workingConnectors) {
- this.workingConnectors = workingConnectors;
+ return new HashSet<>(connectors.values());
}
/**
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 5570be21..9b7e8a4d 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -134,16 +134,6 @@ public class WorkerTest {
stateManagementService = new LocalStateManagementServiceImpl();
stateManagementService.initialize(connectConfig, new TestConverter());
worker = new Worker(connectConfig, positionManagementService,
configManagementService, plugin, connectController, stateManagementService);
-
- Set<WorkerConnector> workingConnectors = new HashSet<>();
- for (int i = 0; i < 3; i++) {
- ConnectKeyValue connectKeyValue = new ConnectKeyValue();
- connectKeyValue.getProperties().put("key1", "TEST-CONN-" + i +
"1");
- connectKeyValue.getProperties().put("key2", "TEST-CONN-" + i +
"2");
- workingConnectors.add(new WorkerConnector("TEST-CONN-" + i, new
TestConnector(), connectKeyValue, connectorContext, null, null));
- }
- worker.setWorkingConnectors(workingConnectors);
- assertThat(worker.getWorkingConnectors().size()).isEqualTo(3);
TransformChain<ConnectRecord> transformChain = new
TransformChain<ConnectRecord>(new DefaultKeyValue(), plugin);
Set<Runnable> runnables = new HashSet<>();