LetLetMe commented on code in PR #8600:
URL: https://github.com/apache/rocketmq/pull/8600#discussion_r1759648623
##########
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java:
##########
@@ -16,36 +16,79 @@
*/
package org.apache.rocketmq.broker.offset;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.WriteBatch;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-
public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
+ protected static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
protected RocksDBConfigManager rocksDBConfigManager;
public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
- this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}
@Override
public boolean load() {
if (!rocksDBConfigManager.init()) {
return false;
}
- return this.rocksDBConfigManager.loadData(this::decodeOffset);
+ if (!loadDataVersion() || !loadConsumerOffset()) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public boolean loadConsumerOffset() {
+ return this.rocksDBConfigManager.loadData(this::decodeOffset) &&
merge();
+ }
+
+ private boolean merge() {
+ if
(!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
+ log.info("the switch transferOffsetJsonToRocksdb is off, no merge
offset operation is needed.");
+ return true;
+ }
+ if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
+ log.info("consumerOffset json file does not exist, so skip merge");
+ return true;
+ }
+ if (!super.loadDataVersion()) {
+ log.error("load json consumerOffset dataVersion error, startup
will exit");
+ return false;
+ }
+
+ final DataVersion dataVersion = super.getDataVersion();
+ final DataVersion kvDataVersion = this.getDataVersion();
+ if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get())
{
+ if (!super.load()) {
+ log.error("load json consumerOffset info failed, startup will
exit");
+ return false;
+ }
+ this.persist();
+ this.getDataVersion().assignNewOne(dataVersion);
+ updateDataVersion();
Review Comment:
这里不原子应该也没问题,只要updateDataVersion在persist之后就可以,最差的情况是原模原样重新导入一次
It should be fine even if this is not atomic, as long as updateDataVersion
runs after persist. In the worst case, the same data is simply re-imported
once more, exactly as it was.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]