lollipopjin commented on code in PR #8600:
URL: https://github.com/apache/rocketmq/pull/8600#discussion_r1737994434


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -339,6 +343,8 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx,
                 return fetchAllConsumeStatsInBroker(ctx, request);
             case RequestCode.QUERY_CONSUME_QUEUE:
                 return queryConsumeQueue(ctx, request);
+            case RequestCode.DIFF_CONSUME_QUEUE:

Review Comment:
   How about renaming this request code to DIFF_CONSUME_QUEUE_FOR_ROCKSDB, to
make it clear that this admin tool diffs the consume queue content between the
file-based store and RocksDB?



##########
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java:
##########
@@ -103,4 +147,22 @@ private void putWriteBatch(final WriteBatch writeBatch, 
final String topicGroupN
         byte[] valueBytes = JSON.toJSONBytes(wrapper, 
SerializerFeature.BrowserCompatible);
         writeBatch.put(keyBytes, valueBytes);
     }
+
+    @Override
+    public boolean loadDataVersion() {
+        return this.rocksDBConfigManager.loadDataVersion();
+    }
+
+    @Override
+    public DataVersion getDataVersion() {
+        return rocksDBConfigManager.getKvDataVersion();
+    }
+
+    public void updateDataVersion() {
+        try {
+            rocksDBConfigManager.updateKvDataVersion();
+        } catch (Exception e) {
+            throw new RuntimeException(e);

Review Comment:
   Add log here.



##########
store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java:
##########
@@ -178,4 +180,30 @@ public void initMetrics(Meter meter, 
Supplier<AttributesBuilder> attributesBuild
         // Also add some metrics for rocksdb's monitoring.
         RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, 
this);
     }
+
+    public CommitLogDispatcherBuildRocksdbConsumeQueue 
getDispatcherBuildRocksdbConsumeQueue() {
+        return dispatcherBuildRocksdbConsumeQueue;
+    }
+
+    class CommitLogDispatcherBuildRocksdbConsumeQueue implements 
CommitLogDispatcher {
+        @Override
+        public void dispatch(DispatchRequest request) throws RocksDBException {
+            putMessagePositionInfo(request);
+        }
+    }
+
+    public void loadAndStartConsumerServiceOnly() {
+        try {
+            this.dispatcherBuildRocksdbConsumeQueue = new 
CommitLogDispatcherBuildRocksdbConsumeQueue();
+            boolean loadResult = this.consumeQueueStore.load();
+            if (!loadResult) {
+                throw new RuntimeException("load consume queue failed");
+            }
+            super.loadCheckPoint();
+            this.consumeQueueStore.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);

Review Comment:
   A log statement should be added here.



##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -458,6 +464,79 @@ private RemotingCommand 
updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
         return response;
     }
 
+    private RemotingCommand diffConsumeQueue(ChannelHandlerContext ctx, 
RemotingCommand request) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "diff 
success, very good!")));
+
+        DefaultMessageStore messageStore = (DefaultMessageStore) 
brokerController.getMessageStore();
+        RocksDBMessageStore rocksDBMessageStore = 
messageStore.getRocksDBMessageStore();
+
+        if (!messageStore.getMessageStoreConfig().isRocksdbCQWriteEnable()) {
+            response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", 
"RocksdbCQWriteEnable is false, diffConsumeQueue is invalid")));
+            return response;
+        }
+
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> 
cqTable = messageStore.getConsumeQueueTable();
+        Random random = new Random();
+        for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> 
topicToCqListEntry : cqTable.entrySet()) {
+            String topic = topicToCqListEntry.getKey();
+            ConcurrentMap<Integer, ConsumeQueueInterface> queueIdToCqMap = 
topicToCqListEntry.getValue();
+            for (Map.Entry<Integer, ConsumeQueueInterface> queueIdToCqEntry : 
queueIdToCqMap.entrySet()) {
+                Integer queueId = queueIdToCqEntry.getKey();
+
+                ConsumeQueueInterface jsonCq = queueIdToCqEntry.getValue();
+                ConsumeQueueInterface kvCq = 
rocksDBMessageStore.getConsumeQueue(topic, queueId);
+
+                CqUnit kvCqEarliestUnit = kvCq.getEarliestUnit();
+                CqUnit kvCqLatestUnit = kvCq.getLatestUnit();
+                CqUnit jsonCqEarliestUnit = jsonCq.getEarliestUnit();
+                CqUnit jsonCqLatestUnit = jsonCq.getLatestUnit();
+                LOGGER.info("diffConsumeQueue topic:{}, queue:{}, 
kvCqEarliestUnit:{}, jsonCqEarliestUnit:{}, kvCqLatestUnit:{}, 
jsonCqLatestUnit:{}",
+                    topic, queueId, kvCqEarliestUnit, jsonCqEarliestUnit, 
kvCqLatestUnit, jsonCqLatestUnit);
+
+                long jsonOffset = jsonCq.getMaxOffsetInQueue() - 1;
+                int sampleCount = 10;
+
+                Set<Long> sampledOffsets = new HashSet<>();
+
+                if (jsonOffset > 100) {

Review Comment:
   If jsonCq.getMaxOffsetInQueue() is 100 here and 
jsonCq.getMinOffsetInQueue() is also 100, you may not find 10 messages here.



##########
common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java:
##########
@@ -496,6 +496,7 @@ public void statRocksdb(Logger logger) {
             logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, 
memtable: {}, blocksPinnedByIterator: {}",
                     blockCacheMemUsage, indexesAndFilterBlockMemUsage, 
memTableMemUsage, blocksPinnedByIteratorMemUsage);
         } catch (Exception ignored) {
+            throw new RuntimeException(ignored);

Review Comment:
   Rename `ignored` to `e`, since the exception is no longer ignored.



##########
tools/src/main/java/org/apache/rocketmq/tools/command/queue/DiffConsumeQueueCommand.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.queue;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+
+public class DiffConsumeQueueCommand implements SubCommand {
+
+    public static void main(String[] args) {
+        DiffConsumeQueueCommand cmd = new DiffConsumeQueueCommand();
+
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-c LetLetMe", "-n xxxx:9876"};

Review Comment:
   Does this mean DefaultCluster?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to