This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 91237bb0 [ISSUE #470] Before creating, ensure the existence of a checkpoint topic. (#471)
91237bb0 is described below

commit 91237bb01a1005f3bfc06922c18c76d35ffca064
Author: rongtong <[email protected]>
AuthorDate: Fri Apr 7 17:05:48 2023 +0800

    [ISSUE #470] Before creating, ensure the existence of a checkpoint topic. (#471)
---
 .../replicator/ReplicatorCheckpointConnector.java     | 19 ++++++++++++++-----
 .../replicator/ReplicatorHeartbeatConnector.java      |  1 -
 .../replicator/ReplicatorSourceConnector.java         |  1 -
 .../rocketmq/replicator/ReplicatorSourceTask.java     |  3 +--
 4 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
index 3849f645..2b7f7772 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -99,7 +100,6 @@ public class ReplicatorCheckpointConnector extends SourceConnector {
             add(ReplicatorConnectorConfig.DEST_REGION);
             add(ReplicatorConnectorConfig.DEST_CLUSTER);
             add(ReplicatorConnectorConfig.DEST_ENDPOINT);
-            add(ReplicatorConnectorConfig.SRC_CLOUD);
             add(ReplicatorConnectorConfig.SYNC_GIDS);
             add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
             add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
@@ -122,7 +122,7 @@ public class ReplicatorCheckpointConnector extends SourceConnector {
         this.config = keyValue;
         try {
             buildAndStartTargetMQAdmin();
-            createCheckpointTopic();
+            createCheckpointTopicIfNotExist();
         } catch (MQClientException e) {
             throw new InitMQClientException("Replicator checkpoint connector init mqAdminClient error.", e);
         }
@@ -164,14 +164,23 @@ public class ReplicatorCheckpointConnector extends SourceConnector {
         }
     }
 
-    private void createCheckpointTopic() {
+    private void createCheckpointTopicIfNotExist() {
+        String checkpointTopic = config.getString(ReplicatorConnectorConfig.CHECKPOINT_TOPIC, ReplicatorConnectorConfig.DEFAULT_CHECKPOINT_TOPIC);
+        TopicRouteData topicRouteData = null;
+        try {
+            topicRouteData = targetMqAdminExt.examineTopicRouteInfo(checkpointTopic);
+        } catch (Exception ignored) {
+        }
+        if (topicRouteData != null && !topicRouteData.getQueueDatas().isEmpty()) {
+            return;
+        }
         //create target checkpoint topic, todo compact topic
         TopicConfig topicConfig = new TopicConfig();
         topicConfig.setReadQueueNums(8);
         topicConfig.setWriteQueueNums(8);
-        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
+        int perm = PermName.PERM_READ | PermName.PERM_WRITE;
         topicConfig.setPerm(perm);
-        topicConfig.setTopicName(config.getString(ReplicatorConnectorConfig.CHECKPOINT_TOPIC, ReplicatorConnectorConfig.DEFAULT_CHECKPOINT_TOPIC));
+        topicConfig.setTopicName(checkpointTopic);
         ReplicatorUtils.createTopic(targetMqAdminExt, topicConfig);
     }
 }
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
index 2f765840..ad54f084 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
@@ -71,7 +71,6 @@ public class ReplicatorHeartbeatConnector extends SourceConnector {
             add(ReplicatorConnectorConfig.DEST_INSTANCEID);
             add(ReplicatorConnectorConfig.DEST_ENDPOINT);
             add(ReplicatorConnectorConfig.DEST_TOPIC);
-            add(ReplicatorConnectorConfig.SRC_CLOUD);
             add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
             add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
         }
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 0e9f1f96..ef73f4cb 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -225,7 +225,6 @@ public class ReplicatorSourceConnector extends SourceConnector {
             add(ReplicatorConnectorConfig.DEST_CLUSTER);
             add(ReplicatorConnectorConfig.DEST_ENDPOINT);
             add(ReplicatorConnectorConfig.DEST_TOPIC);
-            add(ReplicatorConnectorConfig.SRC_CLOUD);
             add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
             add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
             add(ERRORS_TOLERANCE_CONFIG);
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index 74884545..418477d2 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -269,9 +269,8 @@ public class ReplicatorSourceTask extends SourceTask {
         ConsumeFromWhere consumeFromWhere = connectorConfig.getConsumeFromWhere();
         pullConsumer.setConsumeFromWhere(org.apache.rocketmq.common.consumer.ConsumeFromWhere.valueOf(consumeFromWhere.name()));
         log.info("litePullConsumer use " + consumeFromWhere.name());
-        long consumeFromTimestamp = System.currentTimeMillis();
         if (consumeFromWhere == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
-            consumeFromTimestamp = connectorConfig.getConsumeFromTimestamp();
+            long consumeFromTimestamp = connectorConfig.getConsumeFromTimestamp();
             String timestamp = UtilAll.timeMillisToHumanString3(consumeFromTimestamp);
             pullConsumer.setConsumeTimestamp(timestamp);
             log.info("litePullConsumer consume start at " + timestamp);

Reply via email to