This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new b30952eb [ISSUE #456] replicator support checkpoint
b30952eb is described below
commit b30952eba1aa3e4516f26f34ccab46ed150ed64b
Author: zhoubo <[email protected]>
AuthorDate: Wed Apr 5 10:38:19 2023 +0800
[ISSUE #456] replicator support checkpoint
---
.../replicator/ReplicatorCheckpointConnector.java | 179 +++++++++++++
.../replicator/ReplicatorCheckpointTask.java | 297 +++++++++++++++++++++
.../config/ReplicatorConnectorConfig.java | 226 +++++++++++++++-
.../rocketmq/replicator/utils/ReplicatorUtils.java | 81 +++++-
4 files changed, 764 insertions(+), 19 deletions(-)
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
new file mode 100644
index 00000000..c1dbdc65
--- /dev/null
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.InitMQClientException;
+import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public class ReplicatorCheckpointConnector extends SourceConnector {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
+ private KeyValue config;
+
+ private DefaultMQAdminExt targetMqAdminExt;
+
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ List<KeyValue> configs = new ArrayList<>();
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(ReplicatorConnectorConfig.SRC_CLOUD,
config.getString(ReplicatorConnectorConfig.SRC_CLOUD));
+ keyValue.put(ReplicatorConnectorConfig.SRC_REGION,
config.getString(ReplicatorConnectorConfig.SRC_REGION));
+ keyValue.put(ReplicatorConnectorConfig.SRC_CLUSTER,
config.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
+ if (null !=
config.getString(ReplicatorConnectorConfig.SRC_INSTANCEID)) {
+ keyValue.put(ReplicatorConnectorConfig.SRC_INSTANCEID,
config.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
+ }
+ keyValue.put(ReplicatorConnectorConfig.SRC_ENDPOINT,
config.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
+ keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
+ keyValue.put(ReplicatorConnectorConfig.SRC_ACCESS_KEY,
config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY, ""));
+ keyValue.put(ReplicatorConnectorConfig.SRC_SECRET_KEY,
config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY, ""));
+ keyValue.put(ReplicatorConnectorConfig.SRC_TOPICS,
config.getString(ReplicatorConnectorConfig.SRC_TOPICS));
+ keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD,
config.getString(ReplicatorConnectorConfig.DEST_CLOUD));
+ keyValue.put(ReplicatorConnectorConfig.DEST_REGION,
config.getString(ReplicatorConnectorConfig.DEST_REGION));
+ keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER,
config.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
+ if (null !=
config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)) {
+ keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID,
config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
+ }
+ keyValue.put(ReplicatorConnectorConfig.DEST_ENDPOINT,
config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
+ if (null != config.getString(ReplicatorConnectorConfig.DEST_TOPIC)) {
+ keyValue.put(ReplicatorConnectorConfig.DEST_TOPIC,
config.getString(ReplicatorConnectorConfig.DEST_TOPIC));
+ }
+ keyValue.put(ReplicatorConnectorConfig.DEST_ACL_ENABLE,
config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false"));
+ keyValue.put(ReplicatorConnectorConfig.DEST_ACCESS_KEY,
config.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY, ""));
+ keyValue.put(ReplicatorConnectorConfig.DEST_SECRET_KEY,
config.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, ""));
+ keyValue.put(ReplicatorConnectorConfig.SYNC_GIDS,
config.getString(ReplicatorConnectorConfig.SYNC_GIDS));
+ configs.add(keyValue);
+ return configs;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return ReplicatorCheckpointTask.class;
+ }
+
+
+ private Set<String> neededParamKeys = new HashSet<String>() {
+ {
+ add(ReplicatorConnectorConfig.SRC_CLOUD);
+ add(ReplicatorConnectorConfig.SRC_REGION);
+ add(ReplicatorConnectorConfig.SRC_CLUSTER);
+ add(ReplicatorConnectorConfig.SRC_ENDPOINT);
+ add(ReplicatorConnectorConfig.SRC_TOPICS);
+ add(ReplicatorConnectorConfig.DEST_CLOUD);
+ add(ReplicatorConnectorConfig.DEST_REGION);
+ add(ReplicatorConnectorConfig.DEST_CLUSTER);
+ add(ReplicatorConnectorConfig.DEST_ENDPOINT);
+ add(ReplicatorConnectorConfig.SRC_CLOUD);
+ add(ReplicatorConnectorConfig.SYNC_GIDS);
+ add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
+ add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
+ }
+ };
+
+
+ @Override
+ public void validate(KeyValue config) {
+ if (config.getInt(ConnectorConfig.MAX_TASK, 1) > 1) {
+ log.warn("ReplicatorCheckpointConnector no need to set max-task,
only used 1.");
+ }
+ // checkpoint just need only one task.
+ config.put(ConnectorConfig.MAX_TASK, 1);
+
ReplicatorUtils.checkNeedParams(ReplicatorCheckpointConnector.class.getName(),
config, neededParamKeys);
+
+ }
+
+ @Override
+ public void start(KeyValue keyValue) {
+ this.config = keyValue;
+ try {
+ buildAndStartTargetMQAdmin();
+ createCheckpointTopic();
+ } catch (MQClientException e) {
+ throw new InitMQClientException("Replicator checkpoint connector
init mqAdminClient error.", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ closeAdmin();
+ }
+
+ private void buildAndStartTargetMQAdmin() throws MQClientException {
+ // use /home/admin/onskey white ak as default
+ RPCHook rpcHook = null;
+ String destAclEnable =
config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false");
+ if (destAclEnable.equalsIgnoreCase("true")) {
+ String destAccessKey =
config.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY);
+ String destSecretKey =
config.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY);
+ if (StringUtils.isNotEmpty(destAccessKey) &&
StringUtils.isNotEmpty(destSecretKey)) {
+ rpcHook = new AclClientRPCHook(new
SessionCredentials(destAccessKey, destSecretKey));
+ } else {
+ rpcHook = new AclClientRPCHook(new SessionCredentials());
+ }
+ }
+ if (null == targetMqAdminExt) {
+ targetMqAdminExt = new DefaultMQAdminExt(rpcHook);
+
targetMqAdminExt.setNamesrvAddr(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
+
targetMqAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + "-" +
UUID.randomUUID().toString());
+
targetMqAdminExt.setInstanceName("ReplicatorCheckpointConnector_InstanceName_"
+ UUID.randomUUID().toString());
+ targetMqAdminExt.start();
+ }
+ }
+ private synchronized void closeAdmin() {
+ if (targetMqAdminExt != null) {
+ try {
+ targetMqAdminExt.shutdown();
+ } catch (Throwable e) {
+ log.error("close Admin error,", e);
+ }
+ }
+ }
+
+ private void createCheckpointTopic() {
+ //create target checkpoint topic, todo compact topic
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setWriteQueueNums(8);
+ int perm = PermName.PERM_INHERIT | PermName.PERM_READ |
PermName.PERM_WRITE;
+ topicConfig.setPerm(perm);
+
topicConfig.setTopicName(config.getString(ReplicatorConnectorConfig.CHECKPOINT_TOPIC,
ReplicatorConnectorConfig.DEFAULT_CHECKPOINT_TOPIC));
+ ReplicatorUtils.createTopic(targetMqAdminExt, topicConfig);
+ }
+}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
new file mode 100644
index 00000000..945018cd
--- /dev/null
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
+import org.apache.rocketmq.replicator.exception.InitMQClientException;
+import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static
org.apache.rocketmq.replicator.utils.ReplicatorUtils.CONSUMER_GROUP_KEY;
+import static
org.apache.rocketmq.replicator.utils.ReplicatorUtils.DOWNSTREAM_LASTTIMESTAMP_KEY;
+import static
org.apache.rocketmq.replicator.utils.ReplicatorUtils.METADATA_KEY;
+import static org.apache.rocketmq.replicator.utils.ReplicatorUtils.TOPIC_KEY;
+import static
org.apache.rocketmq.replicator.utils.ReplicatorUtils.UPSTREAM_LASTTIMESTAMP_KEY;
+
+
+/**
+ * Source task that periodically emits checkpoint records.
+ *
+ * <p>For every configured consumer group and source topic it reads the consume stats from the source
+ * and the destination cluster, takes the smallest per-queue last timestamp on each side, and emits a
+ * record carrying both values, routed to the checkpoint topic.
+ */
+public class ReplicatorCheckpointTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
+
+    /** Schema of the checkpoint record value. */
+    public static final Schema VALUE_SCHEMA_V0 = SchemaBuilder.struct()
+        .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
+        .field(TOPIC_KEY, SchemaBuilder.string().build())
+        .field(UPSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
+        .field(DOWNSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
+        .field(METADATA_KEY, SchemaBuilder.string().build())
+        .build();
+
+    /** Schema of the checkpoint record key (consumer group + topic). */
+    public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
+        .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
+        .field(TOPIC_KEY, SchemaBuilder.string().build())
+        .build();
+
+    private final ReplicatorConnectorConfig connectorConfig = new ReplicatorConnectorConfig();
+    /** Wall-clock time of the last completed checkpoint round. */
+    private long lastCheckPointTimestamp = System.currentTimeMillis();
+    private DefaultMQAdminExt srcMqAdminExt;
+    private DefaultMQAdminExt targetMqAdminExt;
+
+    /** Starts the admin clients for the source and the destination cluster. */
+    private void buildMqAdmin() throws MQClientException {
+        buildAndStartSrcMQAdmin();
+        buildAndStartTargetMQAdmin();
+    }
+
+    /**
+     * Builds the ACL hook for an admin client.
+     *
+     * @return {@code null} when ACL is disabled; otherwise a hook using the given keys, or the
+     *         no-argument {@link SessionCredentials} when either key is empty
+     */
+    private static RPCHook buildRpcHook(boolean aclEnable, String accessKey, String secretKey) {
+        if (!aclEnable) {
+            return null;
+        }
+        if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
+            return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+        }
+        // use /home/admin/onskey white ak as default
+        return new AclClientRPCHook(new SessionCredentials());
+    }
+
+    private void buildAndStartSrcMQAdmin() throws MQClientException {
+        RPCHook rpcHook = buildRpcHook(connectorConfig.isSrcAclEnable(),
+            connectorConfig.getSrcAccessKey(), connectorConfig.getSrcSecretKey());
+        srcMqAdminExt = new DefaultMQAdminExt(rpcHook);
+        srcMqAdminExt.setNamesrvAddr(connectorConfig.getSrcEndpoint());
+        srcMqAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + "-" + UUID.randomUUID().toString());
+        srcMqAdminExt.setInstanceName(connectorConfig.generateSourceString() + "-" + UUID.randomUUID().toString());
+        try {
+            srcMqAdminExt.start();
+        } catch (Exception e) {
+            log.error("ReplicatorCheckpoint init src mqadmin error,", e);
+            throw e;
+        }
+    }
+
+    private void buildAndStartTargetMQAdmin() throws MQClientException {
+        RPCHook rpcHook = buildRpcHook(connectorConfig.isDestAclEnable(),
+            connectorConfig.getDestAccessKey(), connectorConfig.getDestSecretKey());
+        targetMqAdminExt = new DefaultMQAdminExt(rpcHook);
+        targetMqAdminExt.setNamesrvAddr(connectorConfig.getDestEndpoint());
+        targetMqAdminExt.setAdminExtGroup(ReplicatorConnectorConfig.ADMIN_GROUP + "-" + UUID.randomUUID().toString());
+        targetMqAdminExt.setInstanceName(connectorConfig.generateDestinationString() + "-" + UUID.randomUUID().toString());
+        try {
+            targetMqAdminExt.start();
+        } catch (Exception e) {
+            log.error("ReplicatorCheckpoint init target mqadmin error,", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Emits one checkpoint record per (consumer group, source topic) pair once per checkpoint interval.
+     *
+     * @return the checkpoint records of this round; {@code null} when the interval has not elapsed yet
+     *         (after sleeping for the remainder) or when no consumer groups or source topics are configured
+     * @throws InterruptedException if the thread is interrupted while sleeping or querying consume stats
+     */
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        long intervalMs = connectorConfig.getCheckpointIntervalMs();
+        // Read the clock once so the decision and the sleep length agree; a second read could
+        // otherwise produce a negative sleep argument.
+        long now = System.currentTimeMillis();
+        long remainingMs = lastCheckPointTimestamp + intervalMs - now;
+        if (remainingMs > 0) {
+            log.info("sleep {}, {}, {}", lastCheckPointTimestamp, intervalMs, now);
+            Thread.sleep(remainingMs + 100);
+            return null;
+        }
+        log.info("not sleep {}, {}, {}", lastCheckPointTimestamp, intervalMs, now);
+
+        // pull consumergroup's consumer commit offset
+        String syncGids = connectorConfig.getSyncGids();
+        Set<String> srcTopics = ReplicatorConnectorConfig.getSrcTopics(connectorConfig.getSrcTopics());
+        if (StringUtils.isEmpty(syncGids) || srcTopics == null || srcTopics.isEmpty()) {
+            lastCheckPointTimestamp = System.currentTimeMillis();
+            return null;
+        }
+
+        List<ConnectRecord> connectRecords = new LinkedList<>();
+        for (String consumerGroup : syncGids.split(ReplicatorConnectorConfig.GID_SPLITTER)) {
+            for (String srcTopic : srcTopics) {
+                try {
+                    connectRecords.add(buildCheckpointRecord(consumerGroup, srcTopic));
+                } catch (InterruptedException e) {
+                    // Propagate instead of swallowing so the caller can stop the task promptly.
+                    throw e;
+                } catch (Exception e) {
+                    // Best effort: a failing pair must not block the remaining pairs.
+                    log.error("examineConsumeStats gid : {}, topic : {} error", consumerGroup, srcTopic, e);
+                }
+            }
+        }
+        lastCheckPointTimestamp = System.currentTimeMillis();
+        return connectRecords;
+    }
+
+    /** Queries both clusters for one (consumer group, source topic) pair and builds its checkpoint record. */
+    private ConnectRecord buildCheckpointRecord(String consumerGroup, String srcTopic)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        String srcTopicWithInstanceId = ReplicatorUtils.buildTopicWithNamespace(srcTopic, connectorConfig.getSrcInstanceId());
+        String srcConsumerGroupWithInstanceId = ReplicatorUtils.buildConsumergroupWithNamespace(consumerGroup, connectorConfig.getSrcInstanceId());
+        ConsumeStats srcConsumeStats = srcMqAdminExt.examineConsumeStats(srcConsumerGroupWithInstanceId, srcTopicWithInstanceId);
+        long minSrcLasttimestamp = getMinLastTimestamp(srcConsumeStats);
+
+        // Fall back to the source topic name when no destination topic is configured.
+        String targetTopic = connectorConfig.getDestTopic();
+        String targetTopicWithInstanceId = ReplicatorUtils.buildTopicWithNamespace(
+            StringUtils.isBlank(targetTopic) ? srcTopic : targetTopic, connectorConfig.getDestInstanceId());
+        // NOTE(review): the destination lookup uses the consumer group without the destination
+        // instance id, unlike the topic - confirm this is intended.
+        ConsumeStats targetConsumeStats = targetMqAdminExt.examineConsumeStats(consumerGroup, targetTopicWithInstanceId);
+        long minDestLasttimestamp = getMinLastTimestamp(targetConsumeStats);
+
+        RecordPartition recordPartition = ReplicatorUtils.convertToRecordPartition(srcTopic, consumerGroup);
+        RecordOffset recordOffset = ReplicatorUtils.convertToRecordOffset(0L);
+        ConnectRecord connectRecord = new ConnectRecord(recordPartition, recordOffset, System.currentTimeMillis());
+        connectRecord.setKeySchema(KEY_SCHEMA);
+        connectRecord.setKey(buildCheckpointKey(srcTopicWithInstanceId, srcConsumerGroupWithInstanceId));
+        connectRecord.setSchema(VALUE_SCHEMA_V0);
+        connectRecord.setData(buildCheckpointPayload(srcTopicWithInstanceId, srcConsumerGroupWithInstanceId,
+            minSrcLasttimestamp, minDestLasttimestamp));
+        connectRecord.addExtension(TOPIC_KEY, connectorConfig.getCheckpointTopic());
+        return connectRecord;
+    }
+
+    @NotNull
+    private static Struct buildCheckpointPayload(String srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId,
+        long minSrcLasttimestamp, long minDestLasttimestamp) {
+        Struct struct = new Struct(VALUE_SCHEMA_V0);
+        struct.put(CONSUMER_GROUP_KEY, srcConsumerGroupWithInstanceId);
+        struct.put(TOPIC_KEY, srcTopicWithInstanceId);
+        struct.put(UPSTREAM_LASTTIMESTAMP_KEY, minSrcLasttimestamp);
+        struct.put(DOWNSTREAM_LASTTIMESTAMP_KEY, minDestLasttimestamp);
+        // The metadata field carries the creation time of the payload as a string.
+        struct.put(METADATA_KEY, String.valueOf(System.currentTimeMillis()));
+        return struct;
+    }
+
+    private static Struct buildCheckpointKey(String srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId) {
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.put(CONSUMER_GROUP_KEY, srcConsumerGroupWithInstanceId);
+        struct.put(TOPIC_KEY, srcTopicWithInstanceId);
+        return struct;
+    }
+
+    /**
+     * Returns the smallest last timestamp over all queues in the given consume stats.
+     *
+     * @param consumeStats consume stats of one consumer group on one topic; may be {@code null}
+     * @return the minimum last timestamp, or {@code -1} when no queue entry is available
+     */
+    private static long getMinLastTimestamp(ConsumeStats consumeStats) {
+        long minLastTimestamp = -1;
+        if (null == consumeStats) {
+            return minLastTimestamp;
+        }
+        Map<MessageQueue, OffsetWrapper> offsetTable = consumeStats.getOffsetTable();
+        if (null == offsetTable) {
+            return minLastTimestamp;
+        }
+        for (Map.Entry<MessageQueue, OffsetWrapper> entry : offsetTable.entrySet()) {
+            OffsetWrapper offsetWrapper = entry.getValue();
+            if (null == offsetWrapper) {
+                continue;
+            }
+            long lastTimestamp = offsetWrapper.getLastTimestamp();
+            if (minLastTimestamp == -1 || minLastTimestamp > lastTimestamp) {
+                minLastTimestamp = lastTimestamp;
+            }
+        }
+        return minLastTimestamp;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        // Nothing to validate at task level.
+    }
+
+    /**
+     * Loads the task configuration and starts both admin clients.
+     *
+     * @param config task configuration
+     * @throws InitMQClientException if an admin client cannot be started
+     */
+    @Override
+    public void start(KeyValue config) {
+        log.info("ReplicatorCheckpointTask init {}", config);
+        log.info("sourceTaskContextConfigs : {}", sourceTaskContext.configs());
+        fillConnectorConfig(config);
+        // init mqadmin client
+        try {
+            buildMqAdmin();
+        } catch (Exception e) {
+            cleanResource();
+            log.error("buildMqAdmin error,", e);
+            throw new InitMQClientException("Replicator checkpoint task init mqAdminClient error.", e);
+        }
+    }
+
+    /** Copies the task configuration into {@link #connectorConfig}. */
+    private void fillConnectorConfig(KeyValue config) {
+        // build connectConfig
+        connectorConfig.setSrcCloud(config.getString(ReplicatorConnectorConfig.SRC_CLOUD));
+        connectorConfig.setSrcRegion(config.getString(ReplicatorConnectorConfig.SRC_REGION));
+        connectorConfig.setSrcCluster(config.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
+        connectorConfig.setSrcInstanceId(config.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
+        connectorConfig.setSrcEndpoint(config.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
+        connectorConfig.setSrcTopics(config.getString(ReplicatorConnectorConfig.SRC_TOPICS));
+        connectorConfig.setDestCloud(config.getString(ReplicatorConnectorConfig.DEST_CLOUD));
+        connectorConfig.setDestRegion(config.getString(ReplicatorConnectorConfig.DEST_REGION));
+        connectorConfig.setDestCluster(config.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
+        connectorConfig.setDestInstanceId(config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
+        connectorConfig.setDestEndpoint(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
+        connectorConfig.setDestTopic(config.getString(ReplicatorConnectorConfig.DEST_TOPIC));
+        connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true")));
+        connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true")));
+        // The admin builders read these keys; without copying them the configured
+        // credentials were never used.
+        connectorConfig.setSrcAccessKey(config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY));
+        connectorConfig.setSrcSecretKey(config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY));
+        connectorConfig.setDestAccessKey(config.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY));
+        connectorConfig.setDestSecretKey(config.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY));
+        connectorConfig.setCheckpointIntervalMs(config.getInt(ReplicatorConnectorConfig.CHECKPOINT_INTERVAL_MS, connectorConfig.getCheckpointIntervalMs()));
+        connectorConfig.setSyncGids(config.getString(ReplicatorConnectorConfig.SYNC_GIDS));
+        connectorConfig.setCheckpointTopic(config.getString(ReplicatorConnectorConfig.CHECKPOINT_TOPIC, ReplicatorConnectorConfig.DEFAULT_CHECKPOINT_TOPIC));
+        log.info("ReplicatorCheckpointTask connectorConfig : {}", connectorConfig);
+    }
+
+    /** Shuts both admin clients down; a failure on one side does not prevent closing the other. */
+    private void cleanResource() {
+        shutdownQuietly(srcMqAdminExt);
+        srcMqAdminExt = null;
+        shutdownQuietly(targetMqAdminExt);
+        targetMqAdminExt = null;
+    }
+
+    /** Shuts an admin client down, logging instead of propagating any failure. */
+    private static void shutdownQuietly(DefaultMQAdminExt adminExt) {
+        if (adminExt == null) {
+            return;
+        }
+        try {
+            adminExt.shutdown();
+        } catch (Exception e) {
+            log.error("clean resource error,", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        cleanResource();
+    }
+}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index 2a926bc2..ab789c69 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -13,7 +13,9 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
+
package org.apache.rocketmq.replicator.config;
import com.google.common.base.Splitter;
@@ -22,26 +24,26 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
-
-/**
- * @author osgoo
- * @date 2022/6/16
- */
public class ReplicatorConnectorConfig {
private Log log = LogFactory.getLog(ReplicatorConnectorConfig.class);
// replicator task id
private String taskId;
// connector id
private String connectorId;
+ // replicator task id with index for max-task, format replicatorTaskId-i
+// private String replicatorTaskIdWithIndex;
// src & dest
private String srcCloud;
private String srcRegion;
private String srcCluster;
private String srcInstanceId;
private String srcTopicTags; // format
topic-1,tag-a;topic-2,tag-b;topic-3,tag-c
+ private String srcTopics; // format topic-1;topic-2,tag-b;topic-3,tag-c
private String srcEndpoint;
private boolean srcAclEnable;
private boolean autoCreateInnerConsumergroup;
@@ -56,6 +58,10 @@ public class ReplicatorConnectorConfig {
private boolean destAclEnable;
private String destAccessKey;
private String destSecretKey;
+ // filter properties
+ private String filterProperties;
+ private Map<String, String> filterPropertiesMap = new HashMap<>();
+ private String filterPropertiesOperator = "and";
// consume from where
private ConsumeFromWhere consumeFromWhere =
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// consume from timestamp
@@ -63,14 +69,33 @@ public class ReplicatorConnectorConfig {
// sourcetask replicate to mq failover strategy
private FailoverStrategy failoverStrategy = FailoverStrategy.DISMISS;
private boolean enableHeartbeat = true;
-
+ private boolean enableCheckpoint = true;
+ private boolean enableRetrySync = true;
+ // srcRetryGids used for retry sync,
+ private String srcRetryGids;
+ // destRetryTopic used for store messages from %RETRY%srcRetryGids, can
set normal topic or retry topic
+ private String destRetryTopic;
+ private boolean enableDlqSync = true;
+ // srcDlqGids used for dlq sync
+ private String srcDlqGids;
+ // destDlqTopic used for store messages from DLQ, can set normal topic
+ private String destDlqTopic;
+ // specify needed to sync %RETRY%GID, GID format is instanceId%Gid
+ // if not set, sync all online gids
+ private String syncGids;
private String dividedNormalQueues;
+ private String dividedRetryQueues;
+ private String dividedDlqQueues;
// tps limit
private int syncTps = 1000;
+ private int maxTask = 2;
//
private int heartbeatIntervalMs = 1 * 1000;
+ private int checkpointIntervalMs = 10 * 1000;
private String heartbeatTopic;
+ private String checkpointTopic;
public final static String DEFAULT_HEARTBEAT_TOPIC =
"replicator_heartbeat";
+ public final static String DEFAULT_CHECKPOINT_TOPIC =
"replicator_checkpoint";
public final static String TOPIC_TAG_SPLITTER = ",";
public final static String TOPIC_SPLITTER = ";";
public final static String TASK_NAME_SPLITTER = "_";
@@ -79,6 +104,7 @@ public class ReplicatorConnectorConfig {
public final static String GID_SPLITTER = ",";
public final static String ADMIN_GROUP = "replicator_admin_group";
public final static int DEFAULT_SYNC_TPS = 1000;
+ public final static int DEFAULT_MAX_TASK = 2;
// poll config
private int eachQueueBufferSize = 1000;
@@ -87,11 +113,14 @@ public class ReplicatorConnectorConfig {
// converter
private String sourceConverter;
+// public final static String REPLICATOR_TASK_ID_WITH_INDEX =
"replicator-subtask-id";
public final static String SRC_CLOUD = "src.cloud";
public final static String SRC_REGION = "src.region";
public final static String SRC_CLUSTER = "src.cluster";
public final static String SRC_INSTANCEID = "src.instanceid";
public final static String SRC_TOPICTAGS = "src.topictags";
+
+ public final static String SRC_TOPICS = "src.topics";
public final static String SRC_ENDPOINT = "src.endpoint";
public final static String SRC_ACL_ENABLE = "src.acl.enable";
public final static String SRC_ACCESS_KEY = "src.access.key";
@@ -106,16 +135,31 @@ public class ReplicatorConnectorConfig {
public final static String AUTO_CREATE_INNER_CONSUMERGROUP =
"auto.create.inner.consumergroup";
public final static String DEST_ACCESS_KEY = "dest.access.key";
public final static String DEST_SECRET_KEY = "dest.secret.key";
+ public final static String FILTER_PROPERTIES = "filter.properties";//
pro1,pro2 or pro1=val1,prop2=val2
+ public final static String FILTER_PROPERTIES_OPERATOR =
"filter.properties.operator";// and or
public final static String FAILOVER_STRATEGY = "failover.strategy";
public final static String ENABLE_HEARTBEAT = "enable.heartbeat";
public final static String ENABLE_CHECKPOINT = "enable.checkpoint";
+ public final static String ENABLE_RETRY_SYNC = "enable.retrysync";
+ public final static String SRC_RETRY_GIDS = "src.retrygids";
+ public final static String DEST_RETRY_TOPIC = "dest.retrytopic";
+ public final static String ENABLE_DLQ_SYNC = "enable.dlqsync";
+ // NOTE(review): the key starts with "dest." although the constant is named SRC_DLQ_GIDS
+ // (compare SRC_RETRY_GIDS = "src.retrygids") - confirm whether "src.dlqgids" was intended.
+ public final static String SRC_DLQ_GIDS = "dest.dlqgids";
+ // NOTE(review): "dqltopic" looks like a transposition of "dlqtopic" - confirm before
+ // documenting this key for users.
+ public final static String DEST_DLQ_TOPIC = "dest.dqltopic";
+ public final static String SYNC_GIDS = "sync.gids";
public final static String HEARTBEAT_INTERVALS_MS =
"heartbeat.interval.ms";
+ public final static String CHECKPOINT_INTERVAL_MS =
"checkpoint.interval.ms";
+ public final static String CHECKPOINT_TOPIC = "checkpoint.topic";
public final static String HEARTBEAT_TOPIC = "heartbeat.topic";
+ public final static String EACH_QUEUE_BUFFER_SIZE =
"each.queue.buffer.size";
public final static String CONSUME_FROM_WHERE = "consumefromwhere";
public final static String CONSUME_FROM_TIMESTAMP = "consumefromtimestamp";
public final static String DIVIDE_STRATEGY = "divide.strategy";
public final static String DIVIDED_NORMAL_QUEUES = "divided.normalqueues";
+ public final static String DIVIDED_RETRY_QUEUES = "divided.retryqueues";
+ public final static String DIVIDED_DLQ_QUEUES = "divided.dlqqueues";
public final static String SYNC_TPS = "sync.tps";
+ public final static String MAX_TASK = "max.task";
public String getTaskId() {
return taskId;
@@ -180,6 +224,8 @@ public class ReplicatorConnectorConfig {
if (topicAndTag.size() == 1) {
if (StringUtils.isBlank(srcInstanceId)) {
topicTagMap.put(topicAndTag.get(0), "*");
+ } else if (topicTagPair.startsWith("%RETRY%") ||
topicTagPair.startsWith("%DLQ%")) {
+ topicTagMap.put(topicAndTag.get(0), "*");
} else {
topicTagMap.put(srcInstanceId + "%" + topicAndTag.get(0),
"*");
}
@@ -194,6 +240,15 @@ public class ReplicatorConnectorConfig {
return topicTagMap;
}
+
+ /**
+  * Splits a {@code ;}-separated topic list into a set of trimmed, non-empty topic names.
+  *
+  * @param srcTopics raw topic list, for example {@code topic-1;topic-2}
+  * @return the distinct topic names, or {@code null} when {@code srcTopics} is null or blank
+  */
+ public static Set<String> getSrcTopics(String srcTopics) {
+     // isBlank already covers null and the empty string.
+     if (StringUtils.isBlank(srcTopics)) {
+         return null;
+     }
+     List<String> topicList = Splitter.on(TOPIC_SPLITTER).omitEmptyStrings().trimResults().splitToList(srcTopics);
+     return new HashSet<>(topicList);
+ }
+
/** Returns the raw source topic/tag string as it was set. */
public String getSrcTopicTags() {
return srcTopicTags;
}
@@ -202,6 +257,14 @@ public class ReplicatorConnectorConfig {
this.srcTopicTags = srcTopicTags;
}
+ /** Returns the raw source topic list string as it was set (not split). */
+ public String getSrcTopics() {
+ return srcTopics;
+ }
+
+ /** Stores the raw source topic list string; it is not parsed here. */
+ public void setSrcTopics(String srcTopics) {
+ this.srcTopics = srcTopics;
+ }
+
/** Returns the configured source endpoint. */
public String getSrcEndpoint() {
return srcEndpoint;
}
@@ -258,6 +321,39 @@ public class ReplicatorConnectorConfig {
this.destEndpoint = destEndpoint;
}
+    /**
+     * Returns the raw filter expression last passed to {@code setFilterProperties}.
+     *
+     * @return the stored {@code filterProperties} value, unmodified
+     */
+    public String getFilterProperties() {
+        return this.filterProperties;
+    }
+
+    /**
+     * Returns the {@code filterPropertiesMap} field.
+     *
+     * @return the map instance held by this object itself, not a copy
+     */
+    public Map<String, String> getFilterPropertiesMap() {
+        return filterPropertiesMap;
+    }
+
+    /**
+     * Stores the raw filter expression and parses it into {@code filterPropertiesMap}.
+     *
+     * <p>The expression is split on {@code ","} into entries and each entry on {@code "="}:
+     * {@code key} maps the key to {@code null}, {@code key=value} maps the key to the value.
+     * Blank entries (for example from a leading or doubled comma) are skipped. Entries that
+     * do not split into one or two parts are logged and ignored. Entries already present in
+     * the map are not removed.
+     *
+     * @param filterProperties raw expression; when blank the map is left untouched
+     */
+    public void setFilterProperties(String filterProperties) {
+        this.filterProperties = filterProperties;
+        if (StringUtils.isBlank(filterProperties)) {
+            return;
+        }
+        for (String kv : filterProperties.split(",")) {
+            // A blank entry used to register an empty (or whitespace) key with a null value.
+            if (StringUtils.isBlank(kv)) {
+                continue;
+            }
+            String[] items = kv.split("=");
+            if (items.length == 1) {
+                filterPropertiesMap.put(items[0], null);
+            } else if (items.length == 2) {
+                filterPropertiesMap.put(items[0], items[1]);
+            } else {
+                // Name the offending entry so a bad configuration can be located from the log.
+                log.error("parse filterProperties error, invalid entry: " + kv);
+            }
+        }
+    }
+
+    /**
+     * Returns the current value of the {@code filterPropertiesOperator} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getFilterPropertiesOperator() {
+        return this.filterPropertiesOperator;
+    }
+
+ /**
+  * Stores the given value in the {@code filterPropertiesOperator} field; no validation is done here.
+  *
+  * @param filterPropertiesOperator the value to store
+  */
+ public void setFilterPropertiesOperator(String filterPropertiesOperator) {
+ this.filterPropertiesOperator = filterPropertiesOperator;
+ }
+
public FailoverStrategy getFailoverStrategy() {
return failoverStrategy;
}
@@ -274,6 +370,69 @@ public class ReplicatorConnectorConfig {
this.enableHeartbeat = enableHeartbeat;
}
+    /**
+     * Returns the current value of the {@code enableCheckpoint} flag.
+     *
+     * @return the stored flag
+     */
+    public boolean isEnableCheckpoint() {
+        return this.enableCheckpoint;
+    }
+
+ /**
+  * Stores the given value in the {@code enableCheckpoint} flag.
+  *
+  * @param enableCheckpoint the value to store
+  */
+ public void setEnableCheckpoint(boolean enableCheckpoint) {
+ this.enableCheckpoint = enableCheckpoint;
+ }
+
+    /**
+     * Returns the current value of the {@code enableRetrySync} flag.
+     *
+     * @return the stored flag
+     */
+    public boolean isEnableRetrySync() {
+        return this.enableRetrySync;
+    }
+
+ /**
+  * Stores the given value in the {@code enableRetrySync} flag.
+  *
+  * @param enableRetrySync the value to store
+  */
+ public void setEnableRetrySync(boolean enableRetrySync) {
+ this.enableRetrySync = enableRetrySync;
+ }
+
+    /**
+     * Returns the current value of the {@code srcRetryGids} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getSrcRetryGids() {
+        return this.srcRetryGids;
+    }
+
+ /**
+  * Stores the given value in the {@code srcRetryGids} field; no validation is done here.
+  *
+  * @param srcRetryGids the value to store
+  */
+ public void setSrcRetryGids(String srcRetryGids) {
+ this.srcRetryGids = srcRetryGids;
+ }
+
+    /**
+     * Returns the current value of the {@code destRetryTopic} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getDestRetryTopic() {
+        return this.destRetryTopic;
+    }
+
+ /**
+  * Stores the given value in the {@code destRetryTopic} field; no validation is done here.
+  *
+  * @param destRetryTopic the value to store
+  */
+ public void setDestRetryTopic(String destRetryTopic) {
+ this.destRetryTopic = destRetryTopic;
+ }
+
+    /**
+     * Returns the current value of the {@code enableDlqSync} flag.
+     *
+     * @return the stored flag
+     */
+    public boolean isEnableDlqSync() {
+        return this.enableDlqSync;
+    }
+
+ /**
+  * Stores the given value in the {@code enableDlqSync} flag.
+  *
+  * @param enableDlqSync the value to store
+  */
+ public void setEnableDlqSync(boolean enableDlqSync) {
+ this.enableDlqSync = enableDlqSync;
+ }
+
+    /**
+     * Returns the current value of the {@code srcDlqGids} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getSrcDlqGids() {
+        return this.srcDlqGids;
+    }
+
+ /**
+  * Stores the given value in the {@code srcDlqGids} field; no validation is done here.
+  *
+  * @param srcDlqGids the value to store
+  */
+ public void setSrcDlqGids(String srcDlqGids) {
+ this.srcDlqGids = srcDlqGids;
+ }
+
+    /**
+     * Returns the current value of the {@code destDlqTopic} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getDestDlqTopic() {
+        return this.destDlqTopic;
+    }
+
+ /**
+  * Stores the given value in the {@code destDlqTopic} field; no validation is done here.
+  *
+  * @param destDlqTopic the value to store
+  */
+ public void setDestDlqTopic(String destDlqTopic) {
+ this.destDlqTopic = destDlqTopic;
+ }
+
+    /**
+     * Returns the current value of the {@code syncGids} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getSyncGids() {
+        return this.syncGids;
+    }
+
+ /**
+  * Stores the given value in the {@code syncGids} field; no validation is done here.
+  *
+  * @param syncGids the value to store
+  */
+ public void setSyncGids(String syncGids) {
+ this.syncGids = syncGids;
+ }
public String getDividedNormalQueues() {
return dividedNormalQueues;
@@ -283,6 +442,22 @@ public class ReplicatorConnectorConfig {
this.dividedNormalQueues = dividedNormalQueues;
}
+    /**
+     * Returns the current value of the {@code dividedRetryQueues} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getDividedRetryQueues() {
+        return this.dividedRetryQueues;
+    }
+
+ /**
+  * Stores the given value in the {@code dividedRetryQueues} field; no validation is done here.
+  *
+  * @param dividedRetryQueues the value to store
+  */
+ public void setDividedRetryQueues(String dividedRetryQueues) {
+ this.dividedRetryQueues = dividedRetryQueues;
+ }
+
+    /**
+     * Returns the current value of the {@code dividedDlqQueues} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getDividedDlqQueues() {
+        return this.dividedDlqQueues;
+    }
+
+ /**
+  * Stores the given value in the {@code dividedDlqQueues} field; no validation is done here.
+  *
+  * @param dividedDlqQueues the value to store
+  */
+ public void setDividedDlqQueues(String dividedDlqQueues) {
+ this.dividedDlqQueues = dividedDlqQueues;
+ }
+
public int getHeartbeatIntervalMs() {
return heartbeatIntervalMs;
}
@@ -291,6 +466,14 @@ public class ReplicatorConnectorConfig {
this.heartbeatIntervalMs = heartbeatIntervalMs;
}
+    /**
+     * Returns the current value of the {@code checkpointIntervalMs} field.
+     *
+     * @return the stored value
+     */
+    public int getCheckpointIntervalMs() {
+        return this.checkpointIntervalMs;
+    }
+
+ /**
+  * Stores the given value in the {@code checkpointIntervalMs} field; no range check is done here.
+  *
+  * @param checkpointIntervalMs the value to store
+  */
+ public void setCheckpointIntervalMs(int checkpointIntervalMs) {
+ this.checkpointIntervalMs = checkpointIntervalMs;
+ }
+
public String getHeartbeatTopic() {
return heartbeatTopic;
}
@@ -299,6 +482,14 @@ public class ReplicatorConnectorConfig {
this.heartbeatTopic = heartbeatTopic;
}
+    /**
+     * Returns the current value of the {@code checkpointTopic} field.
+     *
+     * @return the stored value, unmodified
+     */
+    public String getCheckpointTopic() {
+        return this.checkpointTopic;
+    }
+
+ /**
+  * Stores the given value in the {@code checkpointTopic} field; no validation is done here.
+  *
+  * @param checkpointTopic the value to store
+  */
+ public void setCheckpointTopic(String checkpointTopic) {
+ this.checkpointTopic = checkpointTopic;
+ }
+
public int getEachQueueBufferSize() {
return eachQueueBufferSize;
}
@@ -337,6 +528,14 @@ public class ReplicatorConnectorConfig {
this.syncTps = syncTps;
}
+    /**
+     * Returns the current value of the {@code maxTask} field.
+     *
+     * @return the stored value
+     */
+    public int getMaxTask() {
+        return this.maxTask;
+    }
+
+ /**
+  * Stores the given value in the {@code maxTask} field; no range check is done here.
+  *
+  * @param maxTask the value to store
+  */
+ public void setMaxTask(int maxTask) {
+ this.maxTask = maxTask;
+ }
+
public int getPullMaxNum() {
return pullMaxNum;
}
@@ -345,9 +544,10 @@ public class ReplicatorConnectorConfig {
this.pullMaxNum = pullMaxNum;
}
+ public static final String SYSTEM_CONSUMER_PREFIX =
"CID_RMQ_SYS_REPLICATOR_";
+ /**
+  * Builds a consumer group name by appending {@code connectorId} to
+  * {@code SYSTEM_CONSUMER_PREFIX}. Despite the method name, {@code taskId} is not
+  * part of the result (see the open todo below).
+  *
+  * @return {@code SYSTEM_CONSUMER_PREFIX} followed by {@code connectorId}
+  */
public String generateTaskIdWithIndexAsConsumerGroup() {
// todo need use replicatorTaskIdWithIndex for consumerGroup ???
- return "SYS_ROCKETMQ_REPLICATOR_" + connectorId;
+ return SYSTEM_CONSUMER_PREFIX + connectorId;
// return "SYS_ROCKETMQ_REPLICATOR_" + connectorId + "_" + taskId;
}
@@ -454,10 +654,22 @@ public class ReplicatorConnectorConfig {
", consumeFromTimestamp=" + consumeFromTimestamp +
", failoverStrategy=" + failoverStrategy +
", enableHeartbeat=" + enableHeartbeat +
+ ", enableCheckpoint=" + enableCheckpoint +
+ ", enableRetrySync=" + enableRetrySync +
+ ", srcRetryGids='" + srcRetryGids + '\'' +
+ ", destRetryTopic='" + destRetryTopic + '\'' +
+ ", enableDlqSync=" + enableDlqSync +
+ ", srcDlqGids='" + srcDlqGids + '\'' +
+ ", destDlqTopic='" + destDlqTopic + '\'' +
+ ", syncGids='" + syncGids + '\'' +
", dividedNormalQueues='" + dividedNormalQueues + '\'' +
+ ", dividedRetryQueues='" + dividedRetryQueues + '\'' +
+ ", dividedDlqQueues='" + dividedDlqQueues + '\'' +
", syncTps=" + syncTps +
", heartbeatIntervalMs=" + heartbeatIntervalMs +
+ ", checkpointIntervalMs=" + checkpointIntervalMs +
", heartbeatTopic='" + heartbeatTopic + '\'' +
+ ", checkpointTopic='" + checkpointTopic + '\'' +
", eachQueueBufferSize=" + eachQueueBufferSize +
", pullMaxNum=" + pullMaxNum +
", sourceConverter='" + sourceConverter + '\'' +
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
index 1c7e516c..fd28a9b7 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
@@ -13,29 +13,54 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
+
package org.apache.rocketmq.replicator.utils;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.replicator.exception.ParamInvalidException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import static
org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
-/**
- * @author osgoo
- * @date 2022/7/21
- */
public class ReplicatorUtils {
- private static Log log = LogFactory.getLog(ReplicatorUtils.class);
- public static String buildTopicWithNamespace(String rawTopic, String
instanceId) {
- if (StringUtils.isBlank(instanceId) ||
StringUtils.isEmpty(instanceId)) {
- return rawTopic;
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
+
+ public static final String TOPIC_KEY = "topic";
+ public static final String CONSUMER_GROUP_KEY = "consumerGroup";
+ public static final String UPSTREAM_LASTTIMESTAMP_KEY =
"upstreamLastTimestamp";
+ public static final String DOWNSTREAM_LASTTIMESTAMP_KEY =
"downstreamLastTimestamp";
+ public static final String METADATA_KEY = "metadata";
+
+    /**
+     * Prefixes a topic with the instance id, joined by {@code "%"}.
+     *
+     * @param topic topic name
+     * @param instanceId instance id used as prefix; when blank per
+     *        {@code StringUtils.isBlank} the topic is returned unchanged
+     * @return {@code instanceId + "%" + topic}, or {@code topic} alone
+     */
+    public static String buildTopicWithNamespace(String topic, String instanceId) {
+        return StringUtils.isBlank(instanceId) ? topic : instanceId + "%" + topic;
+    }
+
+ /**
+  * Prefixes a consumer group with the instance id, joined by {@code "%"}.
+  *
+  * @param consumerGroup consumer group name
+  * @param instanceId instance id used as prefix; when blank per
+  *        {@code StringUtils.isBlank} the group is returned unchanged
+  * @return {@code instanceId + "%" + consumerGroup}, or {@code consumerGroup} alone
+  */
+ public static String buildConsumergroupWithNamespace(String consumerGroup,
String instanceId) {
+ if (StringUtils.isBlank(instanceId)) {
+ return consumerGroup;
}
- return instanceId + "%" + rawTopic;
+ return instanceId + "%" + consumerGroup;
}
public static void checkNeedParams(String connectorName, KeyValue config,
Set<String> neededParamKeys) {
@@ -57,4 +82,36 @@ public class ReplicatorUtils {
configs = Arrays.asList(sortedKv);
return configs;
}
+
+    /**
+     * Builds a {@link RecordPartition} that identifies a topic / consumer group pair.
+     * The values are stored under {@code TOPIC_KEY} and {@code CONSUMER_GROUP_KEY}.
+     *
+     * @param topic topic name, stored as given
+     * @param consumerGroup consumer group name, stored as given
+     * @return a record partition constructed from the two-entry map
+     */
+    public static RecordPartition convertToRecordPartition(String topic, String consumerGroup) {
+        Map<String, String> partitionFields = new HashMap<>();
+        partitionFields.put(CONSUMER_GROUP_KEY, consumerGroup);
+        partitionFields.put(TOPIC_KEY, topic);
+        return new RecordPartition(partitionFields);
+    }
+
+    /**
+     * Wraps an offset into a {@link RecordOffset}.
+     * The offset is stored in string form under the {@code QUEUE_OFFSET} key.
+     *
+     * @param offset offset value; a {@code null} reference is stored as the text {@code "null"}
+     * @return a record offset constructed from the single-entry map
+     */
+    public static RecordOffset convertToRecordOffset(Long offset) {
+        Map<String, String> position = new HashMap<>();
+        position.put(QUEUE_OFFSET, String.valueOf(offset));
+        return new RecordOffset(position);
+    }
+
+ /**
+  * Applies {@code topicConfig} on every address returned by
+  * {@code CommandUtil.fetchMasterAddrByClusterName} for every cluster listed in the
+  * broker cluster info.
+  *
+  * @param defaultMQAdminExt admin client used for the cluster lookup and the topic calls
+  * @param topicConfig topic configuration passed to {@code createAndUpdateTopicConfig}
+  * @throws RuntimeException wrapping any exception raised along the way; the message
+  *         carries the topic name and the original exception is kept as cause
+  */
+ public static void createTopic(DefaultMQAdminExt defaultMQAdminExt,
TopicConfig topicConfig) {
+ try {
+ ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
+ Map<String, Set<String>> clusterAddrTable =
clusterInfo.getClusterAddrTable();
+ Set<String> clusterNameSet = clusterAddrTable.keySet();
+ // Visit each cluster, then each address resolved for that cluster.
+ for (String clusterName : clusterNameSet) {
+ Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr,
topicConfig);
+ log.info("CreateTopic cluster {}, addr {}, masterSet {},
create topic {} success. ", clusterName, addr, masterSet, topicConfig);
+ }
+ }
+ } catch (Exception e) {
+ // The first failure aborts the loops; addresses already updated are not rolled back here.
+ throw new RuntimeException("Create topic [" +
topicConfig.getTopicName() + "] failed", e);
+ }
+ }
}