This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit a0d18f7499d32686d580731ef369a3f47ff02b1e Author: huitong <[email protected]> AuthorDate: Sun Jul 24 14:49:52 2022 +0800 init compact topic --- schema-storage-rocketmq/pom.xml | 4 ++-- .../registry/storage/rocketmq/RocketmqClient.java | 22 +++++++++++++++++++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/schema-storage-rocketmq/pom.xml b/schema-storage-rocketmq/pom.xml index a3f4b17..0b23abe 100644 --- a/schema-storage-rocketmq/pom.xml +++ b/schema-storage-rocketmq/pom.xml @@ -38,13 +38,13 @@ <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> - <version>4.9.3</version> + <version>5.0.0-ALPHA</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-tools</artifactId> - <version>4.9.3</version> + <version>5.0.0-ALPHA</version> </dependency> </dependencies> diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java index 87cde86..9c0bc7a 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java @@ -43,6 +43,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.exception.SchemaException; import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException; @@ -111,6 +112,22 @@ public class RocketmqClient { try { mqAdminExt.start(); + // check if the topic exists + TopicRouteData topicRouteData = null; + try { + topicRouteData = mqAdminExt.examineTopicRouteInfo(storageTopic); + } catch (MQClientException e) { + log.warn("maybe the storage topic not found, need to create"); + } catch (Exception e) { + throw new SchemaException("Failed to create storage rocketmq topic", e); + } + + if (topicRouteData != null && CollectionUtils.isNotEmpty(topicRouteData.getBrokerDatas()) + && CollectionUtils.isNotEmpty(topicRouteData.getQueueDatas())) { + log.info("the storage topic already exist, no need to create"); + return; + } + try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable(); @@ -119,7 +136,10 @@ public class RocketmqClient { topicConfig.setTopicName(storageTopic); topicConfig.setReadQueueNums(8); topicConfig.setWriteQueueNums(8); - // TODO compact topic (TopicAttributes) + // create compact topic + Map<String, String> attributes = new HashMap<>(1); + attributes.put("+delete.policy", "COMPACTION"); + topicConfig.setAttributes(attributes); String brokerAddr = brokerData.selectBrokerAddr(); mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig); }
