This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2a938fb1bc [ISSUE #8058] Support for upgrading metadata in json to 
rocksdb (#8571)
2a938fb1bc is described below

commit 2a938fb1bcf35f18b2bcbf8616043be6fb105a36
Author: LetLetMe <[email protected]>
AuthorDate: Fri Aug 23 11:18:03 2024 +0800

    [ISSUE #8058] Support for upgrading metadata in json to rocksdb (#8571)
---
 .github/workflows/maven.yaml                       |  27 +-
 .../rocketmq/broker}/RocksDBConfigManager.java     |  60 +++-
 .../offset/RocksDBConsumerOffsetManager.java       |  14 +-
 .../RocksDBSubscriptionGroupManager.java           | 156 +++++++++-
 .../subscription/SubscriptionGroupManager.java     |  29 +-
 .../broker/topic/RocksDBTopicConfigManager.java    |  91 +++++-
 .../rocketmq/broker/topic/TopicConfigManager.java  |  52 +---
 .../RocksdbGroupConfigTransferTest.java            | 340 +++++++++++++++++++++
 .../subscription/SubscriptionGroupManagerTest.java |  32 +-
 .../topic/RocksdbTopicConfigManagerTest.java       |  15 +-
 .../topic/RocksdbTopicConfigTransferTest.java      | 259 ++++++++++++++++
 .../broker/topic/TopicConfigManagerTest.java       |  10 +-
 .../org/apache/rocketmq/common/ConfigManager.java  |  12 -
 .../common/config/AbstractRocksDBStorage.java      |  21 +-
 .../common/config/ConfigRocksDBStorage.java        |  39 ++-
 .../rocketmq/store/config/MessageStoreConfig.java  |  12 +
 .../rocketmq/tools/command/MQAdminStartup.java     |   2 +
 .../metadata/RocksDBConfigToJsonCommand.java       |  80 +++--
 18 files changed, 1069 insertions(+), 182 deletions(-)

diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 449f637894..a49201b8a1 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -29,19 +29,28 @@ jobs:
           cache: "maven"
       - name: Build with Maven
         run: mvn -B package --file pom.xml
-      - name: Upload JVM crash logs
+
+      - name: Run tests with increased memory and debug info
+        run: mvn test -X -Dparallel=none -DargLine="-Xmx1024m 
-XX:MaxPermSize=256m"
+
+      - name: Upload Auth JVM crash logs
         if: failure()
         uses: actions/upload-artifact@v4
         with:
           name: jvm-crash-logs
           path: /Users/runner/work/rocketmq/rocketmq/auth/hs_err_pid*.log
           retention-days: 1
-      - name: Retry if failed
-        # if it failed , retry 2 times at most
-        if: failure() && fromJSON(github.run_attempt) < 3
-        env:
-          GH_REPO: ${{ github.repository }}
-          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Check for broker JVM crash logs
+        if: failure()
         run: |
-          echo "Attempting to retry workflow..."
-          gh workflow run rerun-workflow.yml -F run_id=${{ github.run_id }}
\ No newline at end of file
+          echo "Checking for JVM crash logs..."
+          ls -al /Users/runner/work/rocketmq/rocketmq/broker/
+
+      - name: Upload broker JVM crash logs
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: jvm-crash-logs
+          path: /Users/runner/work/rocketmq/rocketmq/broker/hs_err_pid*.log
+          retention-days: 1
\ No newline at end of file
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
 b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
similarity index 63%
rename from 
common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
rename to 
broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
index d1ec894685..20358c4707 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
@@ -14,46 +14,66 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.common.config;
+package org.apache.rocketmq.broker;
 
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.Statistics;
 import org.rocksdb.WriteBatch;
 
+import java.nio.charset.StandardCharsets;
 import java.util.function.BiConsumer;
 
 public class RocksDBConfigManager {
     protected static final Logger BROKER_LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
-    protected volatile boolean isStop = false;
-    protected ConfigRocksDBStorage configRocksDBStorage = null;
+    public volatile boolean isStop = false;
+    public ConfigRocksDBStorage configRocksDBStorage = null;
     private FlushOptions flushOptions = null;
     private volatile long lastFlushMemTableMicroSecond = 0;
+
+    private final String filePath;
     private final long memTableFlushInterval;
+    private DataVersion kvDataVersion = new DataVersion();
+
 
-    public RocksDBConfigManager(long memTableFlushInterval) {
+    public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
+        this.filePath = filePath;
         this.memTableFlushInterval = memTableFlushInterval;
     }
 
-    public boolean load(String configFilePath, BiConsumer<byte[], byte[]> 
biConsumer) {
+    public boolean init() {
         this.isStop = false;
-        this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath);
-        if (!this.configRocksDBStorage.start()) {
-            return false;
-        }
-        RocksIterator iterator = this.configRocksDBStorage.iterator();
+        this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
+        return this.configRocksDBStorage.start();
+    }
+    public boolean loadDataVersion() {
+        String currDataVersionString = null;
         try {
+            byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
+            if (dataVersion != null && dataVersion.length > 0) {
+                currDataVersionString = new String(dataVersion, 
StandardCharsets.UTF_8);
+            }
+            kvDataVersion = StringUtils.isNotBlank(currDataVersionString) ? 
JSON.parseObject(currDataVersionString, DataVersion.class) : new DataVersion();
+            return true;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
+        try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
             iterator.seekToFirst();
             while (iterator.isValid()) {
                 biConsumer.accept(iterator.key(), iterator.value());
                 iterator.next();
             }
-        } finally {
-            iterator.close();
         }
 
         this.flushOptions = new FlushOptions();
@@ -103,6 +123,20 @@ public class RocksDBConfigManager {
         this.configRocksDBStorage.delete(keyBytes);
     }
 
+    public void updateKvDataVersion() throws Exception {
+        kvDataVersion.nextVersion();
+        
this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
+    }
+
+    public DataVersion getKvDataVersion() {
+        return kvDataVersion;
+    }
+
+    public void updateForbidden(String key, String value) throws Exception {
+        
this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), 
value.getBytes(StandardCharsets.UTF_8));
+    }
+
+
     public void batchPutWithWal(final WriteBatch batch) throws Exception {
         this.configRocksDBStorage.batchPutWithWal(batch);
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
index 05b53b0bcf..de293fc499 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
@@ -22,7 +22,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.broker.RocksDBConfigManager;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.rocksdb.WriteBatch;
 
@@ -31,14 +31,19 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
 
 public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
 
+    protected RocksDBConfigManager rocksDBConfigManager;
+
     public RocksDBConsumerOffsetManager(BrokerController brokerController) {
         super(brokerController);
-        this.rocksDBConfigManager = new 
RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
     }
 
     @Override
     public boolean load() {
-        return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
+        if (!rocksDBConfigManager.init()) {
+            return false;
+        }
+        return this.rocksDBConfigManager.loadData(this::decodeOffset);
     }
 
     @Override
@@ -56,8 +61,7 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
         }
     }
 
-    @Override
-    protected void decode0(final byte[] key, final byte[] body) {
+    protected void decodeOffset(final byte[] key, final byte[] body) {
         String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
         RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, 
RocksDBOffsetSerializeWrapper.class);
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
index e9a81a8d68..7df72dbe68 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
@@ -16,39 +16,116 @@
  */
 package org.apache.rocketmq.broker.subscription;
 
-import java.io.File;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.rocksdb.RocksIterator;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
 
 public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
 
+    protected RocksDBConfigManager rocksDBConfigManager;
+
     public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
         super(brokerController, false);
-        this.rocksDBConfigManager = new 
RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
     }
 
     @Override
     public boolean load() {
-        if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) {
+        if (!rocksDBConfigManager.init()) {
+            return false;
+        }
+        if (!loadDataVersion() || !loadSubscriptionGroupAndForbidden()) {
             return false;
         }
         this.init();
         return true;
     }
 
+    public boolean loadDataVersion() {
+        return this.rocksDBConfigManager.loadDataVersion();
+    }
+
+    public boolean loadSubscriptionGroupAndForbidden() {
+        return 
this.rocksDBConfigManager.loadData(this::decodeSubscriptionGroup)
+                && this.loadForbidden(this::decodeForbidden)
+                && merge();
+    }
+
+    public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {
+        try (RocksIterator iterator = 
this.rocksDBConfigManager.configRocksDBStorage.forbiddenIterator()) {
+            iterator.seekToFirst();
+            while (iterator.isValid()) {
+                biConsumer.accept(iterator.key(), iterator.value());
+                iterator.next();
+            }
+        }
+        return true;
+    }
+
+
+    private boolean merge() {
+        if 
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
+            log.info("The switch is off, no merge operation is needed.");
+            return true;
+        }
+        if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
+            log.info("json file and json back file not exist, so skip merge");
+            return true;
+        }
+
+        if (!super.load()) {
+            log.error("load group and forbidden info from json file error, 
startup will exit");
+            return false;
+        }
+
+        final ConcurrentMap<String, SubscriptionGroupConfig> groupTable = 
this.getSubscriptionGroupTable();
+        final ConcurrentMap<String, ConcurrentMap<String, Integer>> 
forbiddenTable = this.getForbiddenTable();
+        final DataVersion dataVersion = super.getDataVersion();
+        final DataVersion kvDataVersion = this.getDataVersion();
+        if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) 
{
+            for (Map.Entry<String, SubscriptionGroupConfig> entry : 
groupTable.entrySet()) {
+                putSubscriptionGroupConfig(entry.getValue());
+                log.info("import subscription config to rocksdb, group={}", 
entry.getValue());
+            }
+            for (Map.Entry<String, ConcurrentMap<String, Integer>> entry : 
forbiddenTable.entrySet()) {
+                try {
+                    this.rocksDBConfigManager.updateForbidden(entry.getKey(), 
JSON.toJSONString(entry.getValue()));
+                    log.info("import forbidden config to rocksdb, group={}", 
entry.getValue());
+                } catch (Exception e) {
+                    log.error("import forbidden config to rocksdb failed, 
group={}", entry.getValue());
+                    return false;
+                }
+            }
+            
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
+            updateDataVersion();
+        }
+        log.info("finish marge subscription config from json file and merge to 
rocksdb");
+        this.persist();
+
+        return true;
+    }
+
     @Override
     public boolean stop() {
         return this.rocksDBConfigManager.stop();
     }
 
     @Override
-    protected SubscriptionGroupConfig 
putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+    public SubscriptionGroupConfig 
putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
         String groupName = subscriptionGroupConfig.getGroupName();
         SubscriptionGroupConfig oldConfig = 
this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
 
@@ -89,8 +166,8 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
         return subscriptionGroupConfig;
     }
 
