This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c17baf1e3 [ISSUE #6386] Some improvements for compactionTopic (#6387)
c17baf1e3 is described below

commit c17baf1e33259db450617f59f2f2b0bc01505e87
Author: guyinyou <[email protected]>
AuthorDate: Wed Mar 22 11:23:13 2023 +0800

    [ISSUE #6386] Some improvements for compactionTopic (#6387)
    
    * 1. Graceful shutdown needs to wait for the compactionTopic to finish building its index
    2. Add handling for exceptions that may occur in pullMessageFromMaster
    3. Update usage docs
    
    * fix doc
    
    ---------
    
    Co-authored-by: guyinyou <[email protected]>
---
 docs/cn/Example_Compaction_Topic_cn.md             | 16 ++++++++++++-
 docs/en/Example_Compaction_Topic.md                | 22 ++++++++++++++---
 .../rocketmq/store/kv/CompactionService.java       |  9 ++++++-
 .../apache/rocketmq/store/kv/MessageFetcher.java   | 28 +++++++++++++---------
 4 files changed, 59 insertions(+), 16 deletions(-)

diff --git a/docs/cn/Example_Compaction_Topic_cn.md b/docs/cn/Example_Compaction_Topic_cn.md
index 0cb4bffdc..9793fccd5 100644
--- a/docs/cn/Example_Compaction_Topic_cn.md
+++ b/docs/cn/Example_Compaction_Topic_cn.md
@@ -1,14 +1,25 @@
 # Compaction Topic
 
 ## Usage
+
+### Enable ordered messages on namesrv
+CompactionTopic relies on ordered messages to ensure consistency.
+```shell
+$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
+```
+
 ### Create a compaction topic
+
 ```shell
-$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
+$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
 create topic to 127.0.0.1:10911 success.
 TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
 ```
+
 ### Produce data
+
 Same as with ordinary messages.
+
 ```java
 DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
 producer.setNamesrvAddr("localhost:9876");
@@ -28,9 +39,12 @@ SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
 
 System.out.printf("%s%n", sendResult);
 ``` 
+
 ### Consume data
+
 Consumption offsets are unchanged by compaction. If you consume from a specified offset and that offset no longer exists, the nearest following message is returned.
 In compaction scenarios, most consumers read the complete data from offset 0.
+
 ```java
 DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
 consumer.setNamesrvAddr("localhost:9876");
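
To complement the consume-data section above (whose Java snippet is truncated by the hunk boundary), here is a minimal sketch of a full read from offset 0. It is not part of this commit; it reuses the ctopic / localhost:9876 / compactionTestGroup values from the doc's examples and relies only on the standard DefaultLitePullConsumer assign/seek/poll APIs.

```java
import java.util.Collection;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class CompactionConsumerSketch {
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.start();

        // Manually assign all queues of the topic and rewind each one to offset 0,
        // since compaction consumers usually rebuild state from the complete data.
        Collection<MessageQueue> queues = consumer.fetchMessageQueues("ctopic");
        consumer.assign(queues);
        for (MessageQueue mq : queues) {
            consumer.seek(mq, 0);
        }

        while (true) {
            for (MessageExt msg : consumer.poll(1000)) {
                // Keep only the latest value per key, e.g. in a local map.
                System.out.printf("key=%s offset=%d%n", msg.getKeys(), msg.getQueueOffset());
            }
        }
    }
}
```
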
diff --git a/docs/en/Example_Compaction_Topic.md b/docs/en/Example_Compaction_Topic.md
index 695860b6a..76af95902 100644
--- a/docs/en/Example_Compaction_Topic.md
+++ b/docs/en/Example_Compaction_Topic.md
@@ -1,9 +1,16 @@
 # Compaction Topic
 
 ## use example
+
+### Enable ordered messages on namesrv
+CompactionTopic relies on ordered messages to ensure consistency.
+```shell
+$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
+```
+
 ### create compaction topic
 ```shell
-$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
+$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
 create topic to 127.0.0.1:10911 success.
 TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
 ```
@@ -15,8 +22,17 @@ DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
 producer.setNamesrvAddr("localhost:9876");
 producer.start();
 
-Message msg = new Message(topic, "tags", "keys", "bodys"getBytes(StandardCharsets.UTF_8));
-SendResult sendResult = producer.send(msg);
+String topic = "ctopic";
+String tag = "tag1";
+String key = "key1";
+Message msg = new Message(topic, tag, key, "bodys".getBytes(StandardCharsets.UTF_8));
+SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
+    int select = Math.abs(shardingKey.hashCode());
+    if (select < 0) {
+        select = 0;
+    }
+    return mqs.get(select % mqs.size());
+}, key);
 
 System.out.printf("%s%n", sendResult);
 ```
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
index 1b5d38913..205a6a2f9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
@@ -67,7 +67,7 @@ public class CompactionService extends ServiceThread {
     }
 
     public GetMessageResult getMessage(final String group, final String topic, final int queueId,
-        final long offset, final int maxMsgNums, final int maxTotalMsgSize) {
+                                       final long offset, final int maxMsgNums, final int maxTotalMsgSize) {
         return compactionStore.getMessage(group, topic, queueId, offset, maxMsgNums, maxTotalMsgSize);
     }
 
@@ -126,6 +126,13 @@ public class CompactionService extends ServiceThread {
     @Override
     public void shutdown() {
         super.shutdown();
+        while (!compactionMsgQ.isEmpty()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException ignored) {
+
+            }
+        }
         compactionStore.shutdown();
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
index 17bd6bb4d..0ce0a3d8d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
@@ -19,8 +19,10 @@ package org.apache.rocketmq.store.kv;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.util.function.BiFunction;
+
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
@@ -51,6 +53,7 @@ public class MessageFetcher implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private final RemotingClient client;
+
     public MessageFetcher() {
         NettyClientConfig nettyClientConfig = new NettyClientConfig();
         nettyClientConfig.setUseTLS(false);
@@ -85,12 +88,13 @@ public class MessageFetcher implements AutoCloseable {
     private String getConsumerGroup(String topic, int queueId) {
         return String.join("-", topic, String.valueOf(queueId), "pull", "group");
     }
+
     private String getClientId() {
         return String.join("@", NetworkUtil.getLocalAddress(), "compactionIns", "compactionUnit");
     }
 
     private boolean prepare(String masterAddr, String topic, String groupName, long subVersion)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         HeartbeatData heartbeatData = new HeartbeatData();
 
         heartbeatData.setClientID(getClientId());
@@ -121,7 +125,7 @@ public class MessageFetcher implements AutoCloseable {
     }
 
     private boolean pullDone(String masterAddr, String groupName)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
         requestHeader.setClientID(getClientId());
         requestHeader.setProducerGroup("");
@@ -140,14 +144,16 @@ public class MessageFetcher implements AutoCloseable {
     }
 
     public void pullMessageFromMaster(String topic, int queueId, long endOffset, String masterAddr,
-        BiFunction<Long, RemotingCommand, Boolean> responseHandler) throws Exception {
+                                      BiFunction<Long, RemotingCommand, Boolean> responseHandler) throws Exception {
         long currentPullOffset = 0;
 
         try {
             long subVersion = System.currentTimeMillis();
             String groupName = getConsumerGroup(topic, queueId);
-            prepare(masterAddr, topic, groupName, subVersion);
-
+            if (!prepare(masterAddr, topic, groupName, subVersion)) {
+                log.error("{}:{} prepare to {} pull message failed", topic, queueId, masterAddr);
+                throw new RemotingCommandException(topic + ":" + queueId + " prepare to " + masterAddr + " pull message failed");
+            }
 
             boolean noNewMsg = false;
             boolean keepPull = true;
@@ -157,11 +163,11 @@ public class MessageFetcher implements AutoCloseable {
                 PullMessageRequestHeader requestHeader = createPullMessageRequest(topic, queueId, currentPullOffset, subVersion);
 
                 RemotingCommand
-                    request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader);
+                        request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader);
                 RemotingCommand response = client.invokeSync(masterAddr, request, 1000 * 30L);
 
                 PullMessageResponseHeader responseHeader =
-                    (PullMessageResponseHeader)response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+                        (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
                 if (responseHeader == null) {
                     log.error("{}:{} pull message responseHeader is null", topic, queueId);
                     throw new RemotingCommandException(topic + ":" + queueId + " pull message responseHeader is null");
@@ -175,20 +181,20 @@ public class MessageFetcher implements AutoCloseable {
                         break;
                     case ResponseCode.PULL_NOT_FOUND:       // NO_NEW_MSG, need break loop
                         log.info("PULL_NOT_FOUND, topic:{}, queueId:{}, pullOffset:{},",
-                            topic, queueId, currentPullOffset);
+                                topic, queueId, currentPullOffset);
                         noNewMsg = true;
                         break;
                     case ResponseCode.PULL_RETRY_IMMEDIATELY:
                         log.info("PULL_RETRY_IMMEDIATE, topic:{}, queueId:{}, pullOffset:{},",
-                            topic, queueId, currentPullOffset);
+                                topic, queueId, currentPullOffset);
                         break;
                     case ResponseCode.PULL_OFFSET_MOVED:
                         log.info("PULL_OFFSET_MOVED, topic:{}, queueId:{}, pullOffset:{},",
-                            topic, queueId, currentPullOffset);
+                                topic, queueId, currentPullOffset);
                         break;
                     default:
                         log.warn("Pull Message error, response code: {}, remark: {}",
-                            response.getCode(), response.getRemark());
+                                response.getCode(), response.getRemark());
                 }
 
                 if (noNewMsg || !keepPull) {
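
As a usage illustration for the stricter prepare() handling above, the following hypothetical caller sketch is not part of this commit: the topic, queue id, end offset and master address are placeholders, and the responseHandler contract (current pull offset plus raw response in, a keep-pulling boolean out) is an assumption inferred from the keepPull flag in the loop, not something this patch defines.

```java
import java.util.function.BiFunction;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.kv.MessageFetcher;

public class CompactionPullSketch {
    public static void main(String[] args) {
        // Assumed handler contract: receive the current pull offset and the raw response,
        // process the body, and return true to keep pulling.
        BiFunction<Long, RemotingCommand, Boolean> handler = (pullOffset, response) -> {
            // Decode and index response.getBody() here.
            return response.getBody() != null;
        };
        try (MessageFetcher fetcher = new MessageFetcher()) {
            // With this change, a failed prepare() surfaces as a RemotingCommandException
            // instead of being silently ignored. All argument values below are placeholders.
            fetcher.pullMessageFromMaster("ctopic", 0, 1000L, "127.0.0.1:10911", handler);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
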
