This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch 49x/develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit a809f17a7bdd67324977d8321414ca7da2ebd925
Author: 维章 <[email protected]>
AuthorDate: Mon Jan 16 10:29:24 2023 +0800

    create normal topic for state storage.
---
 .../core/function/supplier/SinkSupplier.java       | 10 ++-
 .../rocketmq/streams/core/state/RocketMQStore.java | 28 ++++++++-
 .../rocketmq/streams/core/state/StateStore.java    |  1 +
 .../rocketmq/streams/core/util/RocketMQUtil.java   | 71 +---------------------
 4 files changed, 35 insertions(+), 75 deletions(-)

diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
index 393e8956..09d56467 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
@@ -61,7 +61,7 @@ public class SinkSupplier<K, T> implements 
Supplier<Processor<T>> {
             this.key = context.getKey();
         }
 
-        //sink into shuffle topic/state topic/user topic
+        //sink into shuffle/user topic
         @Override
         public void process(T data) throws Throwable {
             if (data != null) {
@@ -83,8 +83,9 @@ public class SinkSupplier<K, T> implements 
Supplier<Processor<T>> {
                     producer.send(message);
                 } else {
                     message = new Message(this.topicName, value);
+                    String hexKey = Utils.toHexString(this.key);
                     //the real key is in the body, this key is used to route 
the same key into the same queue.
-                    message.setKeys(Utils.toHexString(this.key));
+                    message.setKeys(hexKey);
 
 
                     message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, 
this.key.getClass().getName());
@@ -94,7 +95,10 @@ public class SinkSupplier<K, T> implements 
Supplier<Processor<T>> {
                         message.putUserProperty(Constant.SOURCE_TIMESTAMP, 
String.valueOf(this.context.getDataTime()));
                     }
 
-                    producer.send(message, new SelectMessageQueueByHash(), 
this.key);
+                    //For data write-back, the topic must not be write-prohibited, 
because that would make sending the message fail.
+                    //Also, if the number of MessageQueues changes (e.g. on expansion), 
data with the same key will be sent to a different MessageQueue.
+                    //The shuffle topic must be a static topic to avoid this problem 
during expansion.
+                    producer.send(message, new SelectMessageQueueByHash(), 
hexKey);
                 }
             }
         }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
index 531321eb..68e39dbe 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.streams.core.state;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.CountDownLatch2;
 import org.apache.rocketmq.common.MixAll;
@@ -41,6 +43,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
@@ -176,6 +179,8 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
 
         Set<MessageQueue> stateTopicQueues = 
