This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 5340b269 [ISSUE #458] Add request reconfig for replicator (#459)
5340b269 is described below
commit 5340b269cc89ceb9b0b6f54aded3f68bfdf90820
Author: Jack Tsai <[email protected]>
AuthorDate: Thu Apr 13 16:51:55 2023 +0800
[ISSUE #458] Add request reconfig for replicator (#459)
Co-authored-by: tsaitsung-han.tht <[email protected]>
---
.../replicator/ReplicatorSourceConnector.java | 63 +++++++++++++++++-----
.../rocketmq/replicator/ReplicatorSourceTask.java | 3 +-
.../config/ReplicatorConnectorConfig.java | 1 +
3 files changed, 52 insertions(+), 15 deletions(-)
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 723a9c52..9d0b2142 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -30,7 +30,13 @@ import java.util.Map;
import java.util.UUID;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,6 +65,14 @@ public class ReplicatorSourceConnector extends SourceConnector {
private final Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
private KeyValue connectorConfig;
private DefaultMQAdminExt srcMQAdminExt;
+ private List<MessageQueue> curMessageQueues = new LinkedList<>();
+ private long requestTaskReconfigIntervalMs;
+ private ScheduledExecutorService requestTaskReconfigExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "Replicator_task_reconfig");
+ }
+ });
private synchronized void initAdmin() throws MQClientException {
if (srcMQAdminExt == null) {
@@ -127,32 +141,42 @@ public class ReplicatorSourceConnector extends SourceConnector {
return result;
}
+ private void requestTaskReconfig() {
+ Map<String, String> topicTagMap =
ReplicatorConnectorConfig.getSrcTopicTagMap(
+
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
+
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
+ if (MapUtils.isEmpty(topicTagMap)) {
+ throw new ConnectException("Source topics & tags config cannot be
null, please check the config info");
+ }
+ List<String> topics = new LinkedList<>(topicTagMap.keySet());
+ List<MessageQueue> updatedMessageQueues = fetchMessageQueues(topics);
+ if (!CollectionUtils.isEqualCollection(curMessageQueues,
updatedMessageQueues)) {
+ connectorContext.requestTaskReconfiguration();
+ }
+ }
+
@Override
public List<KeyValue> taskConfigs(int maxTasks) {
try {
initAdmin();
} catch (Exception e) {
log.error("init admin client error", e);
- throw new InitMQClientException("Replicator source connecto init
mqAdminClient error.", e);
+ throw new InitMQClientException("Replicator source connector init
mqAdminClient error.", e);
}
// normal topic
- String topicTags =
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS);
- String srcInstanceId =
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID);
- Map<String, String> topicTagMap =
ReplicatorConnectorConfig.getSrcTopicTagMap(srcInstanceId, topicTags);
- Set<String> topics = topicTagMap.keySet();
- if (CollectionUtils.isEmpty(topics)) {
- throw new ConnectException("sink connector topics config can be
null, please check sink connector config info");
- }
- List<String> topicList = new LinkedList<>();
- for (String topic : topics) {
- topicList.add(topic);
+ Map<String, String> topicTagMap =
ReplicatorConnectorConfig.getSrcTopicTagMap(
+
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
+
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
+ if (MapUtils.isEmpty(topicTagMap)) {
+ throw new ConnectException("Source topics & tags config cannot be
null, please check the config info");
}
+ List<String> topics = new LinkedList<>(topicTagMap.keySet());
// todo rebalance 使用原生的;runtime & connector 都保存offset;
// get queue
- List<MessageQueue> messageQueues = fetchMessageQueues(topicList);
- log.info("messageQueue : " + messageQueues.size() + " " +
messageQueues);
+ curMessageQueues = fetchMessageQueues(topics);
+ log.info("messageQueue : " + curMessageQueues.size() + " " +
curMessageQueues);
// divide
- List<List<MessageQueue>> normalDivided = divide(messageQueues,
maxTasks);
+ List<List<MessageQueue>> normalDivided = divide(curMessageQueues,
maxTasks);
log.info("normalDivided : " + normalDivided + " " + normalDivided);
List<KeyValue> configs = new ArrayList<>();
@@ -248,6 +272,17 @@ public class ReplicatorSourceConnector extends SourceConnector {
@Override
public void start(KeyValue keyValue) {
this.connectorConfig = keyValue;
+ requestTaskReconfigIntervalMs =
connectorConfig.getLong(ReplicatorConnectorConfig.REQUEST_TASK_RECONFIG_INTERVAL_MS,
30 * 1000);
+ requestTaskReconfigExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ requestTaskReconfig();
+ } catch (Throwable e) {
+ log.error("Request task reconfig error", e);
+ }
+ }
+ }, requestTaskReconfigIntervalMs, requestTaskReconfigIntervalMs,
TimeUnit.MILLISECONDS);
}
@Override
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index b6db7067..33c429ab 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
@@ -289,7 +290,7 @@ public class ReplicatorSourceTask extends SourceTask {
for (MessageQueue mq : allQueues) {
String topic = mq.getTopic();
String tag =
ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(),
connectorConfig.getSrcTopicTags()).get(topic);
- pullConsumer.setSubExpressionForAssign(topic, tag);
+// pullConsumer.setSubExpressionForAssign(topic, tag);
}
try {
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index 2db84b69..6afdd756 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -159,6 +159,7 @@ public class ReplicatorConnectorConfig {
public final static String SYNC_TPS = "sync.tps";
public final static String MAX_TASK = "max.task";
public final static String COMMIT_OFFSET_INTERVALS_MS =
"commit.offset.interval.ms";
+ public final static String REQUEST_TASK_RECONFIG_INTERVAL_MS =
"request.task.reconfig.ms";
public String getTaskId() {
return taskId;