This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 6dabcbd5 optimize worker source task (#480)
6dabcbd5 is described below
commit 6dabcbd532fdb3fca7599333746aa58c42b939a8
Author: zhoubo <[email protected]>
AuthorDate: Thu Apr 20 15:10:16 2023 +0800
optimize worker source task (#480)
---
.../org/apache/rocketmq/replicator/ReplicatorSourceTask.java | 1 +
.../apache/rocketmq/connect/runtime/config/WorkerConfig.java | 11 ++++++++++-
.../connect/runtime/connectorwrapper/WorkerSourceTask.java | 7 +++++++
3 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index 76fd862e..1cad855b 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -264,6 +264,7 @@ public class ReplicatorSourceTask extends SourceTask {
pullConsumer.setNamesrvAddr(namesrvAddr);
pullConsumer.setInstanceName(connectorConfig.generateSourceString() + "-" + UUID.randomUUID().toString());
pullConsumer.setAutoCommit(false);
+ pullConsumer.setPullBatchSize(32);
}
private void subscribeTopicAndStartConsumer() throws MQClientException {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
index 1a0ee17e..dd586607 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
@@ -82,7 +82,8 @@ public class WorkerConfig {
* config example:
* autoCreateGroupEnable = false
*/
- private boolean autoCreateGroupEnable = false;
+ private boolean autoCreateGroupEnable = true;
+ private boolean autoCreateTopicEnable = true;
/**
* Configure cluster converter
@@ -438,6 +439,14 @@ public class WorkerConfig {
this.autoCreateGroupEnable = autoCreateGroupEnable;
}
+ public boolean isAutoCreateTopicEnable() {
+ return autoCreateTopicEnable;
+ }
+
+ public void setAutoCreateTopicEnable(boolean autoCreateTopicEnable) {
+ this.autoCreateTopicEnable = autoCreateTopicEnable;
+ }
+
public String getAdminExtGroup() {
return adminExtGroup;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index d0219955..c0611097 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -131,6 +131,7 @@ public class WorkerSourceTask extends WorkerTask {
private DefaultMQProducer producer;
private List<ConnectRecord> toSendRecord;
private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;
+ private final Set<String> topicCache;
public WorkerSourceTask(WorkerConfig workerConfig,
ConnectorTaskId id,
@@ -164,6 +165,7 @@ public class WorkerSourceTask extends WorkerTask {
this.offsetManagement = new RecordOffsetManagement();
this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
+ this.topicCache = new HashSet();
}
@Nullable
@@ -432,9 +434,14 @@ public class WorkerSourceTask extends WorkerTask {
if (StringUtils.isBlank(topic)) {
throw new ConnectException("source connect lack of topic config");
}
+
+ if (!workerConfig.isAutoCreateTopicEnable() || topicCache.contains(topic)) {
+ return topic;
+ }
if (!ConnectUtil.isTopicExist(workerConfig, topic)) {
ConnectUtil.createTopic(workerConfig, new TopicConfig(topic));
}
+ topicCache.add(topic);
return topic;
}