This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch 49x/develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit c2c81fa9f1ccec4eb359ed78dbce06e5ba33e166
Author: 维章 <[email protected]>
AuthorDate: Sat Jan 14 22:10:54 2023 +0800

    change rocketmq version to 4.9.4
---
 core/pom.xml                                       |  2 +-
 .../streams/core/running/WorkerThread.java         |  2 +-
 .../rocketmq/streams/core/state/RocketMQStore.java |  3 +-
 .../rocketmq/streams/core/util/RocketMQUtil.java   | 32 +++++++++++++++++++++-
 example/pom.xml                                    |  2 +-
 pom.xml                                            |  4 +--
 6 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 798b75e6..1efbb6c7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>rocketmq-streams-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>1.1.1-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
index 32a2b050..19a30b65 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
@@ -232,7 +232,7 @@ public class WorkerThread extends Thread {
             }
 
             for (String topicName : shuffleTopic) {
-                RocketMQUtil.createStaticTopic(mqAdmin, topicName, 
StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM);
+                RocketMQUtil.createNormalTopic(mqAdmin, topicName, 
StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM);
             }
         }
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
index 44e04c52..531321eb 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
@@ -388,6 +388,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
         return target;
     }
 
+    //static topic queue num changes with source topic.
     private void createStateTopic(String stateTopic) throws Exception {
         if (RocketMQUtil.checkWhetherExist(stateTopic)) {
             return;
@@ -396,7 +397,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
         String sourceTopic = stateTopic2SourceTopic(stateTopic);
         Pair<Integer, Set<String>> clustersPair = 
getTotalQueueNumAndClusters(sourceTopic);
 
-        RocketMQUtil.createStaticCompactTopic(mqAdmin, stateTopic, 
clustersPair.getKey(), clustersPair.getValue());
+        RocketMQUtil.createNormalTopic(mqAdmin, stateTopic, 
clustersPair.getKey(), clustersPair.getValue());
     }
 
     private Pair<Integer, Set<String>> getTotalQueueNumAndClusters(String 
sourceTopic) throws Exception {
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
index 34b787d5..2251c8e1 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
@@ -21,11 +21,13 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
 import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
 import org.slf4j.Logger;
@@ -41,6 +43,34 @@ public class RocketMQUtil {
 
     private static final List<String> existTopic = new ArrayList<>();
 
+    //neither static topic nor compact topic.
+    public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String 
topicName, int queueNum, Set<String> clusters) throws Exception {
+        if (check(mqAdmin, topicName)) {
+            logger.info("topic[{}] already exist.", topicName);
+            return;
+        }
+
+        if (clusters == null || clusters.size() == 0) {
+            clusters = getCluster(mqAdmin);
+        }
+
+        TopicConfig topicConfig = new TopicConfig(topicName, queueNum, 
queueNum);
+
+        for (String cluster : clusters) {
+            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster);
+
+            for (String addr : masterSet) {
+                mqAdmin.createAndUpdateTopicConfig(addr, topicConfig);
+                logger.info("create topic to broker:{} cluster:{}, success.", 
addr, cluster);
+            }
+        }
+    }
+
+    public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String 
topicName, int queueNum) throws Exception {
+        Set<String> clusters = getCluster(mqAdmin);
+        createNormalTopic(mqAdmin, topicName, queueNum, clusters);
+    }
+
     public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, 
String topicName, int queueNum, Set<String> clusters) throws Exception {
         if (check(mqAdmin, topicName)) {
             logger.info("topic[{}] already exist.", topicName);
@@ -129,7 +159,7 @@ public class RocketMQUtil {
     }
 
 
-    public static Set<String> getCluster(DefaultMQAdminExt mqAdmin) throws 
Exception {
+    private static Set<String> getCluster(DefaultMQAdminExt mqAdmin) throws 
Exception {
         ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo();
         return clusterInfo.getClusterAddrTable().keySet();
     }
diff --git a/example/pom.xml b/example/pom.xml
index c3731b1b..68da17b4 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>rocketmq-streams-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>1.1.1-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/pom.xml b/pom.xml
index 265402b9..16c2e5f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
 
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-streams-all</artifactId>
-    <version>1.1.1-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <name>Apache RocketMQ Streams ${project.version}</name>
     <packaging>pom</packaging>
     <url>https://rocketmq.apache.org/</url>
@@ -86,7 +86,7 @@
     </modules>
 
     <properties>
-        <rocketmq.version>5.0.0</rocketmq.version>
+        <rocketmq.version>4.9.4</rocketmq.version>
         <rocksdbjni.version>7.6.0</rocksdbjni.version>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>

Reply via email to