lizhanhui commented on code in PR #8600:
URL: https://github.com/apache/rocketmq/pull/8600#discussion_r1759650333


##########
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java:
##########
@@ -16,36 +16,79 @@
  */
 package org.apache.rocketmq.broker.offset;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import java.io.File;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.rocksdb.WriteBatch;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-
 public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
 
+    protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
     protected RocksDBConfigManager rocksDBConfigManager;
 
     public RocksDBConsumerOffsetManager(BrokerController brokerController) {
         super(brokerController);
-        this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
     }
 
     @Override
     public boolean load() {
         if (!rocksDBConfigManager.init()) {
             return false;
         }
-        return this.rocksDBConfigManager.loadData(this::decodeOffset);
+        if (!loadDataVersion() || !loadConsumerOffset()) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public boolean loadConsumerOffset() {
+        return this.rocksDBConfigManager.loadData(this::decodeOffset) && 
merge();
+    }
+
+    private boolean merge() {
+        if 
(!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
+            log.info("the switch transferOffsetJsonToRocksdb is off, no merge 
offset operation is needed.");
+            return true;
+        }
+        if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
+            log.info("consumerOffset json file does not exist, so skip merge");
+            return true;
+        }
+        if (!super.loadDataVersion()) {
+            log.error("load json consumerOffset dataVersion error, startup 
will exit");
+            return false;
+        }
+
+        final DataVersion dataVersion = super.getDataVersion();
+        final DataVersion kvDataVersion = this.getDataVersion();
+        if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) 
{
+            if (!super.load()) {
+                log.error("load json consumerOffset info failed, startup will 
exit");
+                return false;
+            }
+            this.persist();
+            this.getDataVersion().assignNewOne(dataVersion);
+            updateDataVersion();

Review Comment:
   I said it is not mandatory, but keeping the data and version consistent is
really good to have, especially when you are troubleshooting bugs online.
   
   IMO, it will be a minor change to achieve this goal. Why not?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to