-    @Override
-    protected void decode0(byte[] key, byte[] body) {
+
+    protected void decodeSubscriptionGroup(byte[] key, byte[] body) {
         String groupName = new String(key, DataConverter.CHARSET_UTF8);
         SubscriptionGroupConfig subscriptionGroupConfig = 
JSON.parseObject(body, SubscriptionGroupConfig.class);
 
@@ -105,8 +182,63 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
         }
     }
 
-    @Override
-    public String configFilePath() {
+    public String rocksdbConfigFilePath() {
         return 
this.brokerController.getMessageStoreConfig().getStorePathRootDir() + 
File.separator + "config" + File.separator + "subscriptionGroups" + 
File.separator;
     }
+
+    @Override
+    public DataVersion getDataVersion() {
+        return rocksDBConfigManager.getKvDataVersion();
+    }
+
+    @Override
+    public void updateDataVersion() {
+        try {
+            rocksDBConfigManager.updateKvDataVersion();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected void decodeForbidden(byte[] key, byte[] body) {
+        String forbiddenGroupName = new String(key, 
DataConverter.CHARSET_UTF8);
+        JSONObject jsonObject = JSON.parseObject(new String(body, 
DataConverter.CHARSET_UTF8));
+        Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
+        ConcurrentMap<String, Integer> forbiddenGroup = new 
ConcurrentHashMap<>(entries.size());
+        for (Map.Entry<String, Object> entry : entries) {
+            forbiddenGroup.put(entry.getKey(), (Integer) entry.getValue());
+        }
+        this.getForbiddenTable().put(forbiddenGroupName, forbiddenGroup);
+        log.info("load forbidden,{} value {}", forbiddenGroupName, 
forbiddenGroup.toString());
+    }
+
+    @Override
+    public void updateForbidden(String group, String topic, int 
forbiddenIndex, boolean setOrClear) {
+        try {
+            super.updateForbidden(group, topic, forbiddenIndex, setOrClear);
+            this.rocksDBConfigManager.updateForbidden(group, 
JSON.toJSONString(this.getForbiddenTable().get(group)));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void setForbidden(String group, String topic, int forbiddenIndex) {
+        try {
+            super.setForbidden(group, topic, forbiddenIndex);
+            this.rocksDBConfigManager.updateForbidden(group, 
JSON.toJSONString(this.getForbiddenTable().get(group)));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void clearForbidden(String group, String topic, int forbiddenIndex) 
{
+        try {
+            super.clearForbidden(group, topic, forbiddenIndex);
+            this.rocksDBConfigManager.updateForbidden(group, 
JSON.toJSONString(this.getForbiddenTable().get(group)));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index e63b930586..1d9614fe58 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -121,7 +121,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
         }
     }
 
-    protected SubscriptionGroupConfig 
putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+    public SubscriptionGroupConfig 
putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
         return 
this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), 
subscriptionGroupConfig);
     }
 
@@ -156,8 +156,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
             log.info("create new subscription group, {}", config);
         }
 
-        long stateMachineVersion = brokerController.getMessageStore() != null 
? brokerController.getMessageStore().getStateMachineVersion() : 0;
-        dataVersion.nextVersion(stateMachineVersion);
+        updateDataVersion();
 
         this.persist();
     }
@@ -214,7 +213,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
         return topicForbidden;
     }
 
-    private void updateForbiddenValue(String group, String topic, Integer 
forbidden) {
+    protected void updateForbiddenValue(String group, String topic, Integer 
forbidden) {
         if (forbidden == null || forbidden <= 0) {
             this.forbiddenTable.remove(group);
             log.info("clear group forbidden, {}@{} ", group, topic);
@@ -233,8 +232,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
             log.info("set group forbidden, {}@{} old: {} new: {}", group, 
topic, 0, forbidden);
         }
 
-        long stateMachineVersion = brokerController.getMessageStore() != null 
? brokerController.getMessageStore().getStateMachineVersion() : 0;
-        dataVersion.nextVersion(stateMachineVersion);
+        updateDataVersion();
 
         this.persist();
     }
@@ -243,8 +241,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
         SubscriptionGroupConfig old = getSubscriptionGroupConfig(groupName);
         if (old != null) {
             old.setConsumeEnable(false);
-            long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            dataVersion.nextVersion(stateMachineVersion);
+            updateDataVersion();
         }
     }
 
@@ -261,8 +258,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
                 if (null == preConfig) {
                     log.info("auto create a subscription group, {}", 
subscriptionGroupConfig.toString());
                 }
-                long stateMachineVersion = brokerController.getMessageStore() 
!= null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-                dataVersion.nextVersion(stateMachineVersion);
+                updateDataVersion();
                 this.persist();
             }
         }