convertSourceTopicQueue2StateTopicQueue(messageQueues);
         for (MessageQueue stateTopicQueue : stateTopicQueues) {
+            //if the source queue is removed, skip it.
+
             String stateTopicQueueKey = buildKey(stateTopicQueue);
             Set<byte[]> keySet = super.getInCalculating(stateTopicQueueKey);
 
@@ -264,6 +269,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
                 for (String stateUniqueQueue : groupByUniqueQueue.keySet()) {
                     Set<byte[]> stateTopicQueueKey = 
super.getAll(stateUniqueQueue);
                     for (byte[] key : stateTopicQueueKey) {
+                        logger.info("remove state queue:{}, delete 
corresponding state from rocksdb", stateUniqueQueue);
                         this.rocksDBStore.deleteByKey(key);
                     }
                     super.removeAll(stateUniqueQueue);
@@ -271,6 +277,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
 
 
                 for (MessageQueue stateMessageQueue : stateTopicQueue) {
+                    logger.info("remove state queue:{}, remove corresponding 
recover lock.",stateMessageQueue);
                     this.recoveringQueueMutex.remove(stateMessageQueue);
                 }
             } catch (Throwable e) {
@@ -286,7 +293,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
     }
 
     private void pullToLast(DefaultLitePullConsumer consumer) throws Throwable 
{
-        Set<MessageQueue> readyToRecover = consumer.assignment();
+        Set<MessageQueue> readyToRecover = 
assignMessageQueue(consumer);//consumer.assignment();
         for (MessageQueue messageQueue : readyToRecover) {
             this.recoveringQueueMutex.computeIfAbsent(messageQueue, 
messageQueue1 -> new CountDownLatch2(1));
         }
@@ -311,13 +318,28 @@ public class RocketMQStore extends AbstractStore 
implements StateStore {
         }
 
         //恢复完毕;
-        Set<MessageQueue> recoverOver = consumer.assignment();
-        for (MessageQueue messageQueue : recoverOver) {
+        for (MessageQueue messageQueue : readyToRecover) {
             CountDownLatch2 waitPoint = 
this.recoveringQueueMutex.get(messageQueue);
             waitPoint.countDown();
         }
     }
 
+    private Set<MessageQueue> assignMessageQueue(DefaultLitePullConsumer 
consumer) throws Throwable {
+        Class<? extends DefaultLitePullConsumer> consumerClass = 
consumer.getClass();
+
+        Field consumerImpl = 
consumerClass.getDeclaredField("defaultLitePullConsumerImpl");
+        consumerImpl.setAccessible(true);
+
+        DefaultLitePullConsumerImpl defaultLitePullConsumer = 
(DefaultLitePullConsumerImpl)consumerImpl.get(consumer);
+
+        Field assignedMessageQueueField = 
defaultLitePullConsumer.getClass().getDeclaredField("assignedMessageQueue");
+        assignedMessageQueueField.setAccessible(true);
+
+        AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) 
assignedMessageQueueField.get(defaultLitePullConsumer);
+
+        return assignedMessageQueue.getAssignedMessageQueues();
+    }
+
     //拉的数据越多,重放效率越高,
     // 能保证一个q里面后面pull到的数据queueOffset一定比前一批次拉取的queueOffset大吗?
     private void replayState(List<MessageExt> msgs) throws Throwable {
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
index 7f75361c..5ec4422d 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
@@ -48,5 +48,6 @@ public interface StateStore extends AutoCloseable {
 
     void delete(byte[] key) throws Throwable;
 
+    //persist state into rocketmq
     void persist(Set<MessageQueue> messageQueue) throws Throwable;
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
index 2251c8e1..36455719 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
@@ -22,19 +22,18 @@ import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
-import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -54,7 +53,7 @@ public class RocketMQUtil {
             clusters = getCluster(mqAdmin);
         }
 
-        TopicConfig topicConfig = new TopicConfig(topicName, queueNum, 
queueNum);
+        TopicConfig topicConfig = new TopicConfig(topicName, queueNum, 
queueNum, PermName.PERM_READ | PermName.PERM_WRITE);
 
         for (String cluster : clusters) {
             Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster);
@@ -71,72 +70,6 @@ public class RocketMQUtil {
         createNormalTopic(mqAdmin, topicName, queueNum, clusters);
     }
 
-    public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, 
String topicName, int queueNum, Set<String> clusters) throws Exception {
-        if (check(mqAdmin, topicName)) {
-            logger.info("topic[{}] already exist.", topicName);
-            return;
-        }
-
-        if (clusters == null || clusters.size() == 0) {
-            clusters = getCluster(mqAdmin);
-        }
-
-
-        for (String cluster : clusters) {
-            createStaticTopicWithCommand(topicName, queueNum, new HashSet<>(), 
cluster, mqAdmin.getNamesrvAddr());
-            logger.info("【step 1】create static topic:[{}] in cluster:[{}] 
success, logic queue num:[{}].", topicName, cluster, queueNum);
-
-            update2CompactTopicWithCommand(topicName, queueNum, cluster, 
mqAdmin.getNamesrvAddr());
-            logger.info("【step 2】update static topic to compact topic success. 
topic:[{}], cluster:[{}]", topicName, cluster);
-        }
-
-        existTopic.add(topicName);
-        logger.info("create static-compact topic [{}] success, queue num 
[{}]", topicName, queueNum);
-    }
-
-    public static void createStaticTopic(DefaultMQAdminExt mqAdmin, String 
topicName, int queueNum) throws Exception {
-        if (check(mqAdmin, topicName)) {
-            logger.info("topic[{}] already exist.", topicName);
-            return;
-        }
-
-        Set<String> clusters = getCluster(mqAdmin);
-        for (String cluster : clusters) {
-            createStaticTopicWithCommand(topicName, queueNum, new HashSet<>(), 
cluster, mqAdmin.getNamesrvAddr());
-            logger.info("create static topic:[{}] in cluster:[{}] success, 
logic queue num:[{}].", topicName, cluster, queueNum);
-        }
-
-        existTopic.add(topicName);
-    }
-
-    private static void createStaticTopicWithCommand(String topic, int 
queueNum, Set<String> brokers, String cluster, String nameservers) throws 
Exception {
-        UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand();
-        Options options = ServerUtil.buildCommandlineOptions(new Options());
-        String[] args;
-        if (cluster != null) {
-            args = new String[]{
-                    "-c", cluster,
-                    "-t", topic,
-                    "-qn", String.valueOf(queueNum),
-                    "-n", nameservers
-            };
-        } else {
-            String brokerStr = String.join(",", brokers);
-            args = new String[]{
-                    "-b", brokerStr,
-                    "-t", topic,
-                    "-qn", String.valueOf(queueNum),
-                    "-n", nameservers
-            };
-        }
-
-        final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + 
cmd.commandName(), args, cmd.buildCommandlineOptions(options), new 
PosixParser());
-
-        String namesrvAddr = commandLine.getOptionValue('n');
-        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
-
-        cmd.execute(commandLine, options, null);
-    }
 
     private static void update2CompactTopicWithCommand(String topic, int 
queueNum, String cluster, String nameservers) throws Exception {
         UpdateTopicSubCommand command = new UpdateTopicSubCommand();

Reply via email to