This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 47d47e56 [ISSUE #466] Make srcTopics and srcTopicTags to one parameter (#467)
47d47e56 is described below
commit 47d47e56f46b3d66cf2e0feeb50a91b21c97a2de
Author: rongtong <[email protected]>
AuthorDate: Fri Apr 7 15:51:53 2023 +0800
[ISSUE #466] Make srcTopics and srcTopicTags to one parameter (#467)
---
.../replicator/ReplicatorCheckpointConnector.java | 4 ++--
.../replicator/ReplicatorCheckpointTask.java | 4 ++--
.../config/ReplicatorConnectorConfig.java | 24 ++--------------------
3 files changed, 6 insertions(+), 26 deletions(-)
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
index e866da60..3849f645 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
@@ -64,7 +64,7 @@ public class ReplicatorCheckpointConnector extends SourceConnector {
keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
keyValue.put(ReplicatorConnectorConfig.SRC_ACCESS_KEY,
config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY, ""));
keyValue.put(ReplicatorConnectorConfig.SRC_SECRET_KEY,
config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY, ""));
- keyValue.put(ReplicatorConnectorConfig.SRC_TOPICS,
config.getString(ReplicatorConnectorConfig.SRC_TOPICS));
+ keyValue.put(ReplicatorConnectorConfig.SRC_TOPICTAGS,
config.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD,
config.getString(ReplicatorConnectorConfig.DEST_CLOUD));
keyValue.put(ReplicatorConnectorConfig.DEST_REGION,
config.getString(ReplicatorConnectorConfig.DEST_REGION));
keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER,
config.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
@@ -94,7 +94,7 @@ public class ReplicatorCheckpointConnector extends SourceConnector {
add(ReplicatorConnectorConfig.SRC_REGION);
add(ReplicatorConnectorConfig.SRC_CLUSTER);
add(ReplicatorConnectorConfig.SRC_ENDPOINT);
- add(ReplicatorConnectorConfig.SRC_TOPICS);
+ add(ReplicatorConnectorConfig.SRC_TOPICTAGS);
add(ReplicatorConnectorConfig.DEST_CLOUD);
add(ReplicatorConnectorConfig.DEST_REGION);
add(ReplicatorConnectorConfig.DEST_CLUSTER);
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
index 8eba84a0..6c9946d2 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
@@ -144,7 +144,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
lastCheckPointTimestamp = System.currentTimeMillis();
return null;
}
- Set<String> srcTopics =
connectorConfig.getSrcTopics(connectorConfig.getSrcTopics());
+ Set<String> srcTopics =
ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(),
connectorConfig.getSrcTopicTags()).keySet();
try {
String[] syncGidArr = syncGids.split(connectorConfig.GID_SPLITTER);
for (String consumerGroup : syncGidArr) {
@@ -258,7 +258,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER));
connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID));
connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT));
-
connectorConfig.setSrcTopics(config.getString(connectorConfig.SRC_TOPICS));
+
connectorConfig.setSrcTopicTags(config.getString(connectorConfig.SRC_TOPICTAGS));
connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD));
connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION));
connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER));
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index 2713a165..2db84b69 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -43,7 +43,6 @@ public class ReplicatorConnectorConfig {
private String srcCluster;
private String srcInstanceId;
private String srcTopicTags; // format
topic-1,tag-a;topic-2,tag-b;topic-3,tag-c
- private String srcTopics; // format topic-1;topic-2,tag-b;topic-3,tag-c
private String srcEndpoint;
private boolean srcAclEnable;
private boolean autoCreateInnerConsumergroup;
@@ -66,7 +65,7 @@ public class ReplicatorConnectorConfig {
private ConsumeFromWhere consumeFromWhere =
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// consume from timestamp
private long consumeFromTimestamp = System.currentTimeMillis();
- // sourcetask replicate to mq failover strategy
+ // source task replicate to mq failover strategy
private FailoverStrategy failoverStrategy = FailoverStrategy.DISMISS;
private boolean enableHeartbeat = true;
private boolean enableCheckpoint = true;
@@ -90,7 +89,7 @@ public class ReplicatorConnectorConfig {
private int syncTps = 1000;
private int maxTask = 2;
//
- private int heartbeatIntervalMs = 1 * 1000;
+ private int heartbeatIntervalMs = 1000;
private int checkpointIntervalMs = 10 * 1000;
private long commitOffsetIntervalMs = 10 * 1000;
private String heartbeatTopic;
@@ -120,8 +119,6 @@ public class ReplicatorConnectorConfig {
public final static String SRC_CLUSTER = "src.cluster";
public final static String SRC_INSTANCEID = "src.instanceid";
public final static String SRC_TOPICTAGS = "src.topictags";
-
- public final static String SRC_TOPICS = "src.topics";
public final static String SRC_ENDPOINT = "src.endpoint";
public final static String SRC_ACL_ENABLE = "src.acl.enable";
public final static String SRC_ACCESS_KEY = "src.access.key";
@@ -242,15 +239,6 @@ public class ReplicatorConnectorConfig {
return topicTagMap;
}
-
- public static Set<String> getSrcTopics(String srcTopics) {
- if (StringUtils.isEmpty(srcTopics) || StringUtils.isBlank(srcTopics)) {
- return null;
- }
- List<String> topicList =
Splitter.on(TOPIC_SPLITTER).omitEmptyStrings().trimResults().splitToList(srcTopics);
- return new HashSet(topicList);
- }
-
public String getSrcTopicTags() {
return srcTopicTags;
}
@@ -259,14 +247,6 @@ public class ReplicatorConnectorConfig {
this.srcTopicTags = srcTopicTags;
}
- public String getSrcTopics() {
- return srcTopics;
- }
-
- public void setSrcTopics(String srcTopics) {
- this.srcTopics = srcTopics;
- }
-
public String getSrcEndpoint() {
return srcEndpoint;
}