This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 91237bb0 [ISSUE #470] Before creating, ensure the existence of a checkpoint topic. (#471)
91237bb0 is described below
commit 91237bb01a1005f3bfc06922c18c76d35ffca064
Author: rongtong <[email protected]>
AuthorDate: Fri Apr 7 17:05:48 2023 +0800
[ISSUE #470] Before creating, ensure the existence of a checkpoint topic. (#471)
---
.../replicator/ReplicatorCheckpointConnector.java | 19 ++++++++++++++-----
.../replicator/ReplicatorHeartbeatConnector.java | 1 -
.../replicator/ReplicatorSourceConnector.java | 1 -
.../rocketmq/replicator/ReplicatorSourceTask.java | 3 +--
4 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
index 3849f645..2b7f7772 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.remoting.RPCHook;
@@ -99,7 +100,6 @@ public class ReplicatorCheckpointConnector extends
SourceConnector {
add(ReplicatorConnectorConfig.DEST_REGION);
add(ReplicatorConnectorConfig.DEST_CLUSTER);
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
- add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SYNC_GIDS);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
@@ -122,7 +122,7 @@ public class ReplicatorCheckpointConnector extends
SourceConnector {
this.config = keyValue;
try {
buildAndStartTargetMQAdmin();
- createCheckpointTopic();
+ createCheckpointTopicIfNotExist();
} catch (MQClientException e) {
throw new InitMQClientException("Replicator checkpoint connector
init mqAdminClient error.", e);
}
@@ -164,14 +164,23 @@ public class ReplicatorCheckpointConnector extends
SourceConnector {
}
}
- private void createCheckpointTopic() {
+ private void createCheckpointTopicIfNotExist() {
+ String checkpointTopic =
config.getString(ReplicatorConnectorConfig.CHECKPOINT_TOPIC,
ReplicatorConnectorConfig.DEFAULT_CHECKPOINT_TOPIC);
+ TopicRouteData topicRouteData = null;
+ try {
+ topicRouteData =
targetMqAdminExt.examineTopicRouteInfo(checkpointTopic);
+ } catch (Exception ignored) {
+ }
+ if (topicRouteData != null &&
!topicRouteData.getQueueDatas().isEmpty()) {
+ return;
+ }
//create target checkpoint topic, todo compact topic
TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8);
- int perm = PermName.PERM_INHERIT | PermName.PERM_READ |
PermName.PERM_WRITE;
+ int perm = PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
-
topicConfig.setTopicName(config.getString(ReplicatorConnectorConfig.CHECKPOINT_TOPIC,
ReplicatorConnectorConfig.DEFAULT_CHECKPOINT_TOPIC));
+ topicConfig.setTopicName(checkpointTopic);
ReplicatorUtils.createTopic(targetMqAdminExt, topicConfig);
}
}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
index 2f765840..ad54f084 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
@@ -71,7 +71,6 @@ public class ReplicatorHeartbeatConnector extends
SourceConnector {
add(ReplicatorConnectorConfig.DEST_INSTANCEID);
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
add(ReplicatorConnectorConfig.DEST_TOPIC);
- add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 0e9f1f96..ef73f4cb 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -225,7 +225,6 @@ public class ReplicatorSourceConnector extends
SourceConnector {
add(ReplicatorConnectorConfig.DEST_CLUSTER);
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
add(ReplicatorConnectorConfig.DEST_TOPIC);
- add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
add(ERRORS_TOLERANCE_CONFIG);
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index 74884545..418477d2 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -269,9 +269,8 @@ public class ReplicatorSourceTask extends SourceTask {
ConsumeFromWhere consumeFromWhere =
connectorConfig.getConsumeFromWhere();
pullConsumer.setConsumeFromWhere(org.apache.rocketmq.common.consumer.ConsumeFromWhere.valueOf(consumeFromWhere.name()));
log.info("litePullConsumer use " + consumeFromWhere.name());
- long consumeFromTimestamp = System.currentTimeMillis();
if (consumeFromWhere == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
- consumeFromTimestamp = connectorConfig.getConsumeFromTimestamp();
+ long consumeFromTimestamp =
connectorConfig.getConsumeFromTimestamp();
String timestamp =
UtilAll.timeMillisToHumanString3(consumeFromTimestamp);
pullConsumer.setConsumeTimestamp(timestamp);
log.info("litePullConsumer consume start at " + timestamp);