This is an automated email from the ASF dual-hosted git repository. karp pushed a commit to branch 49x/develop in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit a809f17a7bdd67324977d8321414ca7da2ebd925 Author: 维章 <[email protected]> AuthorDate: Mon Jan 16 10:29:24 2023 +0800 create normal topic for state storage. --- .../core/function/supplier/SinkSupplier.java | 10 ++- .../rocketmq/streams/core/state/RocketMQStore.java | 28 ++++++++- .../rocketmq/streams/core/state/StateStore.java | 1 + .../rocketmq/streams/core/util/RocketMQUtil.java | 71 +--------------------- 4 files changed, 35 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java index 393e8956..09d56467 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java @@ -61,7 +61,7 @@ public class SinkSupplier<K, T> implements Supplier<Processor<T>> { this.key = context.getKey(); } - //sink into shuffle topic/state topic/user topic + //sink into shuffle/user topic @Override public void process(T data) throws Throwable { if (data != null) { @@ -83,8 +83,9 @@ public class SinkSupplier<K, T> implements Supplier<Processor<T>> { producer.send(message); } else { message = new Message(this.topicName, value); + String hexKey = Utils.toHexString(this.key); //the real key is in the body, this key is used to route the same key into the same queue. - message.setKeys(Utils.toHexString(this.key)); + message.setKeys(hexKey); message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, this.key.getClass().getName()); @@ -94,7 +95,10 @@ public class SinkSupplier<K, T> implements Supplier<Processor<T>> { message.putUserProperty(Constant.SOURCE_TIMESTAMP, String.valueOf(this.context.getDataTime())); } - producer.send(message, new SelectMessageQueueByHash(), this.key); + //For data write back, Write-prohibited is forbidden, because it will make send message failed. + //And if the MessageQueue num changed(like expansion), the data with same key will be sent into different MessageQueue. + //shuffle topic must be Static topic, to solve the problem in expansion. + producer.send(message, new SelectMessageQueueByHash(), hexKey); } } } diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java index 531321eb..68e39dbe 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.streams.core.state; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.CountDownLatch2; import org.apache.rocketmq.common.MixAll; @@ -41,6 +43,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -176,6 +179,8 @@ public class RocketMQStore extends AbstractStore implements StateStore { Set<MessageQueue> stateTopicQueues = convertSourceTopicQueue2StateTopicQueue(messageQueues); for (MessageQueue stateTopicQueue : stateTopicQueues) { + //if the source queue is removed, skip it. + String stateTopicQueueKey = buildKey(stateTopicQueue); Set<byte[]> keySet = super.getInCalculating(stateTopicQueueKey); @@ -264,6 +269,7 @@ public class RocketMQStore extends AbstractStore implements StateStore { for (String stateUniqueQueue : groupByUniqueQueue.keySet()) { Set<byte[]> stateTopicQueueKey = super.getAll(stateUniqueQueue); for (byte[] key : stateTopicQueueKey) { + logger.info("remove state queue:{}, delete corresponding state from rocksdb", stateUniqueQueue); this.rocksDBStore.deleteByKey(key); } super.removeAll(stateUniqueQueue); @@ -271,6 +277,7 @@ public class RocketMQStore extends AbstractStore implements StateStore { for (MessageQueue stateMessageQueue : stateTopicQueue) { + logger.info("remove state queue:{}, remove corresponding recover lock.",stateMessageQueue); this.recoveringQueueMutex.remove(stateMessageQueue); } } catch (Throwable e) { @@ -286,7 +293,7 @@ public class RocketMQStore extends AbstractStore implements StateStore { } private void pullToLast(DefaultLitePullConsumer consumer) throws Throwable { - Set<MessageQueue> readyToRecover = consumer.assignment(); + Set<MessageQueue> readyToRecover = assignMessageQueue(consumer);//consumer.assignment(); for (MessageQueue messageQueue : readyToRecover) { this.recoveringQueueMutex.computeIfAbsent(messageQueue, messageQueue1 -> new CountDownLatch2(1)); } @@ -311,13 +318,28 @@ public class RocketMQStore extends AbstractStore implements StateStore { } //恢复完毕; - Set<MessageQueue> recoverOver = consumer.assignment(); - for (MessageQueue messageQueue : recoverOver) { + for (MessageQueue messageQueue : readyToRecover) { CountDownLatch2 waitPoint = this.recoveringQueueMutex.get(messageQueue); waitPoint.countDown(); } } + private Set<MessageQueue> assignMessageQueue(DefaultLitePullConsumer consumer) throws Throwable { + Class<? extends DefaultLitePullConsumer> consumerClass = consumer.getClass(); + + Field consumerImpl = consumerClass.getDeclaredField("defaultLitePullConsumerImpl"); + consumerImpl.setAccessible(true); + + DefaultLitePullConsumerImpl defaultLitePullConsumer = (DefaultLitePullConsumerImpl)consumerImpl.get(consumer); + + Field assignedMessageQueueField = defaultLitePullConsumer.getClass().getDeclaredField("assignedMessageQueue"); + assignedMessageQueueField.setAccessible(true); + + AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) assignedMessageQueueField.get(defaultLitePullConsumer); + + return assignedMessageQueue.getAssignedMessageQueues(); + } + //拉的数据越多,重放效率越高, // 能保证一个q里面后面pull到的数据queueOffset一定比前一批次拉取的queueOffset大吗? private void replayState(List<MessageExt> msgs) throws Throwable { diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java index 7f75361c..5ec4422d 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java @@ -48,5 +48,6 @@ public interface StateStore extends AutoCloseable { void delete(byte[] key) throws Throwable; + //persist state into rocketmq void persist(Set<MessageQueue> messageQueue) throws Throwable; } diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java index 2251c8e1..36455719 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java @@ -22,19 +22,18 @@ import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; -import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -54,7 +53,7 @@ public class RocketMQUtil { clusters = getCluster(mqAdmin); } - TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum); + TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum, PermName.PERM_READ | PermName.PERM_WRITE); for (String cluster : clusters) { Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster); @@ -71,72 +70,6 @@ public class RocketMQUtil { createNormalTopic(mqAdmin, topicName, queueNum, clusters); } - public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum, Set<String> clusters) throws Exception { - if (check(mqAdmin, topicName)) { - logger.info("topic[{}] already exist.", topicName); - return; - } - - if (clusters == null || clusters.size() == 0) { - clusters = getCluster(mqAdmin); - } - - - for (String cluster : clusters) { - createStaticTopicWithCommand(topicName, queueNum, new HashSet<>(), cluster, mqAdmin.getNamesrvAddr()); - logger.info("【step 1】create static topic:[{}] in cluster:[{}] success, logic queue num:[{}].", topicName, cluster, queueNum); - - update2CompactTopicWithCommand(topicName, queueNum, cluster, mqAdmin.getNamesrvAddr()); - logger.info("【step 2】update static topic to compact topic success. topic:[{}], cluster:[{}]", topicName, cluster); - } - - existTopic.add(topicName); - logger.info("create static-compact topic [{}] success, queue num [{}]", topicName, queueNum); - } - - public static void createStaticTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum) throws Exception { - if (check(mqAdmin, topicName)) { - logger.info("topic[{}] already exist.", topicName); - return; - } - - Set<String> clusters = getCluster(mqAdmin); - for (String cluster : clusters) { - createStaticTopicWithCommand(topicName, queueNum, new HashSet<>(), cluster, mqAdmin.getNamesrvAddr()); - logger.info("create static topic:[{}] in cluster:[{}] success, logic queue num:[{}].", topicName, cluster, queueNum); - } - - existTopic.add(topicName); - } - - private static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception { - UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand(); - Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] args; - if (cluster != null) { - args = new String[]{ - "-c", cluster, - "-t", topic, - "-qn", String.valueOf(queueNum), - "-n", nameservers - }; - } else { - String brokerStr = String.join(",", brokers); - args = new String[]{ - "-b", brokerStr, - "-t", topic, - "-qn", String.valueOf(queueNum), - "-n", nameservers - }; - } - - final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser()); - - String namesrvAddr = commandLine.getOptionValue('n'); - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); - - cmd.execute(commandLine, options, null); - } private static void update2CompactTopicWithCommand(String topic, int queueNum, String cluster, String nameservers) throws Exception { UpdateTopicSubCommand command = new UpdateTopicSubCommand();
