This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dabcbd5 optimize worker source task (#480)
6dabcbd5 is described below

commit 6dabcbd532fdb3fca7599333746aa58c42b939a8
Author: zhoubo <[email protected]>
AuthorDate: Thu Apr 20 15:10:16 2023 +0800

    optimize worker source task (#480)
---
 .../org/apache/rocketmq/replicator/ReplicatorSourceTask.java  |  1 +
 .../apache/rocketmq/connect/runtime/config/WorkerConfig.java  | 11 ++++++++++-
 .../connect/runtime/connectorwrapper/WorkerSourceTask.java    |  7 +++++++
 3 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index 76fd862e..1cad855b 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -264,6 +264,7 @@ public class ReplicatorSourceTask extends SourceTask {
         pullConsumer.setNamesrvAddr(namesrvAddr);
         pullConsumer.setInstanceName(connectorConfig.generateSourceString() + 
"-" + UUID.randomUUID().toString());
         pullConsumer.setAutoCommit(false);
+        pullConsumer.setPullBatchSize(32);
     }
 
     private void subscribeTopicAndStartConsumer() throws MQClientException {
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
index 1a0ee17e..dd586607 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
@@ -82,7 +82,8 @@ public class WorkerConfig {
      * config example:
      * autoCreateGroupEnable = false
      */
-    private boolean autoCreateGroupEnable = false;
+    private boolean autoCreateGroupEnable = true;
+    private boolean autoCreateTopicEnable = true;
 
     /**
      * Configure cluster converter
@@ -438,6 +439,14 @@ public class WorkerConfig {
         this.autoCreateGroupEnable = autoCreateGroupEnable;
     }
 
+    public boolean isAutoCreateTopicEnable() {
+        return autoCreateTopicEnable;
+    }
+
+    public void setAutoCreateTopicEnable(boolean autoCreateTopicEnable) {
+        this.autoCreateTopicEnable = autoCreateTopicEnable;
+    }
+
     public String getAdminExtGroup() {
         return adminExtGroup;
     }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index d0219955..c0611097 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -131,6 +131,7 @@ public class WorkerSourceTask extends WorkerTask {
     private DefaultMQProducer producer;
     private List<ConnectRecord> toSendRecord;
     private volatile RecordOffsetManagement.CommittableOffsets 
committableOffsets;
+    private final Set<String> topicCache;
 
     public WorkerSourceTask(WorkerConfig workerConfig,
                             ConnectorTaskId id,
@@ -164,6 +165,7 @@ public class WorkerSourceTask extends WorkerTask {
         this.offsetManagement = new RecordOffsetManagement();
         this.committableOffsets = 
RecordOffsetManagement.CommittableOffsets.EMPTY;
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, 
connectMetrics);
+        this.topicCache = new HashSet();
     }
 
     @Nullable
@@ -432,9 +434,14 @@ public class WorkerSourceTask extends WorkerTask {
         if (StringUtils.isBlank(topic)) {
             throw new ConnectException("source connect lack of topic config");
         }
+
+        if (!workerConfig.isAutoCreateTopicEnable() || 
topicCache.contains(topic)) {
+            return topic;
+        }
         if (!ConnectUtil.isTopicExist(workerConfig, topic)) {
             ConnectUtil.createTopic(workerConfig, new TopicConfig(topic));
         }
+        topicCache.add(topic);
         return topic;
     }
 

Reply via email to