This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 525f877f3b [ISSUE #8589] Support file format CQ and json format offset
in-place upgrade to rocksdb management (#8600)
525f877f3b is described below
commit 525f877f3bddced2d85d99520fd600bcbbfe3c6d
Author: LetLetMe <[email protected]>
AuthorDate: Mon Sep 23 19:24:15 2024 +0800
[ISSUE #8589] Support file format CQ and json format offset in-place
upgrade to rocksdb management (#8600)
---
.../apache/rocketmq/broker/BrokerController.java | 6 +-
.../broker/offset/ConsumerOffsetManager.java | 20 +++
.../offset/RocksDBConsumerOffsetManager.java | 77 +++++++++--
.../broker/processor/AdminBrokerProcessor.java | 92 +++++++++++-
.../RocksDBSubscriptionGroupManager.java | 36 ++---
.../subscription/SubscriptionGroupManager.java | 20 +++
.../broker/topic/RocksDBTopicConfigManager.java | 26 ++--
.../rocketmq/broker/topic/TopicConfigManager.java | 20 +++
.../offset/RocksdbTransferOffsetAndCqTest.java | 154 +++++++++++++++++++++
.../rocketmq/client/impl/MQClientAPIImpl.java | 15 ++
.../common/config/AbstractRocksDBStorage.java | 23 +--
.../rocketmq/remoting/protocol/RequestCode.java | 1 +
.../CheckRocksdbCqWriteProgressResponseBody.java | 35 +++++
.../CheckRocksdbCqWriteProgressRequestHeader.java | 47 +++++++
.../apache/rocketmq/store/DefaultMessageStore.java | 42 +++++-
.../apache/rocketmq/store/RocksDBMessageStore.java | 44 +++++-
.../rocketmq/store/config/MessageStoreConfig.java | 31 +++++
.../org/apache/rocketmq/store/queue/CqUnit.java | 1 +
.../rocketmq/store/queue/RocksDBConsumeQueue.java | 3 +-
.../store/queue/RocksDBConsumeQueueStore.java | 10 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 7 +
.../tools/admin/DefaultMQAdminExtImpl.java | 7 +
.../apache/rocketmq/tools/admin/MQAdminExt.java | 3 +
.../export/ExportMetadataInRocksDBCommand.java | 4 +-
.../queue/CheckRocksdbCqWriteProgressCommand.java | 97 +++++++++++++
25 files changed, 755 insertions(+), 66 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 22ac7fedf1..aaf06caddf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.broker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
@@ -789,6 +788,9 @@ public class BrokerController {
defaultMessageStore = new
RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig,
topicConfigManager.getTopicConfigTable());
} else {
defaultMessageStore = new
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig,
topicConfigManager.getTopicConfigTable());
+ if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
+ defaultMessageStore.enableRocksdbCQWrite();
+ }
}
if (messageStoreConfig.isEnableDLegerCommitLog()) {
@@ -812,7 +814,7 @@ public class BrokerController {
this.timerMessageStore.registerEscapeBridgeHook(msg ->
escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
- } catch (IOException e) {
+ } catch (Exception e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs",
e);
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 21f20dde32..403324137c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -31,6 +31,7 @@ import com.google.common.base.Strings;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -373,6 +374,25 @@ public class ConsumerOffsetManager extends ConfigManager {
this.dataVersion = dataVersion;
}
+ public boolean loadDataVersion() {
+ String fileName = null;
+ try {
+ fileName = this.configFilePath();
+ String jsonString = MixAll.file2String(fileName);
+ if (jsonString != null) {
+ ConsumerOffsetManager obj =
RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
+ if (obj != null) {
+ this.dataVersion = obj.dataVersion;
+ }
+ LOG.info("load consumer offset dataVersion success,{},{} ",
fileName, jsonString);
+ }
+ return true;
+ } catch (Exception e) {
+ LOG.error("load consumer offset dataVersion failed " + fileName,
e);
+ return false;
+ }
+ }
+
public void removeOffset(final String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it =
this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
index de293fc499..1e7cda71ee 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
@@ -16,26 +16,31 @@
*/
package org.apache.rocketmq.broker.offset;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.WriteBatch;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-
public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
+ protected static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
protected RocksDBConfigManager rocksDBConfigManager;
public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
- this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}
@Override
@@ -43,9 +48,47 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
if (!rocksDBConfigManager.init()) {
return false;
}
- return this.rocksDBConfigManager.loadData(this::decodeOffset);
+ if (!loadDataVersion() || !loadConsumerOffset()) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public boolean loadConsumerOffset() {
+ return this.rocksDBConfigManager.loadData(this::decodeOffset) &&
merge();
+ }
+
+ private boolean merge() {
+ if
(!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
+ log.info("the switch transferOffsetJsonToRocksdb is off, no merge
offset operation is needed.");
+ return true;
+ }
+ if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
+ log.info("consumerOffset json file does not exist, so skip merge");
+ return true;
+ }
+ if (!super.loadDataVersion()) {
+ log.error("load json consumerOffset dataVersion error, startup
will exit");
+ return false;
+ }
+
+ final DataVersion dataVersion = super.getDataVersion();
+ final DataVersion kvDataVersion = this.getDataVersion();
+ if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get())
{
+ if (!super.load()) {
+ log.error("load json consumerOffset info failed, startup will
exit");
+ return false;
+ }
+ this.persist();
+ this.getDataVersion().assignNewOne(dataVersion);
+ updateDataVersion();
+ log.info("update offset from json, dataVersion:{}, offsetTable: {}
", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable()));
+ }
+ return true;
}
+
@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
@@ -69,8 +112,7 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
LOG.info("load exist local offset, {}, {}", topicAtGroup,
wrapper.getOffsetTable());
}
- @Override
- public String configFilePath() {
+ public String rocksdbConfigFilePath() {
return
this.brokerController.getMessageStoreConfig().getStorePathRootDir() +
File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
}
@@ -103,4 +145,23 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
byte[] valueBytes = JSON.toJSONBytes(wrapper,
SerializerFeature.BrowserCompatible);
writeBatch.put(keyBytes, valueBytes);
}
+
+ @Override
+ public boolean loadDataVersion() {
+ return this.rocksDBConfigManager.loadDataVersion();
+ }
+
+ @Override
+ public DataVersion getDataVersion() {
+ return rocksDBConfigManager.getKvDataVersion();
+ }
+
+ public void updateDataVersion() {
+ try {
+ rocksDBConfigManager.updateKvDataVersion();
+ } catch (Exception e) {
+ log.error("update consumer offset dataVersion error", e);
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 28bd254914..863f16e515 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -18,9 +18,11 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.api.common.Attributes;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
@@ -38,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import io.opentelemetry.api.common.Attributes;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
@@ -69,6 +70,7 @@ import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
@@ -137,6 +139,7 @@ import
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerialize
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import
org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader;
@@ -209,6 +212,7 @@ import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
@@ -217,8 +221,9 @@ import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.LibC;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -339,6 +344,8 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return fetchAllConsumeStatsInBroker(ctx, request);
case RequestCode.QUERY_CONSUME_QUEUE:
return queryConsumeQueue(ctx, request);
+ case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS:
+ return this.checkRocksdbCqWriteProgress(ctx, request);
case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN:
return this.updateAndGetGroupForbidden(ctx, request);
case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG:
@@ -458,6 +465,71 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
+ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext
ctx, RemotingCommand request) throws RemotingCommandException {
+ CheckRocksdbCqWriteProgressRequestHeader requestHeader =
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
+ String requestTopic = requestHeader.getTopic();
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+
+ DefaultMessageStore messageStore = (DefaultMessageStore)
brokerController.getMessageStore();
+ RocksDBMessageStore rocksDBMessageStore =
messageStore.getRocksDBMessageStore();
+ if
(!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+ response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
"rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is
invalid")));
+ return response;
+ }
+
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>>
cqTable = messageStore.getConsumeQueueTable();
+ StringBuilder diffResult = new StringBuilder("check success, all is
ok!\n");
+ try {
+ if (StringUtils.isNotBlank(requestTopic)) {
+ processConsumeQueuesForTopic(cqTable.get(requestTopic),
requestTopic, rocksDBMessageStore, diffResult,false);
+
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
diffResult.toString())));
+ return response;
+ }
+ for (Map.Entry<String, ConcurrentMap<Integer,
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
+ String topic = topicEntry.getKey();
+ processConsumeQueuesForTopic(topicEntry.getValue(), topic,
rocksDBMessageStore, diffResult,true);
+ }
+ diffResult.append("check all topic successful,
size:").append(cqTable.size());
+ response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
diffResult.toString())));
+
+ } catch (Exception e) {
+ LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
+ response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
e.getMessage())));
+ }
+ return response;
+ }
+
+ private void processConsumeQueuesForTopic(ConcurrentMap<Integer,
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore
rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) {
+ for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry :
queueMap.entrySet()) {
+ Integer queueId = queueEntry.getKey();
+ ConsumeQueueInterface jsonCq = queueEntry.getValue();
+ ConsumeQueueInterface kvCq =
rocksDBMessageStore.getConsumeQueue(topic, queueId);
+ if (!checkAll) {
+ String format = String.format("\n[topic: %s, queue: %s] \n
kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
+ topic, queueId, kvCq.getEarliestUnit(),
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
+ diffResult.append(format).append("\n");
+ }
+ long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue();
+ long minOffsetInQueue = kvCq.getMinOffsetInQueue();
+ for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) {
+ Pair<CqUnit, Long> fileCqUnit =
jsonCq.getCqUnitAndStoreTime(i);
+ Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
+ if (fileCqUnit == null || kvCqUnit == null) {
+ diffResult.append(String.format("[topic: %s, queue: %s,
offset: %s] \n kv : %s \n file: %s \n",
+ topic, queueId, i, kvCqUnit != null ?
kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() :
"null"));
+ return;
+ }
+ if (!checkCqUnitEqual(kvCqUnit.getObject1(),
fileCqUnit.getObject1())) {
+ String diffInfo = String.format("[topic:%s, queue: %s
offset: %s] \n file: %s \n kv: %s",
+ topic, queueId, i, kvCqUnit.getObject1(),
fileCqUnit.getObject1());
+ LOGGER.error(diffInfo);
+ diffResult.append(diffInfo).append("\n");
+ return;
+ }
+ }
+ }
+ }
@Override
public boolean rejectRequest() {
return false;
@@ -3305,4 +3377,20 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
}
return false;
}
+
+ private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
+ if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) {
+ return false;
+ }
+ if (cqUnit1.getSize() != cqUnit2.getSize()) {
+ return false;
+ }
+ if (cqUnit1.getPos() != cqUnit2.getPos()) {
+ return false;
+ }
+ if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) {
+ return false;
+ }
+ return cqUnit1.getTagsCode() == cqUnit2.getTagsCode();
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
index 7df72dbe68..5119f78672 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
@@ -19,6 +19,12 @@ package org.apache.rocketmq.broker.subscription;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.UtilAll;
@@ -27,13 +33,6 @@ import org.apache.rocketmq.remoting.protocol.DataVersion;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.rocksdb.RocksIterator;
-import java.io.File;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.BiConsumer;
-
public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
protected RocksDBConfigManager rocksDBConfigManager;
@@ -79,28 +78,30 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
private boolean merge() {
if
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
- log.info("The switch is off, no merge operation is needed.");
+ log.info("the switch transferMetadataJsonToRocksdb is off, no
merge subGroup operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
- log.info("json file and json back file not exist, so skip merge");
+ log.info("subGroup json file does not exist, so skip merge");
return true;
}
-
- if (!super.load()) {
- log.error("load group and forbidden info from json file error,
startup will exit");
+ if (!super.loadDataVersion()) {
+ log.error("load json subGroup dataVersion error, startup will
exit");
return false;
}
-
- final ConcurrentMap<String, SubscriptionGroupConfig> groupTable =
this.getSubscriptionGroupTable();
- final ConcurrentMap<String, ConcurrentMap<String, Integer>>
forbiddenTable = this.getForbiddenTable();
final DataVersion dataVersion = super.getDataVersion();
final DataVersion kvDataVersion = this.getDataVersion();
if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get())
{
+ if (!super.load()) {
+ log.error("load group and forbidden info from json file error,
startup will exit");
+ return false;
+ }
+ final ConcurrentMap<String, SubscriptionGroupConfig> groupTable =
this.getSubscriptionGroupTable();
for (Map.Entry<String, SubscriptionGroupConfig> entry :
groupTable.entrySet()) {
putSubscriptionGroupConfig(entry.getValue());
log.info("import subscription config to rocksdb, group={}",
entry.getValue());
}
+ final ConcurrentMap<String, ConcurrentMap<String, Integer>>
forbiddenTable = this.getForbiddenTable();
for (Map.Entry<String, ConcurrentMap<String, Integer>> entry :
forbiddenTable.entrySet()) {
try {
this.rocksDBConfigManager.updateForbidden(entry.getKey(),
JSON.toJSONString(entry.getValue()));
@@ -110,8 +111,10 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
return false;
}
}
-
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
+ this.getDataVersion().assignNewOne(dataVersion);
updateDataVersion();
+ } else {
+ log.info("dataVersion is not greater than kvDataVersion, no need
to merge group metaData, dataVersion={}, kvDataVersion={}", dataVersion,
kvDataVersion);
}
log.info("finish marge subscription config from json file and merge to
rocksdb");
this.persist();
@@ -196,6 +199,7 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
try {
rocksDBConfigManager.updateKvDataVersion();
} catch (Exception e) {
+ log.error("update group config dataVersion error", e);
throw new RuntimeException(e);
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index f2a7e0482b..e6855ef9a2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -334,6 +334,26 @@ public class SubscriptionGroupManager extends
ConfigManager {
return dataVersion;
}
+ public boolean loadDataVersion() {
+ String fileName = null;
+ try {
+ fileName = this.configFilePath();
+ String jsonString = MixAll.file2String(fileName);
+ if (jsonString != null) {
+ SubscriptionGroupManager obj =
RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
+ if (obj != null) {
+ this.dataVersion.assignNewOne(obj.dataVersion);
+ this.printLoadDataWhenFirstBoot(obj);
+ log.info("load subGroup dataVersion success,{},{}",
fileName, obj.dataVersion);
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ log.error("load subGroup dataVersion failed" + fileName, e);
+ return false;
+ }
+ }
+
public void deleteSubscriptionGroupConfig(final String groupName) {
SubscriptionGroupConfig old = removeSubscriptionGroupConfig(groupName);
this.forbiddenTable.remove(groupName);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
index 2a89dd7e02..466e6416f9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
@@ -18,6 +18,9 @@ package org.apache.rocketmq.broker.topic;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.TopicConfig;
@@ -25,10 +28,6 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
-import java.io.File;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
public class RocksDBTopicConfigManager extends TopicConfigManager {
protected RocksDBConfigManager rocksDBConfigManager;
@@ -60,29 +59,35 @@ public class RocksDBTopicConfigManager extends
TopicConfigManager {
private boolean merge() {
if
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
- log.info("The switch is off, no merge operation is needed.");
+ log.info("the switch transferMetadataJsonToRocksdb is off, no
merge topic operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
- log.info("json file and json back file not exist, so skip merge");
+ log.info("topic json file does not exist, so skip merge");
return true;
}
- if (!super.load()) {
- log.error("load topic config from json file error, startup will
exit");
+ if (!super.loadDataVersion()) {
+ log.error("load json topic dataVersion error, startup will exit");
return false;
}
- final ConcurrentMap<String, TopicConfig> topicConfigTable =
this.getTopicConfigTable();
final DataVersion dataVersion = super.getDataVersion();
final DataVersion kvDataVersion = this.getDataVersion();
if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get())
{
+ if (!super.load()) {
+ log.error("load topic config from json file error, startup
will exit");
+ return false;
+ }
+ final ConcurrentMap<String, TopicConfig> topicConfigTable =
this.getTopicConfigTable();
for (Map.Entry<String, TopicConfig> entry :
topicConfigTable.entrySet()) {
putTopicConfig(entry.getValue());
log.info("import topic config to rocksdb, topic={}",
entry.getValue());
}
-
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
+ this.getDataVersion().assignNewOne(dataVersion);
updateDataVersion();
+ } else {
+ log.info("dataVersion is not greater than kvDataVersion, no need
to merge topic metaData, dataVersion={}, kvDataVersion={}", dataVersion,
kvDataVersion);
}
log.info("finish read topic config from json file and merge to
rocksdb");
this.persist();
@@ -150,6 +155,7 @@ public class RocksDBTopicConfigManager extends
TopicConfigManager {
try {
rocksDBConfigManager.updateKvDataVersion();
} catch (Exception e) {
+ log.error("update topic config dataVersion error", e);
throw new RuntimeException(e);
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index eab2896b00..25d3218f2a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -637,6 +637,26 @@ public class TopicConfigManager extends ConfigManager {
return encode(false);
}
+ public boolean loadDataVersion() {
+ String fileName = null;
+ try {
+ fileName = this.configFilePath();
+ String jsonString = MixAll.file2String(fileName);
+ if (jsonString != null) {
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+ TopicConfigSerializeWrapper.fromJson(jsonString,
TopicConfigSerializeWrapper.class);
+ if (topicConfigSerializeWrapper != null) {
+
this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
+ log.info("load topic metadata dataVersion success {}, {}",
fileName, topicConfigSerializeWrapper.getDataVersion());
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ log.error("load topic metadata dataVersion failed" + fileName, e);
+ return false;
+ }
+ }
+
@Override
public String configFilePath() {
return
BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
new file mode 100644
index 0000000000..b4800aec24
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.offset;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.collections.MapUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.RocksDBMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.rocksdb.RocksDBException;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RocksdbTransferOffsetAndCqTest {
+
+ private final String basePath = Paths.get(System.getProperty("user.home"),
+ "unit-test-store", UUID.randomUUID().toString().substring(0,
16).toUpperCase()).toString();
+
+ private final String topic = "topic";
+ private final String group = "group";
+ private final String clientHost = "clientHost";
+ private final int queueId = 1;
+
+ private RocksDBConsumerOffsetManager rocksdbConsumerOffsetManager;
+
+ private ConsumerOffsetManager consumerOffsetManager;
+
+ private DefaultMessageStore defaultMessageStore;
+
+ @Mock
+ private BrokerController brokerController;
+
+ @Before
+ public void init() throws IOException {
+ if (notToBeExecuted()) {
+ return;
+ }
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setConsumerOffsetUpdateVersionStep(10);
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setStorePathRootDir(basePath);
+ messageStoreConfig.setTransferOffsetJsonToRocksdb(true);
+ messageStoreConfig.setRocksdbCQDoubleWriteEnable(true);
+
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
+ defaultMessageStore = new DefaultMessageStore(messageStoreConfig, new
BrokerStatsManager("aaa", true), null,
+ brokerConfig, new ConcurrentHashMap<String, TopicConfig>());
+ defaultMessageStore.enableRocksdbCQWrite();
+ defaultMessageStore.loadCheckPoint();
+
+ consumerOffsetManager = new ConsumerOffsetManager(brokerController);
+ consumerOffsetManager.load();
+
+ rocksdbConsumerOffsetManager = new
RocksDBConsumerOffsetManager(brokerController);
+ }
+
+ @Test
+ public void testTransferOffset() {
+ if (notToBeExecuted()) {
+ return;
+ }
+
+ for (int i = 0; i < 200; i++) {
+ consumerOffsetManager.commitOffset(clientHost, group, topic,
queueId, i);
+ }
+
+ ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
consumerOffsetManager.getOffsetTable();
+ ConcurrentMap<Integer, Long> map = offsetTable.get(topic + "@" +
group);
+ Assert.assertTrue(MapUtils.isNotEmpty(map));
+
+ Long offset = map.get(queueId);
+ Assert.assertEquals(199L, (long) offset);
+
+ long offsetDataVersion =
consumerOffsetManager.getDataVersion().getCounter().get();
+ Assert.assertEquals(20L, offsetDataVersion);
+
+ consumerOffsetManager.persist();
+
+ boolean loadResult = rocksdbConsumerOffsetManager.load();
+ Assert.assertTrue(loadResult);
+
+ ConcurrentMap<String, ConcurrentMap<Integer, Long>> rocksdbOffsetTable
= rocksdbConsumerOffsetManager.getOffsetTable();
+
+ ConcurrentMap<Integer, Long> rocksdbMap = rocksdbOffsetTable.get(topic
+ "@" + group);
+ Assert.assertTrue(MapUtils.isNotEmpty(rocksdbMap));
+
+ Long aLong1 = rocksdbMap.get(queueId);
+ Assert.assertEquals(199L, (long) aLong1);
+
+ long rocksdbOffset =
rocksdbConsumerOffsetManager.getDataVersion().getCounter().get();
+ Assert.assertEquals(21L, rocksdbOffset);
+ }
+
+ @Test
+ public void testRocksdbCqWrite() throws RocksDBException {
+ if (notToBeExecuted()) {
+ return;
+ }
+ RocksDBMessageStore kvStore =
defaultMessageStore.getRocksDBMessageStore();
+ ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore();
+ ConsumeQueueInterface rocksdbCq =
defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface fileCq =
defaultMessageStore.findConsumeQueue(topic, queueId);
+ for (int i = 0; i < 200; i++) {
+ DispatchRequest request = new DispatchRequest(topic, queueId, i,
200, 0, System.currentTimeMillis(), i, "", "", 0, 0, new HashMap<>());
+ fileCq.putMessagePositionInfoWrapper(request);
+ store.putMessagePositionInfoWrapper(request);
+ }
+ Pair<CqUnit, Long> unit = rocksdbCq.getCqUnitAndStoreTime(100);
+ Pair<CqUnit, Long> unit1 = fileCq.getCqUnitAndStoreTime(100);
+ Assert.assertTrue(unit.getObject1().getPos() ==
unit1.getObject1().getPos());
+ }
+
+ private boolean notToBeExecuted() {
+ return MixAll.isMac();
+ }
+
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b539b8f098..0a45f09623 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -113,6 +113,7 @@ import
org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
+import
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
@@ -148,6 +149,7 @@ import
org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -3017,6 +3019,19 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback, StartAndShutdo
throw new MQClientException(response.getCode(), response.getRemark());
}
+ public CheckRocksdbCqWriteProgressResponseBody
checkRocksdbCqWriteProgress(final String brokerAddr, final String topic, final
long timeoutMillis) throws InterruptedException,
+ RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException {
+ CheckRocksdbCqWriteProgressRequestHeader header = new
CheckRocksdbCqWriteProgressRequestHeader();
+ header.setTopic(topic);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS,
header);
+ RemotingCommand response = this.remotingClient.invokeSync(brokerAddr,
request, timeoutMillis);
+ assert response != null;
+ if (ResponseCode.SUCCESS == response.getCode()) {
+ return
CheckRocksdbCqWriteProgressResponseBody.decode(response.getBody(),
CheckRocksdbCqWriteProgressResponseBody.class);
+ }
+ throw new MQClientException(response.getCode(), response.getRemark());
+ }
+
public void checkClientInBroker(final String brokerAddr, final String
consumerGroup,
final String clientId, final SubscriptionData subscriptionData,
final long timeoutMillis)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index f88b8e198b..13522889bb 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -17,6 +17,15 @@
package org.apache.rocketmq.common.config;
import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
@@ -40,16 +49,6 @@ import org.rocksdb.Status;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
import static org.rocksdb.RocksDB.NOT_FOUND;
public abstract class AbstractRocksDBStorage {
@@ -495,7 +494,9 @@ public abstract class AbstractRocksDBStorage {
String blocksPinnedByIteratorMemUsage =
this.db.getProperty("rocksdb.block-cache-pinned-usage");
logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {},
memtable: {}, blocksPinnedByIterator: {}",
blockCacheMemUsage, indexesAndFilterBlockMemUsage,
memTableMemUsage, blocksPinnedByIteratorMemUsage);
- } catch (Exception ignored) {
+ } catch (Exception e) {
+ logger.error("statRocksdb Failed. {}", this.dbPath, e);
+ throw new RuntimeException(e);
}
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index f45ff6fa48..cfc5cc2278 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -217,6 +217,7 @@ public class RequestCode {
public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352;
public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353;
+ public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354;
public static final int LITE_PULL_MESSAGE = 361;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
new file mode 100644
index 0000000000..76719ac1a2
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class CheckRocksdbCqWriteProgressResponseBody extends
RemotingSerializable {
+
+ String diffResult;
+
+ public String getDiffResult() {
+ return diffResult;
+ }
+
+ public void setDiffResult(String diffResult) {
+ this.diffResult = diffResult;
+ }
+
+
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
new file mode 100644
index 0000000000..fee158b497
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.common.resource.ResourceType;
+import org.apache.rocketmq.common.resource.RocketMQResource;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+
+@RocketMQAction(value = RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, action =
Action.GET)
+public class CheckRocksdbCqWriteProgressRequestHeader implements
CommandCustomHeader {
+
+ @CFNotNull
+ @RocketMQResource(ResourceType.TOPIC)
+ private String topic;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 8f564d5bc1..8b46c7f5ce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -163,11 +163,13 @@ public class DefaultMessageStore implements MessageStore {
private volatile boolean shutdown = true;
protected boolean notifyMessageArriveInBatch = false;
- private StoreCheckpoint storeCheckpoint;
+ protected StoreCheckpoint storeCheckpoint;
private TimerMessageStore timerMessageStore;
private final LinkedList<CommitLogDispatcher> dispatcherList;
+ private RocksDBMessageStore rocksDBMessageStore;
+
private RandomAccessFile lockFile;
private FileLock lock;
@@ -354,12 +356,7 @@ public class DefaultMessageStore implements MessageStore {
}
if (result) {
- this.storeCheckpoint =
- new StoreCheckpoint(
-
StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
- this.masterFlushedOffset =
this.storeCheckpoint.getMasterFlushedOffset();
- setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset());
-
+ loadCheckPoint();
result = this.indexService.load(lastExitOK);
this.recover(lastExitOK);
LOGGER.info("message store recover end, and the max phy offset
= {}", this.getMaxPhyOffset());
@@ -381,6 +378,14 @@ public class DefaultMessageStore implements MessageStore {
return result;
}
+ public void loadCheckPoint() throws IOException {
+ this.storeCheckpoint =
+ new StoreCheckpoint(
+
StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+ this.masterFlushedOffset =
this.storeCheckpoint.getMasterFlushedOffset();
+ setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset());
+ }
+
/**
* @throws Exception
*/
@@ -511,6 +516,10 @@ public class DefaultMessageStore implements MessageStore {
this.compactionService.shutdown();
}
+ if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
+ this.rocksDBMessageStore.consumeQueueStore.shutdown();
+ }
+
this.flushConsumeQueueService.shutdown();
this.allocateMappedFileService.shutdown();
this.storeCheckpoint.flush();
@@ -3251,6 +3260,17 @@ public class DefaultMessageStore implements MessageStore
{
}
}
+ public void enableRocksdbCQWrite() {
+ try {
+ RocksDBMessageStore store = new
RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig, this.topicConfigTable);
+ this.rocksDBMessageStore = store;
+ store.loadAndStartConsumerServiceOnly();
+ addDispatcher(store.getDispatcherBuildRocksdbConsumeQueue());
+ } catch (Exception e) {
+ LOGGER.error("enableRocksdbCqWrite error", e);
+ }
+ }
+
public int getMaxDelayLevel() {
return maxDelayLevel;
}
@@ -3338,4 +3358,12 @@ public class DefaultMessageStore implements MessageStore
{
public long getReputFromOffset() {
return this.reputMessageService.getReputFromOffset();
}
+
+ public RocksDBMessageStore getRocksDBMessageStore() {
+ return this.rocksDBMessageStore;
+ }
+
+ public ConsumeQueueStoreInterface getConsumeQueueStore() {
+ return consumeQueueStore;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 6141b778bf..90df7aed59 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -16,16 +16,16 @@
*/
package org.apache.rocketmq.store;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
-
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.metrics.Meter;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
@@ -39,6 +39,8 @@ import org.rocksdb.RocksDBException;
public class RocksDBMessageStore extends DefaultMessageStore {
+ private CommitLogDispatcherBuildRocksdbConsumeQueue
dispatcherBuildRocksdbConsumeQueue;
+
public RocksDBMessageStore(final MessageStoreConfig messageStoreConfig,
final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final
BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig>
topicConfigTable) throws
IOException {
@@ -178,4 +180,40 @@ public class RocksDBMessageStore extends
DefaultMessageStore {
// Also add some metrics for rocksdb's monitoring.
RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier,
this);
}
+
+ public CommitLogDispatcherBuildRocksdbConsumeQueue
getDispatcherBuildRocksdbConsumeQueue() {
+ return dispatcherBuildRocksdbConsumeQueue;
+ }
+
+ class CommitLogDispatcherBuildRocksdbConsumeQueue implements
CommitLogDispatcher {
+ @Override
+ public void dispatch(DispatchRequest request) throws RocksDBException {
+ final int tranType =
MessageSysFlag.getTransactionValue(request.getSysFlag());
+ switch (tranType) {
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ putMessagePositionInfo(request);
+ break;
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ break;
+ }
+ }
+ }
+
+ public void loadAndStartConsumerServiceOnly() {
+ try {
+ this.dispatcherBuildRocksdbConsumeQueue = new
CommitLogDispatcherBuildRocksdbConsumeQueue();
+ boolean loadResult = this.consumeQueueStore.load();
+ if (!loadResult) {
+ throw new RuntimeException("load consume queue failed");
+ }
+ super.loadCheckPoint();
+ this.consumeQueueStore.start();
+ } catch (Exception e) {
+ ERROR_LOG.error("loadAndStartConsumerServiceOnly error", e);
+ throw new RuntimeException(e);
+ }
+ }
+
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 0b45d92418..c077831f3c 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -424,6 +424,37 @@ public class MessageStoreConfig {
private boolean putConsumeQueueDataByFileChannel = true;
+ private boolean transferOffsetJsonToRocksdb = false;
+
+ private boolean rocksdbCQDoubleWriteEnable = false;
+
+ private boolean enableBatchWriteKvCq = true;
+
+
+ public boolean isEnableBatchWriteKvCq() {
+ return enableBatchWriteKvCq;
+ }
+
+ public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) {
+ this.enableBatchWriteKvCq = enableBatchWriteKvCq;
+ }
+
+ public boolean isRocksdbCQDoubleWriteEnable() {
+ return rocksdbCQDoubleWriteEnable;
+ }
+
+ public void setRocksdbCQDoubleWriteEnable(boolean rocksdbWriteEnable) {
+ this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable;
+ }
+
+ public boolean isTransferOffsetJsonToRocksdb() {
+ return transferOffsetJsonToRocksdb;
+ }
+
+ public void setTransferOffsetJsonToRocksdb(boolean
transferOffsetJsonToRocksdb) {
+ this.transferOffsetJsonToRocksdb = transferOffsetJsonToRocksdb;
+ }
+
public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
index b8865fd919..34f5cb142b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
@@ -109,6 +109,7 @@ public class CqUnit {
", size=" + size +
", pos=" + pos +
", batchNum=" + batchNum +
+ ", tagsCode=" + tagsCode +
", compactedOffset=" + compactedOffset +
'}';
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 5a981bb4df..2363c2896e 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.store.queue;
import java.nio.ByteBuffer;
import java.util.List;
-
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.attribute.CQType;
@@ -311,7 +310,7 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
public CqUnit getLatestUnit() {
try {
long maxOffset =
this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId);
- return get(maxOffset);
+ return get(maxOffset > 0 ? maxOffset - 1 : maxOffset);
} catch (RocksDBException e) {
ERROR_LOG.error("getLatestUnit Failed. topic: {}, queueId: {},
{}", topic, queueId, e.getMessage());
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 3c6b91ec01..34c6d2f395 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
@@ -78,6 +77,8 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>>
tempTopicQueueMaxOffsetMap;
private volatile boolean isCQError = false;
+ private boolean enableBatchWriteKvCq;
+
public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
super(messageStore);
@@ -87,6 +88,7 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
this.rocksDBConsumeQueueOffsetTable = new
RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage,
messageStore);
this.writeBatch = new WriteBatch();
+ this.enableBatchWriteKvCq =
messageStoreConfig.isEnableBatchWriteKvCq();
this.bufferDRList = new ArrayList(BATCH_SIZE);
this.cqBBPairList = new ArrayList(BATCH_SIZE);
this.offsetBBPairList = new ArrayList(BATCH_SIZE);
@@ -164,12 +166,12 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) throws
RocksDBException {
- if (request == null || this.bufferDRList.size() >= BATCH_SIZE) {
- putMessagePosition();
- }
if (request != null) {
this.bufferDRList.add(request);
}
+ if (request == null || !enableBatchWriteKvCq ||
this.bufferDRList.size() >= BATCH_SIZE) {
+ putMessagePosition();
+ }
}
public void putMessagePosition() throws RocksDBException {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6ebee1d0dd..3686bf2644 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -52,6 +52,7 @@ import
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -771,6 +772,12 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
);
}
+ @Override
+ public CheckRocksdbCqWriteProgressResponseBody
checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException {
+ return
this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic);
+ }
+
@Override
public boolean resumeCheckHalfMessage(String topic,
String msgId)
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index dc4d35e704..883dcbe41d 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -90,6 +90,7 @@ import
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -1817,6 +1818,12 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
return
this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(brokerAddr, topic,
queueId, index, count, consumerGroup, timeoutMillis);
}
+ @Override
+ public CheckRocksdbCqWriteProgressResponseBody
checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException {
+ return
this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr,
topic, timeoutMillis);
+ }
+
@Override
public boolean resumeCheckHalfMessage(final String topic,
final String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index ff78f22c70..09204ab7be 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -48,6 +48,7 @@ import
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -148,6 +149,8 @@ public interface MQAdminExt extends MQAdmin {
final String consumerGroup) throws RemotingException,
MQClientException, InterruptedException,
MQBrokerException;
+ CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String
brokerAddr, String topic) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException;
+
ConsumeStats examineConsumeStats(final String consumerGroup,
final String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
index 1ecb1fa2cd..c466490b8a 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.tools.command.export;
import com.alibaba.fastjson.JSONObject;
@@ -77,6 +78,7 @@ public class ExportMetadataInRocksDBCommand implements
SubCommand {
}
String configType =
commandLine.getOptionValue("configType").trim().toLowerCase();
+ path += "/" + configType;
boolean jsonEnable = false;
if (commandLine.hasOption("jsonEnable")) {
@@ -86,7 +88,7 @@ public class ExportMetadataInRocksDBCommand implements
SubCommand {
ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /*
readOnly */);
if (!kvStore.start()) {
- System.out.print("RocksDB load error, path=" + path + "\n");
+ System.out.printf("RocksDB load error, path=%s\n" , path);
return;
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
new file mode 100644
index 0000000000..82dcb74196
--- /dev/null
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.queue;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+
+public class CheckRocksdbCqWriteProgressCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "checkRocksdbCqWriteProgressCommandCommand";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "check if rocksdb cq is same as file cq";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "cluster", true, "cluster name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("n", "nameserverAddr", true, "nameserverAddr");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("t", "topic", true, "topic name");
+ opt.setRequired(false);
+ options.addOption(opt);
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n')));
+ String clusterName = commandLine.hasOption('c') ?
commandLine.getOptionValue('c').trim() : "";
+ String topic = commandLine.hasOption('t') ?
commandLine.getOptionValue('t').trim() : "";
+
+ try {
+ defaultMQAdminExt.start();
+ ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
+ Map<String, Set<String>> clusterAddrTable =
clusterInfo.getClusterAddrTable();
+ Map<String, BrokerData> brokerAddrTable =
clusterInfo.getBrokerAddrTable();
+ if (clusterAddrTable.get(clusterName) == null) {
+ System.out.print("clusterAddrTable is empty");
+ return;
+ }
+ for (Map.Entry<String, BrokerData> entry :
brokerAddrTable.entrySet()) {
+ String brokerName = entry.getKey();
+ BrokerData brokerData = entry.getValue();
+ String brokerAddr = brokerData.getBrokerAddrs().get(0L);
+ CheckRocksdbCqWriteProgressResponseBody body =
defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
+ if (StringUtils.isNotBlank(topic)) {
+ System.out.printf(body.getDiffResult());
+ } else {
+ System.out.printf(brokerName + " | " + brokerAddr + " | "
+ body.getDiffResult());
+ }
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException(this.getClass().getSimpleName() + "
command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}