This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0b24768500 [ISSUE #8822] Double write cq, reduce unnecessary switches
(#8823)
0b24768500 is described below
commit 0b247685007258d1502fa992434402fd40b92cea
Author: LetLetMe <[email protected]>
AuthorDate: Tue Oct 29 19:09:35 2024 +0800
[ISSUE #8822] Double write cq, reduce unnecessary switches (#8823)
* Reduce unnecessary switches
---
.../rocketmq/broker/RocksDBConfigManager.java | 14 +-
.../config/v1/RocksDBConsumerOffsetManager.java | 9 +-
.../config/v1/RocksDBSubscriptionGroupManager.java | 8 +-
.../config/v1/RocksDBTopicConfigManager.java | 8 +-
.../broker/processor/AdminBrokerProcessor.java | 198 ++++++++++++++-------
.../offset/RocksdbTransferOffsetAndCqTest.java | 1 -
.../RocksdbGroupConfigTransferTest.java | 1 -
.../topic/RocksdbTopicConfigManagerTest.java | 1 -
.../topic/RocksdbTopicConfigTransferTest.java | 1 -
.../rocketmq/client/impl/MQClientAPIImpl.java | 7 +-
.../rocketmq/common/CheckRocksdbCqWriteResult.java | 38 +++-
.../common/config/AbstractRocksDBStorage.java | 3 +-
.../common/config/ConfigRocksDBStorage.java | 12 +-
.../CheckRocksdbCqWriteProgressRequestHeader.java | 11 ++
.../apache/rocketmq/store/RocksDBMessageStore.java | 4 +
.../rocketmq/store/config/MessageStoreConfig.java | 32 ++--
.../store/rocksdb/RocksDBOptionsFactory.java | 5 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 6 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 6 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 5 +-
.../queue/CheckRocksdbCqWriteProgressCommand.java | 21 ++-
21 files changed, 246 insertions(+), 145 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
index 20358c4707..ee2d4e54a6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
@@ -17,40 +17,40 @@
package org.apache.rocketmq.broker;
import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;
-import java.nio.charset.StandardCharsets;
-import java.util.function.BiConsumer;
-
public class RocksDBConfigManager {
protected static final Logger BROKER_LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public volatile boolean isStop = false;
public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
private volatile long lastFlushMemTableMicroSecond = 0;
-
private final String filePath;
private final long memTableFlushInterval;
+ private final CompressionType compressionType;
private DataVersion kvDataVersion = new DataVersion();
-
- public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
+ public RocksDBConfigManager(String filePath, long memTableFlushInterval,
CompressionType compressionType) {
this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
+ this.compressionType = compressionType;
}
public boolean init() {
this.isStop = false;
- this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
+ this.configRocksDBStorage = new ConfigRocksDBStorage(filePath,
compressionType);
return this.configRocksDBStorage.start();
}
public boolean loadDataVersion() {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
index 8066fe769a..824fc0fee3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.rocksdb.CompressionType;
import org.rocksdb.WriteBatch;
public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
@@ -41,7 +42,9 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
- this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
+
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
+
}
@Override
@@ -61,10 +64,6 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
}
private boolean merge() {
- if
(!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
- log.info("the switch transferOffsetJsonToRocksdb is off, no merge
offset operation is needed.");
- return true;
- }
if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("consumerOffset json file does not exist, so skip merge");
return true;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index 8175d63cce..8fc7a4d6ed 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.rocksdb.CompressionType;
import org.rocksdb.RocksIterator;
public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
@@ -40,7 +41,8 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
super(brokerController, false);
- this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
+
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
}
@Override
@@ -78,10 +80,6 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
private boolean merge() {
- if
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
- log.info("the switch transferMetadataJsonToRocksdb is off, no
merge subGroup operation is needed.");
- return true;
- }
if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("subGroup json file does not exist, so skip merge");
return true;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index bce67392f6..18e633d348 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.rocksdb.CompressionType;
public class RocksDBTopicConfigManager extends TopicConfigManager {
@@ -35,7 +36,8 @@ public class RocksDBTopicConfigManager extends
TopicConfigManager {
public RocksDBTopicConfigManager(BrokerController brokerController) {
super(brokerController, false);
- this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
+
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
}
@Override
@@ -59,10 +61,6 @@ public class RocksDBTopicConfigManager extends
TopicConfigManager {
}
private boolean merge() {
- if
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
- log.info("the switch transferMetadataJsonToRocksdb is off, no
merge topic operation is needed.");
- return true;
- }
if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("topic json file does not exist, so skip merge");
return true;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index aa962513df..381889c624 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -39,6 +38,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -65,7 +67,9 @@ import org.apache.rocketmq.broker.metrics.InvocationStatus;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
@@ -214,6 +218,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
@@ -232,6 +237,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final BrokerController brokerController;
protected Set<String> configBlackList = new HashSet<>();
+ private final ExecutorService asyncExecuteWorker = new
ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
@@ -467,76 +473,23 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
- private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext
ctx, RemotingCommand request) throws RemotingCommandException {
- CheckRocksdbCqWriteProgressRequestHeader requestHeader =
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
- String requestTopic = requestHeader.getTopic();
- final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- response.setCode(ResponseCode.SUCCESS);
- MessageStore messageStore = brokerController.getMessageStore();
- DefaultMessageStore defaultMessageStore;
- if (messageStore instanceof AbstractPluginMessageStore) {
- defaultMessageStore = (DefaultMessageStore)
((AbstractPluginMessageStore) messageStore).getNext();
- } else {
- defaultMessageStore = (DefaultMessageStore) messageStore;
- }
- RocksDBMessageStore rocksDBMessageStore =
defaultMessageStore.getRocksDBMessageStore();
- if
(!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
- response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
"rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is
invalid")));
- return response;
- }
-
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>>
cqTable = defaultMessageStore.getConsumeQueueTable();
- StringBuilder diffResult = new StringBuilder();
- try {
- if (StringUtils.isNotBlank(requestTopic)) {
- processConsumeQueuesForTopic(cqTable.get(requestTopic),
requestTopic, rocksDBMessageStore, diffResult,false);
-
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
diffResult.toString())));
- return response;
- }
- for (Map.Entry<String, ConcurrentMap<Integer,
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
- String topic = topicEntry.getKey();
- processConsumeQueuesForTopic(topicEntry.getValue(), topic,
rocksDBMessageStore, diffResult,true);
+ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext
ctx, RemotingCommand request) {
+ CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
+ Runnable runnable = () -> {
+ try {
+ CheckRocksdbCqWriteResult checkResult =
doCheckRocksdbCqWriteProgress(ctx, request);
+ LOGGER.info("checkRocksdbCqWriteProgress result: {}",
JSON.toJSONString(checkResult));
+ } catch (Exception e) {
+ LOGGER.error("checkRocksdbCqWriteProgress error", e);
}
- diffResult.append("check all topic successful,
size:").append(cqTable.size());
- response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
diffResult.toString())));
-
- } catch (Exception e) {
- LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
- response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
e.getMessage())));
- }
+ };
+ asyncExecuteWorker.submit(runnable);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setBody(JSON.toJSONBytes(result));
return response;
}
-
- private void processConsumeQueuesForTopic(ConcurrentMap<Integer,
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore
rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) {
- for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry :
queueMap.entrySet()) {
- Integer queueId = queueEntry.getKey();
- ConsumeQueueInterface jsonCq = queueEntry.getValue();
- ConsumeQueueInterface kvCq =
rocksDBMessageStore.getConsumeQueue(topic, queueId);
- if (!checkAll) {
- String format = String.format("\n[topic: %s, queue: %s] \n
kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
- topic, queueId, kvCq.getEarliestUnit(),
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
- diffResult.append(format).append("\n");
- }
- long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue();
- long minOffsetInQueue = kvCq.getMinOffsetInQueue();
- for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) {
- Pair<CqUnit, Long> fileCqUnit =
jsonCq.getCqUnitAndStoreTime(i);
- Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
- if (fileCqUnit == null || kvCqUnit == null) {
- diffResult.append(String.format("[topic: %s, queue: %s,
offset: %s] \n kv : %s \n file : %s \n",
- topic, queueId, i, kvCqUnit != null ?
kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() :
"null"));
- return;
- }
- if (!checkCqUnitEqual(kvCqUnit.getObject1(),
fileCqUnit.getObject1())) {
- String diffInfo = String.format("[topic:%s, queue: %s
offset: %s] \n file : %s \n kv : %s \n",
- topic, queueId, i, kvCqUnit.getObject1(),
fileCqUnit.getObject1());
- LOGGER.error(diffInfo);
- diffResult.append(diffInfo).append(System.lineSeparator());
- return;
- }
- }
- }
- }
@Override
public boolean rejectRequest() {
return false;
@@ -3418,6 +3371,115 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return false;
}
+ private CheckRocksdbCqWriteResult
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand
request) throws RemotingCommandException {
+ CheckRocksdbCqWriteProgressRequestHeader requestHeader =
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
+ String requestTopic = requestHeader.getTopic();
+ MessageStore messageStore = brokerController.getMessageStore();
+ DefaultMessageStore defaultMessageStore;
+ if (messageStore instanceof AbstractPluginMessageStore) {
+ defaultMessageStore = (DefaultMessageStore)
((AbstractPluginMessageStore) messageStore).getNext();
+ } else {
+ defaultMessageStore = (DefaultMessageStore) messageStore;
+ }
+ RocksDBMessageStore rocksDBMessageStore =
defaultMessageStore.getRocksDBMessageStore();
+ CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+
+ if
(defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType()))
{
+ result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need
check");
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
+ return result;
+ }
+
+ if
(!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+ result.setCheckResult("rocksdbCQWriteEnable is false,
checkRocksdbCqWriteProgressCommand is invalid");
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ return result;
+ }
+
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>>
cqTable = defaultMessageStore.getConsumeQueueTable();
+ StringBuilder diffResult = new StringBuilder();
+ try {
+ if (StringUtils.isNotBlank(requestTopic)) {
+ boolean checkResult =
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic,
rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime());
+ result.setCheckResult(diffResult.toString());
+ result.setCheckStatus(checkResult ?
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() :
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ return result;
+ }
+ int successNum = 0;
+ int checkSize = 0;
+ for (Map.Entry<String, ConcurrentMap<Integer,
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
+ boolean checkResult =
processConsumeQueuesForTopic(topicEntry.getValue(), topicEntry.getKey(),
rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
+ successNum += checkResult ? 1 : 0;
+ checkSize++;
+ }
+ // check all topic finish, all topic is ready, checkSize: 100,
currentQueueNum: 110 -> ready (The currentQueueNum means when we do
checking, new topics are added.)
+ // check all topic finish, success/all : 89/100, currentQueueNum:
110 -> not ready
+ boolean checkReady = successNum == checkSize;
+ String checkResultString = checkReady ? String.format("all topic
is ready, checkSize: %s, currentQueueNum: %s", checkSize, cqTable.size()) :
+ String.format("success/all : %s/%s, currentQueueNum: %s",
successNum, checkSize, cqTable.size());
+ diffResult.append("check all topic finish,
").append(checkResultString);
+ result.setCheckResult(diffResult.toString());
+ result.setCheckStatus(checkReady ?
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() :
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+ } catch (Exception e) {
+ LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
+ result.setCheckResult(e.getMessage() +
Arrays.toString(e.getStackTrace()));
+
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue());
+ }
+ return result;
+ }
+
+ private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer,
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore
rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long
checkpointByStoreTime) {
+ boolean processResult = true;
+ for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry :
queueMap.entrySet()) {
+ Integer queueId = queueEntry.getKey();
+ ConsumeQueueInterface jsonCq = queueEntry.getValue();
+ ConsumeQueueInterface kvCq =
rocksDBMessageStore.getConsumeQueue(topic, queueId);
+ if (printDetail) {
+ String format = String.format("[topic: %s, queue: %s] \n
kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
+ topic, queueId, kvCq.getEarliestUnit(),
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
+ diffResult.append(format).append("\n");
+ }
+
+ long minOffsetByTime = 0L;
+ try {
+ minOffsetByTime =
rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic,
queueId, checkpointByStoreTime, BoundaryType.UPPER);
+ } catch (Exception e) {
+ // ignore
+ }
+ long minOffsetInQueue = kvCq.getMinOffsetInQueue();
+ long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime);
+ long checkTo = jsonCq.getMaxOffsetInQueue() - 1;
+ /*
+
checkTo(maxOffsetInQueue - 1)
+ v
+ fileCq +------------------------------------------------------+
+ kvCq +----------------------------------------------+
+ ^ ^
+ minOffsetInQueue minOffsetByTime
+ ^
+ checkFrom = max(minOffsetInQueue, minOffsetByTime)
+ */
+ // The latest message is earlier than the check time
+ Pair<CqUnit, Long> fileLatestCq =
jsonCq.getCqUnitAndStoreTime(checkTo);
+ if (fileLatestCq != null) {
+ if (fileLatestCq.getObject2() < checkpointByStoreTime) {
+ continue;
+ }
+ }
+ for (long i = checkFrom; i <= checkTo; i++) {
+ Pair<CqUnit, Long> fileCqUnit =
jsonCq.getCqUnitAndStoreTime(i);
+ Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
+ if (fileCqUnit == null || kvCqUnit == null ||
!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
+ LOGGER.error(String.format("[topic: %s, queue: %s, offset:
%s] \n file : %s \n kv : %s \n",
+ topic, queueId, i, kvCqUnit != null ?
kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() :
"null"));
+ processResult = false;
+ break;
+ }
+ }
+ }
+ return processResult;
+ }
+
private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) {
return false;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
index 64c505eb77..4b320eb53f 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
@@ -76,7 +76,6 @@ public class RocksdbTransferOffsetAndCqTest {
brokerConfig.setConsumerOffsetUpdateVersionStep(10);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(basePath);
- messageStoreConfig.setTransferOffsetJsonToRocksdb(true);
messageStoreConfig.setRocksdbCQDoubleWriteEnable(true);
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 26017af8a6..c75fe0d6a0 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -68,7 +68,6 @@ public class RocksdbGroupConfigTransferTest {
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(basePath);
- messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
index 080e1dd5a3..fa3ef95f55 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
@@ -72,7 +72,6 @@ public class RocksdbTopicConfigManagerTest {
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(basePath);
- messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
Mockito.lenient().when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
index fb345548e4..e925ed4bd8 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
@@ -69,7 +69,6 @@ public class RocksdbTopicConfigTransferTest {
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(basePath);
- messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 716d081ef4..554b1efa52 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.rpchook.NamespaceRpcHook;
import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
@@ -113,7 +114,6 @@ import
org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
-import
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
@@ -3019,15 +3019,16 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback, StartAndShutdo
throw new MQClientException(response.getCode(), response.getRemark());
}
- public CheckRocksdbCqWriteProgressResponseBody
checkRocksdbCqWriteProgress(final String brokerAddr, final String topic, final
long timeoutMillis) throws InterruptedException,
+ public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(final String
brokerAddr, final String topic, final long checkStoreTime, final long
timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException {
CheckRocksdbCqWriteProgressRequestHeader header = new
CheckRocksdbCqWriteProgressRequestHeader();
header.setTopic(topic);
+ header.setCheckStoreTime(checkStoreTime);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS,
header);
RemotingCommand response = this.remotingClient.invokeSync(brokerAddr,
request, timeoutMillis);
assert response != null;
if (ResponseCode.SUCCESS == response.getCode()) {
- return
CheckRocksdbCqWriteProgressResponseBody.decode(response.getBody(),
CheckRocksdbCqWriteProgressResponseBody.class);
+ return JSON.parseObject(response.getBody(),
CheckRocksdbCqWriteResult.class);
}
throw new MQClientException(response.getCode(), response.getRemark());
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
b/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
similarity index 52%
rename from
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
rename to
common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
index 76719ac1a2..fc67df86c2 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
+++
b/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
@@ -15,21 +15,43 @@
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol.body;
+package org.apache.rocketmq.common;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+public class CheckRocksdbCqWriteResult {
+ String checkResult;
-public class CheckRocksdbCqWriteProgressResponseBody extends
RemotingSerializable {
+ int checkStatus;
- String diffResult;
+ public enum CheckStatus {
+ CHECK_OK(0),
+ CHECK_NOT_OK(1),
+ CHECK_IN_PROGRESS(2),
+ CHECK_ERROR(3);
- public String getDiffResult() {
- return diffResult;
+ private int value;
+
+ CheckStatus(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+ }
+
+ public String getCheckResult() {
+ return checkResult;
}
- public void setDiffResult(String diffResult) {
- this.diffResult = diffResult;
+ public void setCheckResult(String checkResult) {
+ this.checkResult = checkResult;
}
+ public int getCheckStatus() {
+ return checkStatus;
+ }
+ public void setCheckStatus(int checkStatus) {
+ this.checkStatus = checkStatus;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 42ddbdc728..d434cce745 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -86,6 +86,7 @@ public abstract class AbstractRocksDBStorage {
protected final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
protected volatile boolean loaded;
+ protected CompressionType compressionType = CompressionType.LZ4_COMPRESSION;
private volatile boolean closed;
private final Semaphore reloadPermit = new Semaphore(1);
@@ -156,7 +157,7 @@ public abstract class AbstractRocksDBStorage {
protected void initCompactionOptions() {
this.compactionOptions = new CompactionOptions();
- this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION);
+ this.compactionOptions.setCompression(compressionType);
this.compactionOptions.setMaxSubcompactions(4);
this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index 36da6834ff..3b924a6a0d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompressionType;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -37,12 +38,17 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
protected ColumnFamilyHandle kvDataVersionFamilyHandle;
protected ColumnFamilyHandle forbiddenFamilyHandle;
-
public static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
+
+
public ConfigRocksDBStorage(final String dbPath) {
- super(dbPath);
- this.readOnly = false;
+ this(dbPath, false);
+ }
+
+ public ConfigRocksDBStorage(final String dbPath, CompressionType compressionType) {
+ this(dbPath, false);
+ this.compressionType = compressionType;
}
public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
index fee158b497..f679077fdd 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
@@ -32,6 +32,8 @@ public class CheckRocksdbCqWriteProgressRequestHeader implements CommandCustomHe
@RocketMQResource(ResourceType.TOPIC)
private String topic;
+ private long checkStoreTime;
+
@Override
public void checkFields() throws RemotingCommandException {
@@ -44,4 +46,13 @@ public class CheckRocksdbCqWriteProgressRequestHeader implements CommandCustomHe
public void setTopic(String topic) {
this.topic = topic;
}
+
+ public long getCheckStoreTime() {
+ return checkStoreTime;
+ }
+
+ public void setCheckStoreTime(long checkStoreTime) {
+ this.checkStoreTime = checkStoreTime;
+ }
+
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 0a7119cab1..321689ac8f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -173,6 +173,10 @@ public class RocksDBMessageStore extends DefaultMessageStore {
class CommitLogDispatcherBuildRocksdbConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) throws RocksDBException {
+ boolean enable = getMessageStoreConfig().isRocksdbCQDoubleWriteEnable();
+ if (!enable) {
+ return;
+ }
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index e31c03dd22..fe090e3fa2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.queue.BatchConsumeQueue;
+import org.rocksdb.CompressionType;
public class MessageStoreConfig {
@@ -106,8 +107,6 @@ public class MessageStoreConfig {
@ImportantField
private String storeType = StoreType.DEFAULT.getStoreType();
- private boolean transferMetadataJsonToRocksdb = false;
-
// ConsumeQueue file size,default is 30W
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
// enable consume queue ext
@@ -424,8 +423,6 @@ public class MessageStoreConfig {
private boolean putConsumeQueueDataByFileChannel = true;
- private boolean transferOffsetJsonToRocksdb = false;
-
private boolean rocksdbCQDoubleWriteEnable = false;
/**
@@ -443,7 +440,17 @@ public class MessageStoreConfig {
*
* LZ4 is the recommended one.
*/
- private String bottomMostCompressionTypeForConsumeQueueStore = "zstd";
+ private String bottomMostCompressionTypeForConsumeQueueStore = CompressionType.ZSTD_COMPRESSION.getLibraryName();
+
+ private String rocksdbCompressionType = CompressionType.LZ4_COMPRESSION.getLibraryName();
+
+ public String getRocksdbCompressionType() {
+ return rocksdbCompressionType;
+ }
+
+ public void setRocksdbCompressionType(String compressionType) {
+ this.rocksdbCompressionType = compressionType;
+ }
/**
* Spin number in the retreat strategy of spin lock
@@ -464,13 +471,6 @@ public class MessageStoreConfig {
this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable;
}
- public boolean isTransferOffsetJsonToRocksdb() {
- return transferOffsetJsonToRocksdb;
- }
-
- public void setTransferOffsetJsonToRocksdb(boolean transferOffsetJsonToRocksdb) {
- this.transferOffsetJsonToRocksdb = transferOffsetJsonToRocksdb;
- }
public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
@@ -1894,14 +1894,6 @@ public class MessageStoreConfig {
this.putConsumeQueueDataByFileChannel = putConsumeQueueDataByFileChannel;
}
- public boolean isTransferMetadataJsonToRocksdb() {
- return transferMetadataJsonToRocksdb;
- }
-
- public void setTransferMetadataJsonToRocksdb(boolean transferMetadataJsonToRocksdb) {
- this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb;
- }
-
public String getBottomMostCompressionTypeForConsumeQueueStore() {
return bottomMostCompressionTypeForConsumeQueueStore;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index d373ba6249..66f5cbd095 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -67,13 +67,16 @@ public class RocksDBOptionsFactory {
setCompressionSizePercent(-1);
String bottomMostCompressionTypeOpt = messageStore.getMessageStoreConfig()
.getBottomMostCompressionTypeForConsumeQueueStore();
+ String compressionTypeOpt = messageStore.getMessageStoreConfig()
+ .getRocksdbCompressionType();
CompressionType bottomMostCompressionType = CompressionType.getCompressionType(bottomMostCompressionTypeOpt);
+ CompressionType compressionType = CompressionType.getCompressionType(compressionTypeOpt);
return columnFamilyOptions.setMaxWriteBufferNumber(4).
setWriteBufferSize(128 * SizeUnit.MB).
setMinWriteBufferNumberToMerge(1).
setTableFormatConfig(blockBasedTableConfig).
setMemTableConfig(new SkipListMemTableConfig()).
- setCompressionType(CompressionType.LZ4_COMPRESSION).
+ setCompressionType(compressionType).
setBottommostCompressionType(bottomMostCompressionType).
setNumLevels(7).
setCompactionStyle(CompactionStyle.UNIVERSAL).
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 3686bf2644..c5ecdefb52 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
@@ -52,7 +53,6 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -773,9 +773,9 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+ public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, String topic, long checkStoreTime)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
- return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic);
+ return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime);
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 883dcbe41d..17f14f23af 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
@@ -90,7 +91,6 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -1819,9 +1819,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+ public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, String topic, long checkStoreTime)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
- return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, topic, timeoutMillis);
+ return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime, timeoutMillis);
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 09204ab7be..aea43376ea 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
@@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -149,7 +149,8 @@ public interface MQAdminExt extends MQAdmin {
final String consumerGroup) throws RemotingException,
MQClientException, InterruptedException,
MQBrokerException;
- CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
+ CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, String topic, long checkStoreTime)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
ConsumeStats examineConsumeStats(final String consumerGroup,
final String topic) throws RemotingException, MQClientException,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
index d18a24ee1d..a0fc9fce1f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
@@ -19,12 +19,13 @@ package org.apache.rocketmq.tools.command.queue;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -53,8 +54,11 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand {
options.addOption(opt);
opt = new Option("t", "topic", true, "topic name");
- opt.setRequired(false);
options.addOption(opt);
+
+ opt = new Option("cf", "checkFrom", true, "check from time");
+ options.addOption(opt);
+
return options;
}
@@ -66,6 +70,10 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand {
defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n')));
String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : "";
String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "";
+ // The default check is 30 days
+ long checkStoreTime = commandLine.hasOption("cf")
+ ? Long.parseLong(commandLine.getOptionValue("cf").trim())
+ : System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30L);
try {
defaultMQAdminExt.start();
@@ -80,14 +88,13 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand {
String brokerName = entry.getKey();
BrokerData brokerData = entry.getValue();
String brokerAddr = brokerData.getBrokerAddrs().get(0L);
- CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
- if (StringUtils.isNotBlank(topic)) {
- System.out.print(body.getDiffResult());
+ CheckRocksdbCqWriteResult result = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime);
+ if (result.getCheckStatus() == CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()) {
+ System.out.print(brokerName + " check error, please check log... errInfo: " + result.getCheckResult());
} else {
- System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult());
+ System.out.print(brokerName + " check doing, please wait and get the result from log... \n");
}
}
-
} catch (Exception e) {
throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
} finally {