This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c17baf1e3 [ISSUE #6386] Some improvements for compactionTopic (#6387)
c17baf1e3 is described below
commit c17baf1e33259db450617f59f2f2b0bc01505e87
Author: guyinyou <[email protected]>
AuthorDate: Wed Mar 22 11:23:13 2023 +0800
[ISSUE #6386] Some improvements for compactionTopic (#6387)
* 1. Graceful shutdown now waits for the compactionTopic to finish building the
index
2. Add handling for exceptions that may occur in pullMessageFromMaster
3. Update usage docs
* fix doc
---------
Co-authored-by: guyinyou <[email protected]>
---
docs/cn/Example_Compaction_Topic_cn.md | 16 ++++++++++++-
docs/en/Example_Compaction_Topic.md | 22 ++++++++++++++---
.../rocketmq/store/kv/CompactionService.java | 9 ++++++-
.../apache/rocketmq/store/kv/MessageFetcher.java | 28 +++++++++++++---------
4 files changed, 59 insertions(+), 16 deletions(-)
diff --git a/docs/cn/Example_Compaction_Topic_cn.md
b/docs/cn/Example_Compaction_Topic_cn.md
index 0cb4bffdc..9793fccd5 100644
--- a/docs/cn/Example_Compaction_Topic_cn.md
+++ b/docs/cn/Example_Compaction_Topic_cn.md
@@ -1,14 +1,25 @@
# Compaction Topic
## 使用方式
+
+### 打开namesrv上支持顺序消息的开关
+CompactionTopic依赖顺序消息来保障一致性
+```shell
+$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
+```
+
### 创建compaction topic
+
```shell
-$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n
localhost:9876 -t ctopic -c DefaultCluster
+$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n
localhost:9876 -t ctopic -o true -c DefaultCluster
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-,
topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false,
attributes={+delete.policy=COMPACTION}]
```
+
### 生产数据
+
与普通消息一样
+
```java
DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
producer.setNamesrvAddr("localhost:9876");
@@ -28,9 +39,12 @@ SendResult sendResult = producer.send(msg, (mqs, message,
shardingKey) -> {
System.out.printf("%s%n", sendResult);
```
+
### 消费数据
+
消费offset与compaction之前保持不变,如果指定offset消费,当指定的offset不存在时,返回后面最近的一条数据
在compaction场景下,大部分消费都是从0开始消费完整的数据
+
```java
DefaultLitePullConsumer consumer = new
DefaultLitePullConsumer("compactionTestGroup");
consumer.setNamesrvAddr("localhost:9876");
diff --git a/docs/en/Example_Compaction_Topic.md
b/docs/en/Example_Compaction_Topic.md
index 695860b6a..76af95902 100644
--- a/docs/en/Example_Compaction_Topic.md
+++ b/docs/en/Example_Compaction_Topic.md
@@ -1,9 +1,16 @@
# Compaction Topic
## use example
+
+### Enable ordered-message support on namesrv
+CompactionTopic relies on ordered messages to ensure consistency
+```shell
+$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
+```
+
### create compaction topic
```shell
-$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n
localhost:9876 -t ctopic -c DefaultCluster
+$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n
localhost:9876 -t ctopic -o true -c DefaultCluster
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-,
topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false,
attributes={+delete.policy=COMPACTION}]
```
@@ -15,8 +22,17 @@ DefaultMQProducer producer = new
DefaultMQProducer("CompactionTestGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
-Message msg = new Message(topic, "tags", "keys",
"bodys"getBytes(StandardCharsets.UTF_8));
-SendResult sendResult = producer.send(msg);
+String topic = "ctopic";
+String tag = "tag1";
+String key = "key1";
+Message msg = new Message(topic, tag, key,
"bodys".getBytes(StandardCharsets.UTF_8));
+SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
+ int select = Math.abs(shardingKey.hashCode());
+ if (select < 0) {
+ select = 0;
+ }
+ return mqs.get(select % mqs.size());
+}, key);
System.out.printf("%s%n", sendResult);
```
diff --git
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
index 1b5d38913..205a6a2f9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
@@ -67,7 +67,7 @@ public class CompactionService extends ServiceThread {
}
public GetMessageResult getMessage(final String group, final String topic,
final int queueId,
- final long offset, final int maxMsgNums, final int maxTotalMsgSize) {
+ final long offset, final int
maxMsgNums, final int maxTotalMsgSize) {
return compactionStore.getMessage(group, topic, queueId, offset,
maxMsgNums, maxTotalMsgSize);
}
@@ -126,6 +126,13 @@ public class CompactionService extends ServiceThread {
@Override
public void shutdown() {
super.shutdown();
+ while (!compactionMsgQ.isEmpty()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+
+ }
+ }
compactionStore.shutdown();
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
index 17bd6bb4d..0ce0a3d8d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
@@ -19,8 +19,10 @@ package org.apache.rocketmq.store.kv;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
import java.io.IOException;
import java.util.function.BiFunction;
+
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
@@ -51,6 +53,7 @@ public class MessageFetcher implements AutoCloseable {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final RemotingClient client;
+
public MessageFetcher() {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(false);
@@ -85,12 +88,13 @@ public class MessageFetcher implements AutoCloseable {
private String getConsumerGroup(String topic, int queueId) {
return String.join("-", topic, String.valueOf(queueId), "pull",
"group");
}
+
private String getClientId() {
return String.join("@", NetworkUtil.getLocalAddress(),
"compactionIns", "compactionUnit");
}
private boolean prepare(String masterAddr, String topic, String groupName,
long subVersion)
- throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException {
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException {
HeartbeatData heartbeatData = new HeartbeatData();
heartbeatData.setClientID(getClientId());
@@ -121,7 +125,7 @@ public class MessageFetcher implements AutoCloseable {
}
private boolean pullDone(String masterAddr, String groupName)
- throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException {
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException {
UnregisterClientRequestHeader requestHeader = new
UnregisterClientRequestHeader();
requestHeader.setClientID(getClientId());
requestHeader.setProducerGroup("");
@@ -140,14 +144,16 @@ public class MessageFetcher implements AutoCloseable {
}
public void pullMessageFromMaster(String topic, int queueId, long
endOffset, String masterAddr,
- BiFunction<Long, RemotingCommand, Boolean> responseHandler) throws
Exception {
+ BiFunction<Long, RemotingCommand,
Boolean> responseHandler) throws Exception {
long currentPullOffset = 0;
try {
long subVersion = System.currentTimeMillis();
String groupName = getConsumerGroup(topic, queueId);
- prepare(masterAddr, topic, groupName, subVersion);
-
+ if (!prepare(masterAddr, topic, groupName, subVersion)) {
+ log.error("{}:{} prepare to {} pull message failed", topic,
queueId, masterAddr);
+ throw new RemotingCommandException(topic + ":" + queueId + "
prepare to " + masterAddr + " pull message failed");
+ }
boolean noNewMsg = false;
boolean keepPull = true;
@@ -157,11 +163,11 @@ public class MessageFetcher implements AutoCloseable {
PullMessageRequestHeader requestHeader =
createPullMessageRequest(topic, queueId, currentPullOffset, subVersion);
RemotingCommand
- request =
RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE,
requestHeader);
+ request =
RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE,
requestHeader);
RemotingCommand response = client.invokeSync(masterAddr,
request, 1000 * 30L);
PullMessageResponseHeader responseHeader =
-
(PullMessageResponseHeader)response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+ (PullMessageResponseHeader)
response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
if (responseHeader == null) {
log.error("{}:{} pull message responseHeader is null",
topic, queueId);
throw new RemotingCommandException(topic + ":" + queueId +
" pull message responseHeader is null");
@@ -175,20 +181,20 @@ public class MessageFetcher implements AutoCloseable {
break;
case ResponseCode.PULL_NOT_FOUND: // NO_NEW_MSG,
need break loop
log.info("PULL_NOT_FOUND, topic:{}, queueId:{},
pullOffset:{},",
- topic, queueId, currentPullOffset);
+ topic, queueId, currentPullOffset);
noNewMsg = true;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
log.info("PULL_RETRY_IMMEDIATE, topic:{}, queueId:{},
pullOffset:{},",
- topic, queueId, currentPullOffset);
+ topic, queueId, currentPullOffset);
break;
case ResponseCode.PULL_OFFSET_MOVED:
log.info("PULL_OFFSET_MOVED, topic:{}, queueId:{},
pullOffset:{},",
- topic, queueId, currentPullOffset);
+ topic, queueId, currentPullOffset);
break;
default:
log.warn("Pull Message error, response code: {},
remark: {}",
- response.getCode(), response.getRemark());
+ response.getCode(), response.getRemark());
}
if (noNewMsg || !keepPull) {