LetLetMe commented on code in PR #8600:
URL: https://github.com/apache/rocketmq/pull/8600#discussion_r1759648623


##########
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java:
##########
@@ -16,36 +16,79 @@
  */
 package org.apache.rocketmq.broker.offset;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import java.io.File;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.rocksdb.WriteBatch;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-
 public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
 
+    protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
     protected RocksDBConfigManager rocksDBConfigManager;
 
     public RocksDBConsumerOffsetManager(BrokerController brokerController) {
         super(brokerController);
-        this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
     }
 
     @Override
     public boolean load() {
         if (!rocksDBConfigManager.init()) {
             return false;
         }
-        return this.rocksDBConfigManager.loadData(this::decodeOffset);
+        if (!loadDataVersion() || !loadConsumerOffset()) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public boolean loadConsumerOffset() {
+        return this.rocksDBConfigManager.loadData(this::decodeOffset) && 
merge();
+    }
+
+    private boolean merge() {
+        if 
(!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
+            log.info("the switch transferOffsetJsonToRocksdb is off, no merge 
offset operation is needed.");
+            return true;
+        }
+        if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
+            log.info("consumerOffset json file does not exist, so skip merge");
+            return true;
+        }
+        if (!super.loadDataVersion()) {
+            log.error("load json consumerOffset dataVersion error, startup 
will exit");
+            return false;
+        }
+
+        final DataVersion dataVersion = super.getDataVersion();
+        final DataVersion kvDataVersion = this.getDataVersion();
+        if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) 
{
+            if (!super.load()) {
+                log.error("load json consumerOffset info failed, startup will 
exit");
+                return false;
+            }
+            this.persist();
+            this.getDataVersion().assignNewOne(dataVersion);
+            updateDataVersion();

Review Comment:
   这里不原子应该也没问题,只要updateDataVersion在persist之后就可以,最差的情况是原模原样重新导入一次
   It should be fine even if this is not atomic, as long as updateDataVersion 
occurs after persist. In the worst-case scenario, it would just require 
re-importing in the same state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to