This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 47d47e56 [ISSUE #466] Make srcTopics and srcTopicTags to one parameter 
(#467)
47d47e56 is described below

commit 47d47e56f46b3d66cf2e0feeb50a91b21c97a2de
Author: rongtong <[email protected]>
AuthorDate: Fri Apr 7 15:51:53 2023 +0800

    [ISSUE #466] Make srcTopics and srcTopicTags to one parameter (#467)
---
 .../replicator/ReplicatorCheckpointConnector.java  |  4 ++--
 .../replicator/ReplicatorCheckpointTask.java       |  4 ++--
 .../config/ReplicatorConnectorConfig.java          | 24 ++--------------------
 3 files changed, 6 insertions(+), 26 deletions(-)

diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
index e866da60..3849f645 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
@@ -64,7 +64,7 @@ public class ReplicatorCheckpointConnector extends 
SourceConnector {
         keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, 
config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
         keyValue.put(ReplicatorConnectorConfig.SRC_ACCESS_KEY, 
config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY, ""));
         keyValue.put(ReplicatorConnectorConfig.SRC_SECRET_KEY, 
config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY, ""));
-        keyValue.put(ReplicatorConnectorConfig.SRC_TOPICS, 
config.getString(ReplicatorConnectorConfig.SRC_TOPICS));
+        keyValue.put(ReplicatorConnectorConfig.SRC_TOPICTAGS, 
config.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
         keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD, 
config.getString(ReplicatorConnectorConfig.DEST_CLOUD));
         keyValue.put(ReplicatorConnectorConfig.DEST_REGION, 
config.getString(ReplicatorConnectorConfig.DEST_REGION));
         keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER, 
config.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
@@ -94,7 +94,7 @@ public class ReplicatorCheckpointConnector extends 
SourceConnector {
             add(ReplicatorConnectorConfig.SRC_REGION);
             add(ReplicatorConnectorConfig.SRC_CLUSTER);
             add(ReplicatorConnectorConfig.SRC_ENDPOINT);
-            add(ReplicatorConnectorConfig.SRC_TOPICS);
+            add(ReplicatorConnectorConfig.SRC_TOPICTAGS);
             add(ReplicatorConnectorConfig.DEST_CLOUD);
             add(ReplicatorConnectorConfig.DEST_REGION);
             add(ReplicatorConnectorConfig.DEST_CLUSTER);
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
index 8eba84a0..6c9946d2 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
@@ -144,7 +144,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
             lastCheckPointTimestamp = System.currentTimeMillis();
             return null;
         }
-        Set<String> srcTopics = 
connectorConfig.getSrcTopics(connectorConfig.getSrcTopics());
+        Set<String> srcTopics = 
ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), 
connectorConfig.getSrcTopicTags()).keySet();
         try {
             String[] syncGidArr = syncGids.split(connectorConfig.GID_SPLITTER);
             for (String consumerGroup : syncGidArr) {
@@ -258,7 +258,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
         
connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER));
         
connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID));
         
connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT));
-        
connectorConfig.setSrcTopics(config.getString(connectorConfig.SRC_TOPICS));
+        
connectorConfig.setSrcTopicTags(config.getString(connectorConfig.getSrcTopicTags()));
         
connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD));
         
connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION));
         
connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER));
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index 2713a165..2db84b69 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -43,7 +43,6 @@ public class ReplicatorConnectorConfig {
     private String srcCluster;
     private String srcInstanceId;
     private String srcTopicTags; // format 
topic-1,tag-a;topic-2,tag-b;topic-3,tag-c
-    private String srcTopics; // format topic-1;topic-2,tag-b;topic-3,tag-c
     private String srcEndpoint;
     private boolean srcAclEnable;
     private boolean autoCreateInnerConsumergroup;
@@ -66,7 +65,7 @@ public class ReplicatorConnectorConfig {
     private ConsumeFromWhere consumeFromWhere = 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
     // consume from timestamp
     private long consumeFromTimestamp = System.currentTimeMillis();
-    // sourcetask replicate to mq failover strategy
+    // source task replicate to mq failover strategy
     private FailoverStrategy failoverStrategy = FailoverStrategy.DISMISS;
     private boolean enableHeartbeat = true;
     private boolean enableCheckpoint = true;
@@ -90,7 +89,7 @@ public class ReplicatorConnectorConfig {
     private int syncTps = 1000;
     private int maxTask = 2;
     //
-    private int heartbeatIntervalMs = 1 * 1000;
+    private int heartbeatIntervalMs = 1000;
     private int checkpointIntervalMs = 10 * 1000;
     private long commitOffsetIntervalMs = 10 * 1000;
     private String heartbeatTopic;
@@ -120,8 +119,6 @@ public class ReplicatorConnectorConfig {
     public final static String SRC_CLUSTER = "src.cluster";
     public final static String SRC_INSTANCEID = "src.instanceid";
     public final static String SRC_TOPICTAGS = "src.topictags";
-
-    public final static String SRC_TOPICS = "src.topics";
     public final static String SRC_ENDPOINT = "src.endpoint";
     public final static String SRC_ACL_ENABLE = "src.acl.enable";
     public final static String SRC_ACCESS_KEY = "src.access.key";
@@ -242,15 +239,6 @@ public class ReplicatorConnectorConfig {
         return topicTagMap;
     }
 
-
-    public static Set<String> getSrcTopics(String srcTopics) {
-        if (StringUtils.isEmpty(srcTopics) || StringUtils.isBlank(srcTopics)) {
-            return null;
-        }
-        List<String> topicList = 
Splitter.on(TOPIC_SPLITTER).omitEmptyStrings().trimResults().splitToList(srcTopics);
-        return new HashSet(topicList);
-    }
-
     public String getSrcTopicTags() {
         return srcTopicTags;
     }
@@ -259,14 +247,6 @@ public class ReplicatorConnectorConfig {
         this.srcTopicTags = srcTopicTags;
     }
 
-    public String getSrcTopics() {
-        return srcTopics;
-    }
-
-    public void setSrcTopics(String srcTopics) {
-        this.srcTopics = srcTopics;
-    }
-
     public String getSrcEndpoint() {
         return srcEndpoint;
     }

Reply via email to