This is an automated email from the ASF dual-hosted git repository. karp pushed a commit to branch 49x/develop in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit c2c81fa9f1ccec4eb359ed78dbce06e5ba33e166 Author: 维章 <[email protected]> AuthorDate: Sat Jan 14 22:10:54 2023 +0800 change rocketmq version to 4.9.4 --- core/pom.xml | 2 +- .../streams/core/running/WorkerThread.java | 2 +- .../rocketmq/streams/core/state/RocketMQStore.java | 3 +- .../rocketmq/streams/core/util/RocketMQUtil.java | 32 +++++++++++++++++++++- example/pom.xml | 2 +- pom.xml | 4 +-- 6 files changed, 38 insertions(+), 7 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 798b75e6..1efbb6c7 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -3,7 +3,7 @@ <parent> <artifactId>rocketmq-streams-all</artifactId> <groupId>org.apache.rocketmq</groupId> - <version>1.1.1-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java index 32a2b050..19a30b65 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java @@ -232,7 +232,7 @@ public class WorkerThread extends Thread { } for (String topicName : shuffleTopic) { - RocketMQUtil.createStaticTopic(mqAdmin, topicName, StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM); + RocketMQUtil.createNormalTopic(mqAdmin, topicName, StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM); } } diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java index 44e04c52..531321eb 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java @@ -388,6 +388,7 @@ public class RocketMQStore extends AbstractStore implements StateStore { return target; } + //static topic queue num changes with source topic. private void createStateTopic(String stateTopic) throws Exception { if (RocketMQUtil.checkWhetherExist(stateTopic)) { return; @@ -396,7 +397,7 @@ public class RocketMQStore extends AbstractStore implements StateStore { String sourceTopic = stateTopic2SourceTopic(stateTopic); Pair<Integer, Set<String>> clustersPair = getTotalQueueNumAndClusters(sourceTopic); - RocketMQUtil.createStaticCompactTopic(mqAdmin, stateTopic, clustersPair.getKey(), clustersPair.getValue()); + RocketMQUtil.createNormalTopic(mqAdmin, stateTopic, clustersPair.getKey(), clustersPair.getValue()); } private Pair<Integer, Set<String>> getTotalQueueNumAndClusters(String sourceTopic) throws Exception { diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java index 34b787d5..2251c8e1 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java @@ -21,11 +21,13 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand; import org.slf4j.Logger; @@ -41,6 +43,34 @@ public class RocketMQUtil { private static final List<String> existTopic = new ArrayList<>(); + //neither static topic nor compact topic. + public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum, Set<String> clusters) throws Exception { + if (check(mqAdmin, topicName)) { + logger.info("topic[{}] already exist.", topicName); + return; + } + + if (clusters == null || clusters.size() == 0) { + clusters = getCluster(mqAdmin); + } + + TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum); + + for (String cluster : clusters) { + Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster); + + for (String addr : masterSet) { + mqAdmin.createAndUpdateTopicConfig(addr, topicConfig); + logger.info("create topic to broker:{} cluster:{}, success.", addr, cluster); + } + } + } + + public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum) throws Exception { + Set<String> clusters = getCluster(mqAdmin); + createNormalTopic(mqAdmin, topicName, queueNum, clusters); + } + public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum, Set<String> clusters) throws Exception { if (check(mqAdmin, topicName)) { logger.info("topic[{}] already exist.", topicName); @@ -129,7 +159,7 @@ public class RocketMQUtil { } - public static Set<String> getCluster(DefaultMQAdminExt mqAdmin) throws Exception { + private static Set<String> getCluster(DefaultMQAdminExt mqAdmin) throws Exception { ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo(); return clusterInfo.getClusterAddrTable().keySet(); } diff --git a/example/pom.xml b/example/pom.xml index c3731b1b..68da17b4 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -3,7 +3,7 @@ <parent> <artifactId>rocketmq-streams-all</artifactId> <groupId>org.apache.rocketmq</groupId> - <version>1.1.1-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/pom.xml b/pom.xml index 265402b9..16c2e5f3 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams-all</artifactId> - <version>1.1.1-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <name>Apache RocketMQ Streams ${project.version}</name> <packaging>pom</packaging> <url>https://rocketmq.apache.org/</url> @@ -86,7 +86,7 @@ </modules> <properties> - <rocketmq.version>5.0.0</rocketmq.version> + <rocketmq.version>4.9.4</rocketmq.version> <rocksdbjni.version>7.6.0</rocksdbjni.version> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target>