@@ -331,8 +327,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
         this.forbiddenTable.remove(groupName);
         if (old != null) {
             log.info("delete subscription group OK, subscription group:{}", 
old);
-            long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            dataVersion.nextVersion(stateMachineVersion);
+            updateDataVersion();
             this.persist();
         } else {
             log.warn("delete subscription group failed, subscription 
groupName: {} not exist", groupName);
@@ -369,4 +364,14 @@ public class SubscriptionGroupManager extends 
ConfigManager {
             }
         }
     }
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion.assignNewOne(dataVersion);
+    }
+
+    public void updateDataVersion() {
+        long stateMachineVersion = brokerController.getMessageStore() != null 
? brokerController.getMessageStore().getStateMachineVersion() : 0;
+        dataVersion.nextVersion(stateMachineVersion);
+    }
+
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
index fddecf2d92..2a89dd7e02 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
@@ -16,39 +16,86 @@
  */
 package org.apache.rocketmq.broker.topic;
 
-import java.io.File;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.RocksDBConfigManager;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 public class RocksDBTopicConfigManager extends TopicConfigManager {
 
+    protected RocksDBConfigManager rocksDBConfigManager;
+
     public RocksDBTopicConfigManager(BrokerController brokerController) {
         super(brokerController, false);
-        this.rocksDBConfigManager = new 
RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
     }
 
     @Override
     public boolean load() {
-        if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) {
+        if (!rocksDBConfigManager.init()) {
+            return false;
+        }
+        if (!loadDataVersion() || !loadTopicConfig()) {
             return false;
         }
         this.init();
         return true;
     }
 
+    public boolean loadTopicConfig() {
+        return this.rocksDBConfigManager.loadData(this::decodeTopicConfig) && 
merge();
+    }
+
+    public boolean loadDataVersion() {
+        return this.rocksDBConfigManager.loadDataVersion();
+    }
+
+    private boolean merge() {
+        if 
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
+            log.info("The switch is off, no merge operation is needed.");
+            return true;
+        }
+        if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
+            log.info("json file and json back file not exist, so skip merge");
+            return true;
+        }
+
+        if (!super.load()) {
+            log.error("load topic config from json file error, startup will 
exit");
+            return false;
+        }
+
+        final ConcurrentMap<String, TopicConfig> topicConfigTable = 
this.getTopicConfigTable();
+        final DataVersion dataVersion = super.getDataVersion();
+        final DataVersion kvDataVersion = this.getDataVersion();
+        if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) 
{
+            for (Map.Entry<String, TopicConfig> entry : 
topicConfigTable.entrySet()) {
+                putTopicConfig(entry.getValue());
+                log.info("import topic config to rocksdb, topic={}", 
entry.getValue());
+            }
+            
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
+            updateDataVersion();
+        }
+        log.info("finish read topic config from json file and merge to 
rocksdb");
+        this.persist();
+        return true;
+    }
+
+
     @Override
     public boolean stop() {
         return this.rocksDBConfigManager.stop();
     }
 
-    @Override
-    protected void decode0(byte[] key, byte[] body) {
+    protected void decodeTopicConfig(byte[] key, byte[] body) {
         String topicName = new String(key, DataConverter.CHARSET_UTF8);
         TopicConfig topicConfig = JSON.parseObject(body, TopicConfig.class);
 
@@ -57,12 +104,7 @@ public class RocksDBTopicConfigManager extends 
TopicConfigManager {
     }
 
     @Override
-    public String configFilePath() {
-        return 
this.brokerController.getMessageStoreConfig().getStorePathRootDir() + 
File.separator + "config" + File.separator + "topics" + File.separator;
-    }
-
-    @Override
-    protected TopicConfig putTopicConfig(TopicConfig topicConfig) {
+    public TopicConfig putTopicConfig(TopicConfig topicConfig) {
         String topicName = topicConfig.getTopicName();
         TopicConfig oldTopicConfig = this.topicConfigTable.put(topicName, 
topicConfig);
         try {
@@ -92,4 +134,23 @@ public class RocksDBTopicConfigManager extends 
TopicConfigManager {
             this.rocksDBConfigManager.flushWAL();
         }
     }
+
+    public String rocksdbConfigFilePath() {
+        return 
this.brokerController.getMessageStoreConfig().getStorePathRootDir() + 
File.separator + "config" + File.separator + "topics" + File.separator;
+    }
+
+
+    @Override
+    public DataVersion getDataVersion() {
+        return rocksDBConfigManager.getKvDataVersion();
+    }
+
+    @Override
+    public void updateDataVersion() {
+        try {
+            rocksDBConfigManager.updateKvDataVersion();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index c71c65fe8b..eab2896b00 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -225,7 +225,7 @@ public class TopicConfigManager extends ConfigManager {
         }
     }
 
-    protected TopicConfig putTopicConfig(TopicConfig topicConfig) {
+    public TopicConfig putTopicConfig(TopicConfig topicConfig) {
         return this.topicConfigTable.put(topicConfig.getTopicName(), 
topicConfig);
     }
 
@@ -293,8 +293,7 @@ public class TopicConfigManager extends ConfigManager {
 
                         putTopicConfig(topicConfig);
 
-                        long stateMachineVersion = 
brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0;
-                        dataVersion.nextVersion(stateMachineVersion);
+                        updateDataVersion();
 
                         createNew = true;
 
@@ -337,8 +336,7 @@ public class TopicConfigManager extends ConfigManager {
                     }
                     log.info("Create new topic [{}] config:[{}]", 
topicConfig.getTopicName(), topicConfig);
                     putTopicConfig(topicConfig);
-                    long stateMachineVersion = 
brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0;
-                    dataVersion.nextVersion(stateMachineVersion);
+                    updateDataVersion();
                     createNew = true;
                     this.persist();
                 } finally {
@@ -397,8 +395,7 @@ public class TopicConfigManager extends ConfigManager {
                     log.info("create new topic {}", topicConfig);
                     putTopicConfig(topicConfig);
                     createNew = true;
-                    long stateMachineVersion = 
brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0;
-                    dataVersion.nextVersion(stateMachineVersion);
+                    updateDataVersion();
                     this.persist();
                 } finally {
                     this.topicConfigTableLock.unlock();
@@ -438,8 +435,7 @@ public class TopicConfigManager extends ConfigManager {
                     log.info("create new topic {}", topicConfig);
                     putTopicConfig(topicConfig);
                     createNew = true;
-                    long stateMachineVersion = 
brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0;
-                    dataVersion.nextVersion(stateMachineVersion);
+                    updateDataVersion();
                     this.persist();
                 } finally {
                     this.topicConfigTableLock.unlock();
@@ -472,8 +468,7 @@ public class TopicConfigManager extends ConfigManager {
 
             putTopicConfig(topicConfig);
 
-            long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            dataVersion.nextVersion(stateMachineVersion);
+            updateDataVersion();
 
             this.persist();
             registerBrokerData(topicConfig);
@@ -495,8 +490,7 @@ public class TopicConfigManager extends ConfigManager {
 
             putTopicConfig(topicConfig);
 
-            long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            dataVersion.nextVersion(stateMachineVersion);
+            updateDataVersion();
 
             this.persist();
             registerBrokerData(topicConfig);
@@ -509,7 +503,6 @@ public class TopicConfigManager extends ConfigManager {
         Map<String, String> newAttributes = request(topicConfig);
         Map<String, String> currentAttributes = 
current(topicConfig.getTopicName());
 
-
         Map<String, String> finalAttributes = 
AttributeUtil.alterCurrentAttributes(
             this.topicConfigTable.get(topicConfig.getTopicName()) == null,
             TopicAttributes.ALL,
@@ -526,8 +519,7 @@ public class TopicConfigManager extends ConfigManager {
             log.info("create new topic [{}]", topicConfig);
         }
 
-        long stateMachineVersion = brokerController.getMessageStore() != null 
? brokerController.getMessageStore().getStateMachineVersion() : 0;
-        dataVersion.nextVersion(stateMachineVersion);
+        updateDataVersion();
     }
 
     public void updateTopicConfig(final TopicConfig topicConfig) {
@@ -581,25 +573,8 @@ public class TopicConfigManager extends ConfigManager {
                 }
             }
 
-            // We don't have a mandatory rule to maintain the validity of 
order conf in NameServer,
-            // so we may overwrite the order field mistakenly.
-            // To avoid the above case, we comment the below codes, please use 
mqadmin API to update
-            // the order filed.
-            /*for (Map.Entry<String, TopicConfig> entry : 
this.topicConfigTable.entrySet()) {
-                String topic = entry.getKey();
-                if (!orderTopics.contains(topic)) {
-                    TopicConfig topicConfig = entry.getValue();
-                    if (topicConfig.isOrder()) {
-                        topicConfig.setOrder(false);
-                        isChange = true;
-                        log.info("update order topic config, topic={}, 
order={}", topic, false);
-                    }
-                }
-            }*/
-
             if (isChange) {
-                long stateMachineVersion = brokerController.getMessageStore() 
!= null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-                dataVersion.nextVersion(stateMachineVersion);
+                updateDataVersion();
                 this.persist();
             }
         }
@@ -623,8 +598,7 @@ public class TopicConfigManager extends ConfigManager {
         TopicConfig old = removeTopicConfig(topic);
         if (old != null) {
             log.info("delete topic config OK, topic: {}", old);
-            long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            dataVersion.nextVersion(stateMachineVersion);
+            updateDataVersion();
             this.persist();
         } else {
             log.warn("delete topic config failed, topic: {} not exists", 
topic);
@@ -739,5 +713,11 @@ public class TopicConfigManager extends ConfigManager {
         return topicConfigTable.containsKey(topic);
     }
 
+    public void updateDataVersion() {
+        long stateMachineVersion = brokerController.getMessageStore() != null 
? brokerController.getMessageStore().getStateMachineVersion() : 0;
+        dataVersion.nextVersion(stateMachineVersion);
+    }
+
+
 
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
new file mode 100644
index 0000000000..205e642843
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.subscription;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RocksdbGroupConfigTransferTest {
+    private final String basePath = Paths.get(System.getProperty("user.home"),
+            "unit-test-store", UUID.randomUUID().toString().substring(0, 
16).toUpperCase()).toString();
+
+    private RocksDBSubscriptionGroupManager rocksDBSubscriptionGroupManager;
+
+    private SubscriptionGroupManager jsonSubscriptionGroupManager;
+    @Mock
+    private BrokerController brokerController;
+
+    @Mock
+    private DefaultMessageStore defaultMessageStore;
+
+    @Before
+    public void init() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        BrokerConfig brokerConfig = new BrokerConfig();
+        
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(basePath);
+        messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
+        
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+        when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
+    }
+
+    @After
+    public void destroy() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        Path pathToBeDeleted = Paths.get(basePath);
+
+        try {
+            Files.walk(pathToBeDeleted)
+                    .sorted(Comparator.reverseOrder())
+                    .forEach(path -> {
+                        try {
+                            Files.delete(path);
+                        } catch (IOException e) {
+                            // ignore
+                        }
+                    });
+        } catch (IOException e) {
+            // ignore
+        }
+        if (rocksDBSubscriptionGroupManager != null) {
+            rocksDBSubscriptionGroupManager.stop();
+        }
+    }
+
+
+    public void initRocksDBSubscriptionGroupManager() {
+        if (rocksDBSubscriptionGroupManager == null) {
+            rocksDBSubscriptionGroupManager = new 
RocksDBSubscriptionGroupManager(brokerController);
+            rocksDBSubscriptionGroupManager.load();
+        }
+    }
+
+    public void initJsonSubscriptionGroupManager() {
+        if (jsonSubscriptionGroupManager == null) {
+            jsonSubscriptionGroupManager = new 
SubscriptionGroupManager(brokerController);
+            jsonSubscriptionGroupManager.load();
+        }
+    }
+
+    @Test
+    public void theFirstTimeLoadJsonSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonSubscriptionGroupManager();
+        DataVersion dataVersion = 
jsonSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(0L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
jsonSubscriptionGroupManager.getSubscriptionGroupTable().size());
+    }
+
+    @Test
+    public void theFirstTimeLoadRocksDBSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksDBSubscriptionGroupManager();
+        DataVersion dataVersion = 
rocksDBSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(0L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+    }
+
+
+    @Test
+    public void addGroupLoadJsonSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonSubscriptionGroupManager();
+        int beforeSize = 
jsonSubscriptionGroupManager.getSubscriptionGroupTable().size();
+        String groupName = "testAddGroupConfig-" + System.currentTimeMillis();
+
+        Map<String, String> attributes = new HashMap<>();
+
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(groupName);
+        subscriptionGroupConfig.setAttributes(attributes);
+        DataVersion beforeDataVersion = 
jsonSubscriptionGroupManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        
jsonSubscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig);
+
+        int afterSize = 
jsonSubscriptionGroupManager.getSubscriptionGroupTable().size();
+        DataVersion afterDataVersion = 
jsonSubscriptionGroupManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(0, beforeDataVersionCounter);
+        Assert.assertEquals(1, afterDataVersionCounter);
+        Assert.assertEquals(1, afterSize - beforeSize);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void addForbiddenGroupLoadJsonSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonSubscriptionGroupManager();
+        int beforeSize = 
jsonSubscriptionGroupManager.getForbiddenTable().size();
+        String groupName = "testAddGroupConfig-" + System.currentTimeMillis();
+
+        Map<String, String> attributes = new HashMap<>();
+
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(groupName);
+        subscriptionGroupConfig.setAttributes(attributes);
+        DataVersion beforeDataVersion = 
jsonSubscriptionGroupManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        jsonSubscriptionGroupManager.setForbidden(groupName, "topic", 0);
+        int afterSize = 
jsonSubscriptionGroupManager.getForbiddenTable().size();
+        DataVersion afterDataVersion = 
jsonSubscriptionGroupManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(1, afterDataVersionCounter - 
beforeDataVersionCounter);
+        Assert.assertEquals(1, afterSize - beforeSize);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void addGroupLoadRocksdbSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksDBSubscriptionGroupManager();
+        int beforeSize = 
rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size();
+        String groupName = "testAddGroupConfig-" + System.currentTimeMillis();
+
+        Map<String, String> attributes = new HashMap<>();
+
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(groupName);
+        subscriptionGroupConfig.setAttributes(attributes);
+        DataVersion beforeDataVersion = 
rocksDBSubscriptionGroupManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        
rocksDBSubscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig);
+        int afterSize = 
rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size();
+        DataVersion afterDataVersion = 
rocksDBSubscriptionGroupManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+        Assert.assertEquals(1, afterDataVersionCounter);
+        Assert.assertEquals(0, beforeDataVersionCounter);
+        Assert.assertEquals(1, afterSize - beforeSize);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void addForbiddenLoadRocksdbSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksDBSubscriptionGroupManager();
+        int beforeSize = 
rocksDBSubscriptionGroupManager.getForbiddenTable().size();
+        String groupName = "testAddGroupConfig-" + System.currentTimeMillis();
+
+        Map<String, String> attributes = new HashMap<>();
+
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(groupName);
+        subscriptionGroupConfig.setAttributes(attributes);
+        DataVersion beforeDataVersion = 
rocksDBSubscriptionGroupManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        rocksDBSubscriptionGroupManager.updateForbidden(groupName, "topic", 0, 
true);
+
+        int afterSize = 
rocksDBSubscriptionGroupManager.getForbiddenTable().size();
+        DataVersion afterDataVersion = 
rocksDBSubscriptionGroupManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+        Assert.assertEquals(1, afterDataVersionCounter - 
beforeDataVersionCounter);
+        Assert.assertEquals(1, afterSize - beforeSize);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+    }
+
+    @Test
+    public void theSecondTimeLoadJsonSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addGroupLoadJsonSubscriptionGroupManager();
+        jsonSubscriptionGroupManager.stop();
+        rocksDBSubscriptionGroupManager = null;
+        addForbiddenGroupLoadJsonSubscriptionGroupManager();
+        jsonSubscriptionGroupManager.stop();
+        rocksDBSubscriptionGroupManager = null;
+        jsonSubscriptionGroupManager = new 
SubscriptionGroupManager(brokerController);
+        jsonSubscriptionGroupManager.load();
+        DataVersion dataVersion = 
jsonSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(2L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
jsonSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        Assert.assertNotEquals(0, 
jsonSubscriptionGroupManager.getForbiddenTable().size());
+        Assert.assertNotEquals(0, 
jsonSubscriptionGroupManager.getSubscriptionGroupTable().size());
+    }
+
+    @Test
+    public void theSecondTimeLoadRocksdbTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addGroupLoadRocksdbSubscriptionGroupManager();
+        rocksDBSubscriptionGroupManager.stop();
+        rocksDBSubscriptionGroupManager = null;
+        addForbiddenLoadRocksdbSubscriptionGroupManager();
+        rocksDBSubscriptionGroupManager.stop();
+        rocksDBSubscriptionGroupManager = null;
+        rocksDBSubscriptionGroupManager = new 
RocksDBSubscriptionGroupManager(brokerController);
+        rocksDBSubscriptionGroupManager.load();
+        DataVersion dataVersion = 
rocksDBSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(2L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getForbiddenTable().size());
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+    }
+
+
+    @Test
+    public void jsonUpgradeToRocksdb() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addGroupLoadJsonSubscriptionGroupManager();
+        addForbiddenGroupLoadJsonSubscriptionGroupManager();
+        initRocksDBSubscriptionGroupManager();
+        DataVersion dataVersion = 
rocksDBSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(3L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getForbiddenTable().size());
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        
Assert.assertEquals(rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size(),
 jsonSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        
Assert.assertEquals(rocksDBSubscriptionGroupManager.getForbiddenTable().size(), 
jsonSubscriptionGroupManager.getForbiddenTable().size());
+
+        rocksDBSubscriptionGroupManager.stop();
+        rocksDBSubscriptionGroupManager = new 
RocksDBSubscriptionGroupManager(brokerController);
+        rocksDBSubscriptionGroupManager.load();
+        dataVersion = rocksDBSubscriptionGroupManager.getDataVersion();
+        Assert.assertEquals(3L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getForbiddenTable().size());
+        Assert.assertNotEquals(0, 
rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        
Assert.assertEquals(rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size(),
 jsonSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        
Assert.assertEquals(rocksDBSubscriptionGroupManager.getForbiddenTable().size(), 
jsonSubscriptionGroupManager.getForbiddenTable().size());
+    }
+
+    private boolean notToBeExecuted() {
+        return MixAll.isMac();
+    }
+
+}
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
index 3c829437cf..3ed4ac11a4 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
@@ -18,7 +18,11 @@
 package org.apache.rocketmq.broker.subscription;
 
 import com.google.common.collect.ImmutableMap;
+
+import java.nio.file.Paths;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.SubscriptionGroupAttributes;
@@ -30,36 +34,42 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class SubscriptionGroupManagerTest {
     private String group = "group";
+
+    private final String basePath = Paths.get(System.getProperty("user.home"),
+            "unit-test-store", UUID.randomUUID().toString().substring(0, 
16).toUpperCase()).toString();
     @Mock
     private BrokerController brokerControllerMock;
     private SubscriptionGroupManager subscriptionGroupManager;
 
     @Before
     public void before() {
+        if (notToBeExecuted()) {
+            return;
+        }
         SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
             "test",
             false,
             false
         ));
         subscriptionGroupManager = spy(new 
SubscriptionGroupManager(brokerControllerMock));
-        when(brokerControllerMock.getMessageStore()).thenReturn(null);
-        doNothing().when(subscriptionGroupManager).persist();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(basePath);
+        
Mockito.lenient().when(brokerControllerMock.getMessageStoreConfig()).thenReturn(messageStoreConfig);
     }
 
     @After
     public void destroy() {
-        if (MixAll.isMac()) {
+        if (notToBeExecuted()) {
             return;
         }
         if (subscriptionGroupManager != null) {
@@ -69,18 +79,18 @@ public class SubscriptionGroupManagerTest {
 
     @Test
     public void testUpdateAndCreateSubscriptionGroupInRocksdb() {
-        if (MixAll.isMac()) {
+        if (notToBeExecuted()) {
             return;
         }
-        when(brokerControllerMock.getMessageStoreConfig()).thenReturn(new 
MessageStoreConfig());
-        subscriptionGroupManager = spy(new 
RocksDBSubscriptionGroupManager(brokerControllerMock));
-        subscriptionGroupManager.load();
         group += System.currentTimeMillis();
         updateSubscriptionGroupConfig();
     }
 
     @Test
     public void updateSubscriptionGroupConfig() {
+        if (notToBeExecuted()) {
+            return;
+        }
         SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
         subscriptionGroupConfig.setGroupName(group);
         Map<String, String> attr = ImmutableMap.of("+test", "true");
@@ -99,4 +109,8 @@ public class SubscriptionGroupManagerTest {
         assertThatThrownBy(() -> 
subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig1))
             .isInstanceOf(RuntimeException.class).hasMessage("attempt to 
update an unchangeable attribute. key: test");
     }
+
+    private boolean notToBeExecuted() {
+        return MixAll.isMac();
+    }
 }
\ No newline at end of file
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
index ed71a3313a..b0e0d05736 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.rocketmq.broker.topic;
 
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
+
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
@@ -39,6 +42,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static com.google.common.collect.Sets.newHashSet;
@@ -47,6 +51,10 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class RocksdbTopicConfigManagerTest {
+
+    private final String basePath = Paths.get(System.getProperty("user.home"),
+            "unit-test-store", UUID.randomUUID().toString().substring(0, 
16).toUpperCase()).toString();
+
     private RocksDBTopicConfigManager topicConfigManager;
     @Mock
     private BrokerController brokerController;
@@ -62,9 +70,11 @@ public class RocksdbTopicConfigManagerTest {
         BrokerConfig brokerConfig = new BrokerConfig();
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(basePath);
+        messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
         
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-        
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
-        when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
+        
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+        
Mockito.lenient().when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
         topicConfigManager = new RocksDBTopicConfigManager(brokerController);
         topicConfigManager.load();
     }
@@ -197,7 +207,6 @@ public class RocksdbTopicConfigManagerTest {
         TopicConfig existingTopicConfig = 
topicConfigManager.getTopicConfigTable().get(topic);
         Assert.assertEquals("enum-2", 
existingTopicConfig.getAttributes().get("enum.key"));
         Assert.assertEquals("16", 
existingTopicConfig.getAttributes().get("long.range.key"));
-        //        assert file
     }
 
     @Test
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
new file mode 100644
index 0000000000..2a72709098
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RocksdbTopicConfigTransferTest {
+
+    private final String basePath = Paths.get(System.getProperty("user.home"),
+            "unit-test-store", UUID.randomUUID().toString().substring(0, 
16).toUpperCase()).toString();
+
+    private RocksDBTopicConfigManager rocksdbTopicConfigManager;
+
+    private TopicConfigManager jsonTopicConfigManager;
+    @Mock
+    private BrokerController brokerController;
+
+    @Mock
+    private DefaultMessageStore defaultMessageStore;
+
+    @Before
+    public void init() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        BrokerConfig brokerConfig = new BrokerConfig();
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(basePath);
+        messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
+        
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+        when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
+    }
+
+    @After
+    public void destroy() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        Path pathToBeDeleted = Paths.get(basePath);
+        try {
+            Files.walk(pathToBeDeleted)
+                    .sorted(Comparator.reverseOrder())
+                    .forEach(path -> {
+                        try {
+                            Files.delete(path);
+                        } catch (IOException e) {
+                            // ignore
+                        }
+                    });
+        } catch (IOException e) {
+            // ignore
+        }
+        if (rocksdbTopicConfigManager != null) {
+            rocksdbTopicConfigManager.stop();
+        }
+    }
+
+    public void initRocksdbTopicConfigManager() {
+        if (rocksdbTopicConfigManager == null) {
+            rocksdbTopicConfigManager = new 
RocksDBTopicConfigManager(brokerController);
+            rocksdbTopicConfigManager.load();
+        }
+    }
+
+    public void initJsonTopicConfigManager() {
+        if (jsonTopicConfigManager == null) {
+            jsonTopicConfigManager = new TopicConfigManager(brokerController);
+            jsonTopicConfigManager.load();
+        }
+    }
+
+    @Test
+    public void theFirstTimeLoadJsonTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonTopicConfigManager();
+        DataVersion dataVersion = jsonTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(0L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
jsonTopicConfigManager.getTopicConfigTable().size());
+    }
+
+    @Test
+    public void theFirstTimeLoadRocksdbTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksdbTopicConfigManager();
+        DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(0L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
rocksdbTopicConfigManager.getTopicConfigTable().size());
+    }
+
+
+    @Test
+    public void addTopicLoadJsonTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonTopicConfigManager();
+        String topicName = "testAddTopicConfig-" + System.currentTimeMillis();
+
+        Map<String, String> attributes = new HashMap<>();
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topicName);
+        topicConfig.setAttributes(attributes);
+        DataVersion beforeDataVersion = 
jsonTopicConfigManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        jsonTopicConfigManager.updateTopicConfig(topicConfig);
+
+        DataVersion afterDataVersion = jsonTopicConfigManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(0, beforeDataVersionCounter);
+        Assert.assertEquals(1, afterDataVersionCounter);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void addTopicLoadRocksdbTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksdbTopicConfigManager();
+        String topicName = "testAddTopicConfig-" + System.currentTimeMillis();
+
+        Map<String, String> attributes = new HashMap<>();
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topicName);
+        topicConfig.setAttributes(attributes);
+        DataVersion beforeDataVersion = 
rocksdbTopicConfigManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        rocksdbTopicConfigManager.updateTopicConfig(topicConfig);
+
+        DataVersion afterDataVersion = 
rocksdbTopicConfigManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+        Assert.assertEquals(0, beforeDataVersionCounter);
+        Assert.assertEquals(1, afterDataVersionCounter);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void theSecondTimeLoadJsonTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addTopicLoadJsonTopicConfigManager();
+        jsonTopicConfigManager.stop();
+        jsonTopicConfigManager = new TopicConfigManager(brokerController);
+        jsonTopicConfigManager.load();
+        DataVersion dataVersion = jsonTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(1L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
jsonTopicConfigManager.getTopicConfigTable().size());
+    }
+
+    @Test
+    public void theSecondTimeLoadRocksdbTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addTopicLoadRocksdbTopicConfigManager();
+        rocksdbTopicConfigManager.stop();
+        rocksdbTopicConfigManager = null;
+        rocksdbTopicConfigManager = new 
RocksDBTopicConfigManager(brokerController);
+        rocksdbTopicConfigManager.load();
+        DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(1L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
rocksdbTopicConfigManager.getTopicConfigTable().size());
+    }
+
+    @Test
+    public void jsonUpgradeToRocksdb() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addTopicLoadJsonTopicConfigManager();
+        initRocksdbTopicConfigManager();
+        DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(2L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
rocksdbTopicConfigManager.getTopicConfigTable().size());
+        
Assert.assertEquals(rocksdbTopicConfigManager.getTopicConfigTable().size(), 
jsonTopicConfigManager.getTopicConfigTable().size());
+
+        rocksdbTopicConfigManager.stop();
+        rocksdbTopicConfigManager = new 
RocksDBTopicConfigManager(brokerController);
+        rocksdbTopicConfigManager.load();
+        dataVersion = rocksdbTopicConfigManager.getDataVersion();
+        Assert.assertEquals(2L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, 
rocksdbTopicConfigManager.getTopicConfigTable().size());
+        
Assert.assertEquals(rocksdbTopicConfigManager.getTopicConfigTable().size(), 
rocksdbTopicConfigManager.getTopicConfigTable().size());
+    }
+
+
+    private boolean notToBeExecuted() {
+        return MixAll.isMac();
+    }
+
+}
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index 6052a79d41..3fd1d14c3a 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.rocketmq.broker.topic;
 
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
+
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicAttributes;
@@ -37,6 +40,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static com.google.common.collect.Sets.newHashSet;
@@ -45,6 +49,9 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TopicConfigManagerTest {
+
+    private final String basePath = Paths.get(System.getProperty("user.home"),
+            "unit-test-store", UUID.randomUUID().toString().substring(0, 
16).toUpperCase()).toString();
     private TopicConfigManager topicConfigManager;
     @Mock
     private BrokerController brokerController;
@@ -57,8 +64,9 @@ public class TopicConfigManagerTest {
         BrokerConfig brokerConfig = new BrokerConfig();
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(basePath);
         
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-        
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+        
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
         topicConfigManager = new TopicConfigManager(brokerController);
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java 
b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
index 099f0d8d56..3fcf466fd7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
@@ -16,11 +16,9 @@
  */
 package org.apache.rocketmq.common;
 
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.rocksdb.Statistics;
 
 import java.io.IOException;
 import java.util.Map;
@@ -28,8 +26,6 @@ import java.util.Map;
 public abstract class ConfigManager {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
-    protected RocksDBConfigManager rocksDBConfigManager;
-
     public boolean load() {
         String fileName = null;
         try {
@@ -89,10 +85,6 @@ public abstract class ConfigManager {
         }
     }
 
-    protected void decode0(final byte[] key, final byte[] body) {
-
-    }
-
     public boolean stop() {
         return true;
     }
@@ -104,8 +96,4 @@ public abstract class ConfigManager {
     public abstract String encode(final boolean prettyFormat);
 
     public abstract void decode(final String jsonString);
-
-    public Statistics getStatistics() {
-        return rocksDBConfigManager == null ? null : 
rocksDBConfigManager.getStatistics();
-    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index ed3a12dc24..f88b8e198b 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -16,18 +16,7 @@
  */
 package org.apache.rocketmq.common.config;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.Maps;
-
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.DataConverter;
@@ -51,6 +40,16 @@ import org.rocksdb.Status;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import static org.rocksdb.RocksDB.NOT_FOUND;
 
 public abstract class AbstractRocksDBStorage {
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index b40f8046e8..f657d9cf2d 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.common.config;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -54,6 +55,15 @@ import org.rocksdb.util.SizeUnit;
 
 public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
 
+    private static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = 
"kvDataVersion".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] KV_DATA_VERSION_KEY = 
"kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
+    protected ColumnFamilyHandle kvDataVersionFamilyHandle;
+
+    private static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME = 
"forbidden".getBytes(StandardCharsets.UTF_8);
+    protected ColumnFamilyHandle forbiddenFamilyHandle;
+
+
+
     public ConfigRocksDBStorage(final String dbPath) {
         super();
         this.dbPath = dbPath;
@@ -115,11 +125,15 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
             ColumnFamilyOptions defaultOptions = createConfigOptions();
             this.cfOptions.add(defaultOptions);
             cfDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
-
+            cfDescriptors.add(new 
ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions));
+            cfDescriptors.add(new 
ColumnFamilyDescriptor(FORBIDDEN_COLUMN_FAMILY_NAME, defaultOptions));
             final List<ColumnFamilyHandle> cfHandles = new ArrayList();
             open(cfDescriptors, cfHandles);
 
             this.defaultCFHandle = cfHandles.get(0);
+            this.kvDataVersionFamilyHandle = cfHandles.get(1);
+            this.forbiddenFamilyHandle = cfHandles.get(2);
+
         } catch (final Exception e) {
             AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", 
this.dbPath, e);
             return false;
@@ -129,7 +143,8 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
 
     @Override
     protected void preShutdown() {
-
+        this.kvDataVersionFamilyHandle.close();
+        this.forbiddenFamilyHandle.close();
     }
 
     private ColumnFamilyOptions createConfigOptions() {
@@ -225,6 +240,22 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
         return get(this.defaultCFHandle, this.totalOrderReadOptions, keyBytes);
     }
 
+    public void updateKvDataVersion(final byte[] valueBytes) throws Exception {
+        put(this.kvDataVersionFamilyHandle, this.ableWalWriteOptions, 
KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length, valueBytes, valueBytes.length);
+    }
+
+    public byte[] getKvDataVersion() throws Exception {
+        return get(this.kvDataVersionFamilyHandle, this.totalOrderReadOptions, 
KV_DATA_VERSION_KEY);
+    }
+
+    public void updateForbidden(final byte[] keyBytes, final byte[] 
valueBytes) throws Exception {
+        put(this.forbiddenFamilyHandle, this.ableWalWriteOptions, keyBytes, 
keyBytes.length, valueBytes, valueBytes.length);
+    }
+
+    public byte[] getForbidden(final byte[] keyBytes) throws Exception {
+        return get(this.forbiddenFamilyHandle, this.totalOrderReadOptions, 
keyBytes);
+    }
+
     public void delete(final byte[] keyBytes) throws Exception {
         delete(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes);
     }
@@ -246,6 +277,10 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
         return this.db.newIterator(this.defaultCFHandle, 
this.totalOrderReadOptions);
     }
 
+    public RocksIterator forbiddenIterator() {
+        return this.db.newIterator(this.forbiddenFamilyHandle, 
this.totalOrderReadOptions);
+    }
+
     public void rangeDelete(final byte[] startKey, final byte[] endKey) throws 
RocksDBException {
         rangeDelete(this.defaultCFHandle, this.writeOptions, startKey, endKey);
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 5b2a1931b3..0b45d92418 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -105,6 +105,9 @@ public class MessageStoreConfig {
     // default, defaultRocksDB
     @ImportantField
     private String storeType = StoreType.DEFAULT.getStoreType();
+
+    private boolean transferMetadataJsonToRocksdb = false;
+
     // ConsumeQueue file size,default is 30W
     private int mappedFileSizeConsumeQueue = 300000 * 
ConsumeQueue.CQ_STORE_UNIT_SIZE;
     // enable consume queue ext
@@ -1842,4 +1845,13 @@ public class MessageStoreConfig {
     public void setPutConsumeQueueDataByFileChannel(boolean 
putConsumeQueueDataByFileChannel) {
         this.putConsumeQueueDataByFileChannel = 
putConsumeQueueDataByFileChannel;
     }
+
+    public boolean isTransferMetadataJsonToRocksdb() {
+        return transferMetadataJsonToRocksdb;
+    }
+
+    public void setTransferMetadataJsonToRocksdb(boolean 
transferMetadataJsonToRocksdb) {
+        this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb;
+    }
+
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java 
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index e785934ba3..d56ed05326 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -92,6 +92,7 @@ import 
org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
 import org.apache.rocketmq.tools.command.message.SendMessageCommand;
+import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand;
 import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand;
 import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
@@ -300,6 +301,7 @@ public class MQAdminStartup {
         initCommand(new GetAclSubCommand());
         initCommand(new ListAclSubCommand());
         initCommand(new CopyAclsSubCommand());
+        initCommand(new RocksDBConfigToJsonCommand());
     }
 
     private static void printHelp() {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index b987ad873b..1d81287ac7 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.tools.command.metadata;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
+import org.rocksdb.RocksIterator;
 
 import java.io.File;
 import java.util.HashMap;
@@ -47,8 +50,8 @@ public class RocksDBConfigToJsonCommand implements SubCommand 
{
 
     @Override
     public Options buildCommandlineOptions(Options options) {
-        Option pathOption = new Option("p", "path", true,
-                "Absolute path to the metadata directory");
+        Option pathOption = new Option("p", "configPath", true,
+                "Absolute path to the metadata config directory");
         pathOption.setRequired(true);
         options.addOption(pathOption);
 
@@ -62,57 +65,50 @@ public class RocksDBConfigToJsonCommand implements 
SubCommand {
 
     @Override
     public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
-        String path = commandLine.getOptionValue("path").trim();
+        String path = commandLine.getOptionValue("configPath").trim();
         if (StringUtils.isEmpty(path) || !new File(path).exists()) {
             System.out.print("Rocksdb path is invalid.\n");
             return;
         }
 
         String configType = 
commandLine.getOptionValue("configType").trim().toLowerCase();
+        if (!path.endsWith("/")) {
+            path += "/";
+        }
+        path += configType;
+
+        ConfigRocksDBStorage configRocksDBStorage = new 
ConfigRocksDBStorage(path, true);
+        configRocksDBStorage.start();
+        RocksIterator iterator = configRocksDBStorage.iterator();
 
-        final long memTableFlushInterval = 60 * 60 * 1000L;
-        RocksDBConfigManager kvConfigManager = new 
RocksDBConfigManager(memTableFlushInterval);
         try {
-            if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
-                // for topics.json
-                final Map<String, JSONObject> topicsJsonConfig = new 
HashMap<>();
-                final Map<String, JSONObject> topicConfigTable = new 
HashMap<>();
-                boolean isLoad = kvConfigManager.load(path, (key, value) -> {
-                    final String topic = new String(key, 
DataConverter.CHARSET_UTF8);
-                    final String topicConfig = new String(value, 
DataConverter.CHARSET_UTF8);
-                    final JSONObject jsonObject = 
JSONObject.parseObject(topicConfig);
-                    topicConfigTable.put(topic, jsonObject);
-                });
+            final Map<String, JSONObject> configMap = new HashMap<>();
+            final Map<String, JSONObject> configTable = new HashMap<>();
+            iterator.seekToFirst();
+            while (iterator.isValid()) {
+                final byte[] key = iterator.key();
+                final byte[] value = iterator.value();
+                final String name = new String(key, 
DataConverter.CHARSET_UTF8);
+                final String config = new String(value, 
DataConverter.CHARSET_UTF8);
+                final JSONObject jsonObject = JSONObject.parseObject(config);
+                configTable.put(name, jsonObject);
+                iterator.next();
+            }
+            byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion();
+            configMap.put("dataVersion",
+                    JSONObject.parseObject(new String(kvDataVersion, 
DataConverter.CHARSET_UTF8)));
 
-                if (isLoad) {
-                    topicsJsonConfig.put("topicConfigTable", (JSONObject) 
JSONObject.toJSON(topicConfigTable));
-                    final String topicsJsonStr = 
JSONObject.toJSONString(topicsJsonConfig, true);
-                    System.out.print(topicsJsonStr + "\n");
-                    return;
-                }
+            if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
+                configMap.put("topicConfigTable", 
JSON.parseObject(JSONObject.toJSONString(configTable)));
             }
             if 
(SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
-                // for subscriptionGroup.json
-                final Map<String, JSONObject> subscriptionGroupJsonConfig = 
new HashMap<>();
-                final Map<String, JSONObject> subscriptionGroupTable = new 
HashMap<>();
-                boolean isLoad = kvConfigManager.load(path, (key, value) -> {
-                    final String subscriptionGroup = new String(key, 
DataConverter.CHARSET_UTF8);
-                    final String subscriptionGroupConfig = new String(value, 
DataConverter.CHARSET_UTF8);
-                    final JSONObject jsonObject = 
JSONObject.parseObject(subscriptionGroupConfig);
-                    subscriptionGroupTable.put(subscriptionGroup, jsonObject);
-                });
-
-                if (isLoad) {
-                    subscriptionGroupJsonConfig.put("subscriptionGroupTable",
-                            (JSONObject) 
JSONObject.toJSON(subscriptionGroupTable));
-                    final String subscriptionGroupJsonStr = 
JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
-                    System.out.print(subscriptionGroupJsonStr + "\n");
-                    return;
-                }
+                configMap.put("subscriptionGroupTable", 
JSON.parseObject(JSONObject.toJSONString(configTable)));
             }
-            System.out.print("Config type was not recognized, configType=" + 
configType + "\n");
+            System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
+        } catch (Exception e) {
+            System.out.print("Error occurred while converting RocksDB kv 
config to json, " + "configType=" + configType + ", " + e.getMessage() + "\n");
         } finally {
-            kvConfigManager.stop();
+            configRocksDBStorage.shutdown();
         }
     }
-}
+}
\ No newline at end of file


Reply via email to