This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2a938fb1bc [ISSUE #8058] Support for upgrading metadata in json to rocksdb (#8571)
2a938fb1bc is described below
commit 2a938fb1bcf35f18b2bcbf8616043be6fb105a36
Author: LetLetMe <[email protected]>
AuthorDate: Fri Aug 23 11:18:03 2024 +0800
[ISSUE #8058] Support for upgrading metadata in json to rocksdb (#8571)
---
.github/workflows/maven.yaml | 27 +-
.../rocketmq/broker}/RocksDBConfigManager.java | 60 +++-
.../offset/RocksDBConsumerOffsetManager.java | 14 +-
.../RocksDBSubscriptionGroupManager.java | 156 +++++++++-
.../subscription/SubscriptionGroupManager.java | 29 +-
.../broker/topic/RocksDBTopicConfigManager.java | 91 +++++-
.../rocketmq/broker/topic/TopicConfigManager.java | 52 +---
.../RocksdbGroupConfigTransferTest.java | 340 +++++++++++++++++++++
.../subscription/SubscriptionGroupManagerTest.java | 32 +-
.../topic/RocksdbTopicConfigManagerTest.java | 15 +-
.../topic/RocksdbTopicConfigTransferTest.java | 259 ++++++++++++++++
.../broker/topic/TopicConfigManagerTest.java | 10 +-
.../org/apache/rocketmq/common/ConfigManager.java | 12 -
.../common/config/AbstractRocksDBStorage.java | 21 +-
.../common/config/ConfigRocksDBStorage.java | 39 ++-
.../rocketmq/store/config/MessageStoreConfig.java | 12 +
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../metadata/RocksDBConfigToJsonCommand.java | 80 +++--
18 files changed, 1069 insertions(+), 182 deletions(-)
diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 449f637894..a49201b8a1 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -29,19 +29,28 @@ jobs:
cache: "maven"
- name: Build with Maven
run: mvn -B package --file pom.xml
- - name: Upload JVM crash logs
+
+ - name: Run tests with increased memory and debug info
+ run: mvn test -X -Dparallel=none -DargLine="-Xmx1024m -XX:MaxPermSize=256m"
+
+ - name: Upload Auth JVM crash logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: jvm-crash-logs
path: /Users/runner/work/rocketmq/rocketmq/auth/hs_err_pid*.log
retention-days: 1
- - name: Retry if failed
- # if it failed , retry 2 times at most
- if: failure() && fromJSON(github.run_attempt) < 3
- env:
- GH_REPO: ${{ github.repository }}
- GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Check for broker JVM crash logs
+ if: failure()
run: |
- echo "Attempting to retry workflow..."
- gh workflow run rerun-workflow.yml -F run_id=${{ github.run_id }}
\ No newline at end of file
+ echo "Checking for JVM crash logs..."
+ ls -al /Users/runner/work/rocketmq/rocketmq/broker/
+
+ - name: Upload broker JVM crash logs
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: jvm-crash-logs
+ path: /Users/runner/work/rocketmq/rocketmq/broker/hs_err_pid*.log
+ retention-days: 1
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
similarity index 63%
rename from common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
index d1ec894685..20358c4707 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
@@ -14,46 +14,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.common.config;
+package org.apache.rocketmq.broker;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;
+import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
public class RocksDBConfigManager {
protected static final Logger BROKER_LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
- protected volatile boolean isStop = false;
- protected ConfigRocksDBStorage configRocksDBStorage = null;
+ public volatile boolean isStop = false;
+ public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
private volatile long lastFlushMemTableMicroSecond = 0;
+
+ private final String filePath;
private final long memTableFlushInterval;
+ private DataVersion kvDataVersion = new DataVersion();
+
- public RocksDBConfigManager(long memTableFlushInterval) {
+ public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
+ this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
}
- public boolean load(String configFilePath, BiConsumer<byte[], byte[]>
biConsumer) {
+ public boolean init() {
this.isStop = false;
- this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath);
- if (!this.configRocksDBStorage.start()) {
- return false;
- }
- RocksIterator iterator = this.configRocksDBStorage.iterator();
+ this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
+ return this.configRocksDBStorage.start();
+ }
+ public boolean loadDataVersion() {
+ String currDataVersionString = null;
try {
+ byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
+ if (dataVersion != null && dataVersion.length > 0) {
+ currDataVersionString = new String(dataVersion,
StandardCharsets.UTF_8);
+ }
+ kvDataVersion = StringUtils.isNotBlank(currDataVersionString) ?
JSON.parseObject(currDataVersionString, DataVersion.class) : new DataVersion();
+ return true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
+ try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
iterator.seekToFirst();
while (iterator.isValid()) {
biConsumer.accept(iterator.key(), iterator.value());
iterator.next();
}
- } finally {
- iterator.close();
}
this.flushOptions = new FlushOptions();
@@ -103,6 +123,20 @@ public class RocksDBConfigManager {
this.configRocksDBStorage.delete(keyBytes);
}
+ public void updateKvDataVersion() throws Exception {
+ kvDataVersion.nextVersion();
+ this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
+ }
+
+ public DataVersion getKvDataVersion() {
+ return kvDataVersion;
+ }
+
+ public void updateForbidden(String key, String value) throws Exception {
+ this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+ }
+
+
public void batchPutWithWal(final WriteBatch batch) throws Exception {
this.configRocksDBStorage.batchPutWithWal(batch);
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
index 05b53b0bcf..de293fc499 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
@@ -22,7 +22,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.rocksdb.WriteBatch;
@@ -31,14 +31,19 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
+ protected RocksDBConfigManager rocksDBConfigManager;
+
public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
- this.rocksDBConfigManager = new
RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}
@Override
public boolean load() {
- return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
+ if (!rocksDBConfigManager.init()) {
+ return false;
+ }
+ return this.rocksDBConfigManager.loadData(this::decodeOffset);
}
@Override
@@ -56,8 +61,7 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
}
}
- @Override
- protected void decode0(final byte[] key, final byte[] body) {
+ protected void decodeOffset(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body,
RocksDBOffsetSerializeWrapper.class);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
index e9a81a8d68..7df72dbe68 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
@@ -16,39 +16,116 @@
*/
package org.apache.rocketmq.broker.subscription;
-import java.io.File;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.rocksdb.RocksIterator;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
+ protected RocksDBConfigManager rocksDBConfigManager;
+
public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
super(brokerController, false);
- this.rocksDBConfigManager = new
RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}
@Override
public boolean load() {
- if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) {
+ if (!rocksDBConfigManager.init()) {
+ return false;
+ }
+ if (!loadDataVersion() || !loadSubscriptionGroupAndForbidden()) {
return false;
}
this.init();
return true;
}
+ public boolean loadDataVersion() {
+ return this.rocksDBConfigManager.loadDataVersion();
+ }
+
+ public boolean loadSubscriptionGroupAndForbidden() {
+ return
this.rocksDBConfigManager.loadData(this::decodeSubscriptionGroup)
+ && this.loadForbidden(this::decodeForbidden)
+ && merge();
+ }
+
+ public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {
+ try (RocksIterator iterator =
this.rocksDBConfigManager.configRocksDBStorage.forbiddenIterator()) {
+ iterator.seekToFirst();
+ while (iterator.isValid()) {
+ biConsumer.accept(iterator.key(), iterator.value());
+ iterator.next();
+ }
+ }
+ return true;
+ }
+
+
+ private boolean merge() {
+ if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
+ log.info("The switch is off, no merge operation is needed.");
+ return true;
+ }
+ if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
+ log.info("json file and json back file not exist, so skip merge");
+ return true;
+ }
+
+ if (!super.load()) {
+ log.error("load group and forbidden info from json file error,
startup will exit");
+ return false;
+ }
+
+ final ConcurrentMap<String, SubscriptionGroupConfig> groupTable =
this.getSubscriptionGroupTable();
+ final ConcurrentMap<String, ConcurrentMap<String, Integer>>
forbiddenTable = this.getForbiddenTable();
+ final DataVersion dataVersion = super.getDataVersion();
+ final DataVersion kvDataVersion = this.getDataVersion();
+ if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get())
{
+ for (Map.Entry<String, SubscriptionGroupConfig> entry :
groupTable.entrySet()) {
+ putSubscriptionGroupConfig(entry.getValue());
+ log.info("import subscription config to rocksdb, group={}",
entry.getValue());
+ }
+ for (Map.Entry<String, ConcurrentMap<String, Integer>> entry :
forbiddenTable.entrySet()) {
+ try {
+ this.rocksDBConfigManager.updateForbidden(entry.getKey(),
JSON.toJSONString(entry.getValue()));
+ log.info("import forbidden config to rocksdb, group={}",
entry.getValue());
+ } catch (Exception e) {
+ log.error("import forbidden config to rocksdb failed,
group={}", entry.getValue());
+ return false;
+ }
+ }
+ this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
+ updateDataVersion();
+ }
+ log.info("finish marge subscription config from json file and merge to
rocksdb");
+ this.persist();
+
+ return true;
+ }
+
@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
}
@Override
- protected SubscriptionGroupConfig
putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+ public SubscriptionGroupConfig
putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
String groupName = subscriptionGroupConfig.getGroupName();
SubscriptionGroupConfig oldConfig =
this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
@@ -89,8 +166,8 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
return subscriptionGroupConfig;
}
- @Override
- protected void decode0(byte[] key, byte[] body) {
+
+ protected void decodeSubscriptionGroup(byte[] key, byte[] body) {
String groupName = new String(key, DataConverter.CHARSET_UTF8);
SubscriptionGroupConfig subscriptionGroupConfig =
JSON.parseObject(body, SubscriptionGroupConfig.class);
@@ -105,8 +182,63 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
}
}
- @Override
- public String configFilePath() {
+ public String rocksdbConfigFilePath() {
return
this.brokerController.getMessageStoreConfig().getStorePathRootDir() +
File.separator + "config" + File.separator + "subscriptionGroups" +
File.separator;
}
+
+ @Override
+ public DataVersion getDataVersion() {
+ return rocksDBConfigManager.getKvDataVersion();
+ }
+
+ @Override
+ public void updateDataVersion() {
+ try {
+ rocksDBConfigManager.updateKvDataVersion();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void decodeForbidden(byte[] key, byte[] body) {
+ String forbiddenGroupName = new String(key,
DataConverter.CHARSET_UTF8);
+ JSONObject jsonObject = JSON.parseObject(new String(body,
DataConverter.CHARSET_UTF8));
+ Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
+ ConcurrentMap<String, Integer> forbiddenGroup = new
ConcurrentHashMap<>(entries.size());
+ for (Map.Entry<String, Object> entry : entries) {
+ forbiddenGroup.put(entry.getKey(), (Integer) entry.getValue());
+ }
+ this.getForbiddenTable().put(forbiddenGroupName, forbiddenGroup);
+ log.info("load forbidden,{} value {}", forbiddenGroupName,
forbiddenGroup.toString());
+ }
+
+ @Override
+ public void updateForbidden(String group, String topic, int
forbiddenIndex, boolean setOrClear) {
+ try {
+ super.updateForbidden(group, topic, forbiddenIndex, setOrClear);
+ this.rocksDBConfigManager.updateForbidden(group,
JSON.toJSONString(this.getForbiddenTable().get(group)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setForbidden(String group, String topic, int forbiddenIndex) {
+ try {
+ super.setForbidden(group, topic, forbiddenIndex);
+ this.rocksDBConfigManager.updateForbidden(group,
JSON.toJSONString(this.getForbiddenTable().get(group)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void clearForbidden(String group, String topic, int forbiddenIndex)
{
+ try {
+ super.clearForbidden(group, topic, forbiddenIndex);
+ this.rocksDBConfigManager.updateForbidden(group,
JSON.toJSONString(this.getForbiddenTable().get(group)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index e63b930586..1d9614fe58 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -121,7 +121,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
}
}
- protected SubscriptionGroupConfig
putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+ public SubscriptionGroupConfig
putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
return
this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(),
subscriptionGroupConfig);
}
@@ -156,8 +156,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
log.info("create new subscription group, {}", config);
}
- long stateMachineVersion = brokerController.getMessageStore() != null
? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
}
@@ -214,7 +213,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
return topicForbidden;
}
- private void updateForbiddenValue(String group, String topic, Integer
forbidden) {
+ protected void updateForbiddenValue(String group, String topic, Integer
forbidden) {
if (forbidden == null || forbidden <= 0) {
this.forbiddenTable.remove(group);
log.info("clear group forbidden, {}@{} ", group, topic);
@@ -233,8 +232,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
log.info("set group forbidden, {}@{} old: {} new: {}", group,
topic, 0, forbidden);
}
- long stateMachineVersion = brokerController.getMessageStore() != null
? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
}
@@ -243,8 +241,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
SubscriptionGroupConfig old = getSubscriptionGroupConfig(groupName);
if (old != null) {
old.setConsumeEnable(false);
- long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
}
}
@@ -261,8 +258,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
if (null == preConfig) {
log.info("auto create a subscription group, {}",
subscriptionGroupConfig.toString());
}
- long stateMachineVersion = brokerController.getMessageStore()
!= null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
}
}
@@ -331,8 +327,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
this.forbiddenTable.remove(groupName);
if (old != null) {
log.info("delete subscription group OK, subscription group:{}",
old);
- long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
} else {
log.warn("delete subscription group failed, subscription
groupName: {} not exist", groupName);
@@ -369,4 +364,14 @@ public class SubscriptionGroupManager extends
ConfigManager {
}
}
}
+
+ public void setDataVersion(DataVersion dataVersion) {
+ this.dataVersion.assignNewOne(dataVersion);
+ }
+
+ public void updateDataVersion() {
+ long stateMachineVersion = brokerController.getMessageStore() != null
? brokerController.getMessageStore().getStateMachineVersion() : 0;
+ dataVersion.nextVersion(stateMachineVersion);
+ }
+
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
index fddecf2d92..2a89dd7e02 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
@@ -16,39 +16,86 @@
*/
package org.apache.rocketmq.broker.topic;
-import java.io.File;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
public class RocksDBTopicConfigManager extends TopicConfigManager {
+ protected RocksDBConfigManager rocksDBConfigManager;
+
public RocksDBTopicConfigManager(BrokerController brokerController) {
super(brokerController, false);
- this.rocksDBConfigManager = new
RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new
RocksDBConfigManager(rocksdbConfigFilePath(),
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}
@Override
public boolean load() {
- if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) {
+ if (!rocksDBConfigManager.init()) {
+ return false;
+ }
+ if (!loadDataVersion() || !loadTopicConfig()) {
return false;
}
this.init();
return true;
}
+ public boolean loadTopicConfig() {
+ return this.rocksDBConfigManager.loadData(this::decodeTopicConfig) &&
merge();
+ }
+
+ public boolean loadDataVersion() {
+ return this.rocksDBConfigManager.loadDataVersion();
+ }
+
+ private boolean merge() {
+ if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
+ log.info("The switch is off, no merge operation is needed.");
+ return true;
+ }
+ if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
+ log.info("json file and json back file not exist, so skip merge");
+ return true;
+ }
+
+ if (!super.load()) {
+ log.error("load topic config from json file error, startup will
exit");
+ return false;
+ }
+
+ final ConcurrentMap<String, TopicConfig> topicConfigTable =
this.getTopicConfigTable();
+ final DataVersion dataVersion = super.getDataVersion();
+ final DataVersion kvDataVersion = this.getDataVersion();
+ if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get())
{
+ for (Map.Entry<String, TopicConfig> entry :
topicConfigTable.entrySet()) {
+ putTopicConfig(entry.getValue());
+ log.info("import topic config to rocksdb, topic={}",
entry.getValue());
+ }
+ this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
+ updateDataVersion();
+ }
+ log.info("finish read topic config from json file and merge to
rocksdb");
+ this.persist();
+ return true;
+ }
+
+
@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
}
- @Override
- protected void decode0(byte[] key, byte[] body) {
+ protected void decodeTopicConfig(byte[] key, byte[] body) {
String topicName = new String(key, DataConverter.CHARSET_UTF8);
TopicConfig topicConfig = JSON.parseObject(body, TopicConfig.class);
@@ -57,12 +104,7 @@ public class RocksDBTopicConfigManager extends
TopicConfigManager {
}
@Override
- public String configFilePath() {
- return
this.brokerController.getMessageStoreConfig().getStorePathRootDir() +
File.separator + "config" + File.separator + "topics" + File.separator;
- }
-
- @Override
- protected TopicConfig putTopicConfig(TopicConfig topicConfig) {
+ public TopicConfig putTopicConfig(TopicConfig topicConfig) {
String topicName = topicConfig.getTopicName();
TopicConfig oldTopicConfig = this.topicConfigTable.put(topicName,
topicConfig);
try {
@@ -92,4 +134,23 @@ public class RocksDBTopicConfigManager extends
TopicConfigManager {
this.rocksDBConfigManager.flushWAL();
}
}
+
+ public String rocksdbConfigFilePath() {
+ return
this.brokerController.getMessageStoreConfig().getStorePathRootDir() +
File.separator + "config" + File.separator + "topics" + File.separator;
+ }
+
+
+ @Override
+ public DataVersion getDataVersion() {
+ return rocksDBConfigManager.getKvDataVersion();
+ }
+
+ @Override
+ public void updateDataVersion() {
+ try {
+ rocksDBConfigManager.updateKvDataVersion();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index c71c65fe8b..eab2896b00 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -225,7 +225,7 @@ public class TopicConfigManager extends ConfigManager {
}
}
- protected TopicConfig putTopicConfig(TopicConfig topicConfig) {
+ public TopicConfig putTopicConfig(TopicConfig topicConfig) {
return this.topicConfigTable.put(topicConfig.getTopicName(),
topicConfig);
}
@@ -293,8 +293,7 @@ public class TopicConfigManager extends ConfigManager {
putTopicConfig(topicConfig);
- long stateMachineVersion =
brokerController.getMessageStore() != null ?
brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
createNew = true;
@@ -337,8 +336,7 @@ public class TopicConfigManager extends ConfigManager {
}
log.info("Create new topic [{}] config:[{}]",
topicConfig.getTopicName(), topicConfig);
putTopicConfig(topicConfig);
- long stateMachineVersion =
brokerController.getMessageStore() != null ?
brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
createNew = true;
this.persist();
} finally {
@@ -397,8 +395,7 @@ public class TopicConfigManager extends ConfigManager {
log.info("create new topic {}", topicConfig);
putTopicConfig(topicConfig);
createNew = true;
- long stateMachineVersion =
brokerController.getMessageStore() != null ?
brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
} finally {
this.topicConfigTableLock.unlock();
@@ -438,8 +435,7 @@ public class TopicConfigManager extends ConfigManager {
log.info("create new topic {}", topicConfig);
putTopicConfig(topicConfig);
createNew = true;
- long stateMachineVersion =
brokerController.getMessageStore() != null ?
brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
} finally {
this.topicConfigTableLock.unlock();
@@ -472,8 +468,7 @@ public class TopicConfigManager extends ConfigManager {
putTopicConfig(topicConfig);
- long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
registerBrokerData(topicConfig);
@@ -495,8 +490,7 @@ public class TopicConfigManager extends ConfigManager {
putTopicConfig(topicConfig);
- long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
registerBrokerData(topicConfig);
@@ -509,7 +503,6 @@ public class TopicConfigManager extends ConfigManager {
Map<String, String> newAttributes = request(topicConfig);
Map<String, String> currentAttributes =
current(topicConfig.getTopicName());
-
Map<String, String> finalAttributes =
AttributeUtil.alterCurrentAttributes(
this.topicConfigTable.get(topicConfig.getTopicName()) == null,
TopicAttributes.ALL,
@@ -526,8 +519,7 @@ public class TopicConfigManager extends ConfigManager {
log.info("create new topic [{}]", topicConfig);
}
- long stateMachineVersion = brokerController.getMessageStore() != null
? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
}
public void updateTopicConfig(final TopicConfig topicConfig) {
@@ -581,25 +573,8 @@ public class TopicConfigManager extends ConfigManager {
}
}
- // We don't have a mandatory rule to maintain the validity of
order conf in NameServer,
- // so we may overwrite the order field mistakenly.
- // To avoid the above case, we comment the below codes, please use
mqadmin API to update
- // the order filed.
- /*for (Map.Entry<String, TopicConfig> entry :
this.topicConfigTable.entrySet()) {
- String topic = entry.getKey();
- if (!orderTopics.contains(topic)) {
- TopicConfig topicConfig = entry.getValue();
- if (topicConfig.isOrder()) {
- topicConfig.setOrder(false);
- isChange = true;
- log.info("update order topic config, topic={},
order={}", topic, false);
- }
- }
- }*/
-
if (isChange) {
- long stateMachineVersion = brokerController.getMessageStore()
!= null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
}
}
@@ -623,8 +598,7 @@ public class TopicConfigManager extends ConfigManager {
TopicConfig old = removeTopicConfig(topic);
if (old != null) {
log.info("delete topic config OK, topic: {}", old);
- long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
this.persist();
} else {
log.warn("delete topic config failed, topic: {} not exists",
topic);
@@ -739,5 +713,11 @@ public class TopicConfigManager extends ConfigManager {
return topicConfigTable.containsKey(topic);
}
+ public void updateDataVersion() {
+ long stateMachineVersion = brokerController.getMessageStore() != null
? brokerController.getMessageStore().getStateMachineVersion() : 0;
+ dataVersion.nextVersion(stateMachineVersion);
+ }
+
+
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
new file mode 100644
index 0000000000..205e642843
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.subscription;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Loads, updates and reloads subscription-group metadata through the JSON-backed
+ * {@link SubscriptionGroupManager} and the RocksDB-backed {@link RocksDBSubscriptionGroupManager},
+ * both rooted at a per-instance directory under {@code ~/unit-test-store}, with the
+ * {@code transferMetadataJsonToRocksdb} store option enabled.
+ *
+ * <p>Every test and both fixtures return immediately when {@link #notToBeExecuted()} is true.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RocksdbGroupConfigTransferTest {
+    private final String basePath = Paths.get(System.getProperty("user.home"),
+        "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString();
+
+    private RocksDBSubscriptionGroupManager rocksDBSubscriptionGroupManager;
+
+    private SubscriptionGroupManager jsonSubscriptionGroupManager;
+
+    @Mock
+    private BrokerController brokerController;
+
+    @Mock
+    private DefaultMessageStore defaultMessageStore;
+
+    /** Wires the mocked broker controller to a store config rooted at {@link #basePath}. */
+    @Before
+    public void init() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        BrokerConfig brokerConfig = new BrokerConfig();
+        Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(basePath);
+        messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
+        Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+        when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
+    }
+
+    /** Stops the RocksDB manager, then removes everything under {@link #basePath} (best effort). */
+    @After
+    public void destroy() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        // Stop first so the manager is no longer using the directory while it is being deleted.
+        if (rocksDBSubscriptionGroupManager != null) {
+            rocksDBSubscriptionGroupManager.stop();
+        }
+        // Files.walk keeps directory handles open until the stream is closed.
+        try (java.util.stream.Stream<Path> paths = Files.walk(Paths.get(basePath))) {
+            // Reverse order deletes children before their parent directories.
+            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
+                try {
+                    Files.delete(path);
+                } catch (IOException ignored) {
+                    // best-effort cleanup of a temporary test directory
+                }
+            });
+        } catch (IOException ignored) {
+            // best-effort cleanup; the directory may never have been created
+        }
+    }
+
+    /** Creates and loads the RocksDB-backed manager unless one is already held. */
+    public void initRocksDBSubscriptionGroupManager() {
+        if (rocksDBSubscriptionGroupManager == null) {
+            rocksDBSubscriptionGroupManager = new RocksDBSubscriptionGroupManager(brokerController);
+            rocksDBSubscriptionGroupManager.load();
+        }
+    }
+
+    /** Creates and loads the JSON-backed manager unless one is already held. */
+    public void initJsonSubscriptionGroupManager() {
+        if (jsonSubscriptionGroupManager == null) {
+            jsonSubscriptionGroupManager = new SubscriptionGroupManager(brokerController);
+            jsonSubscriptionGroupManager.load();
+        }
+    }
+
+    @Test
+    public void theFirstTimeLoadJsonSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonSubscriptionGroupManager();
+        DataVersion dataVersion = jsonSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(0L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, jsonSubscriptionGroupManager.getSubscriptionGroupTable().size());
+    }
+
+    @Test
+    public void theFirstTimeLoadRocksDBSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksDBSubscriptionGroupManager();
+        DataVersion dataVersion = rocksDBSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(0L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+    }
+
+    @Test
+    public void addGroupLoadJsonSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonSubscriptionGroupManager();
+        int beforeSize = jsonSubscriptionGroupManager.getSubscriptionGroupTable().size();
+        DataVersion beforeDataVersion = jsonSubscriptionGroupManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        jsonSubscriptionGroupManager.updateSubscriptionGroupConfig(newGroupConfig());
+
+        int afterSize = jsonSubscriptionGroupManager.getSubscriptionGroupTable().size();
+        DataVersion afterDataVersion = jsonSubscriptionGroupManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(0, beforeDataVersionCounter);
+        Assert.assertEquals(1, afterDataVersionCounter);
+        Assert.assertEquals(1, afterSize - beforeSize);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void addForbiddenGroupLoadJsonSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonSubscriptionGroupManager();
+        int beforeSize = jsonSubscriptionGroupManager.getForbiddenTable().size();
+        String groupName = "testAddGroupConfig-" + System.currentTimeMillis();
+        DataVersion beforeDataVersion = jsonSubscriptionGroupManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        jsonSubscriptionGroupManager.setForbidden(groupName, "topic", 0);
+
+        int afterSize = jsonSubscriptionGroupManager.getForbiddenTable().size();
+        DataVersion afterDataVersion = jsonSubscriptionGroupManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(1, afterDataVersionCounter - beforeDataVersionCounter);
+        Assert.assertEquals(1, afterSize - beforeSize);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void addGroupLoadRocksdbSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksDBSubscriptionGroupManager();
+        int beforeSize = rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size();
+        DataVersion beforeDataVersion = rocksDBSubscriptionGroupManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        rocksDBSubscriptionGroupManager.updateSubscriptionGroupConfig(newGroupConfig());
+
+        int afterSize = rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size();
+        DataVersion afterDataVersion = rocksDBSubscriptionGroupManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(1, afterDataVersionCounter);
+        Assert.assertEquals(0, beforeDataVersionCounter);
+        Assert.assertEquals(1, afterSize - beforeSize);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void addForbiddenLoadRocksdbSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksDBSubscriptionGroupManager();
+        int beforeSize = rocksDBSubscriptionGroupManager.getForbiddenTable().size();
+        String groupName = "testAddGroupConfig-" + System.currentTimeMillis();
+        DataVersion beforeDataVersion = rocksDBSubscriptionGroupManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        rocksDBSubscriptionGroupManager.updateForbidden(groupName, "topic", 0, true);
+
+        int afterSize = rocksDBSubscriptionGroupManager.getForbiddenTable().size();
+        DataVersion afterDataVersion = rocksDBSubscriptionGroupManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(1, afterDataVersionCounter - beforeDataVersionCounter);
+        Assert.assertEquals(1, afterSize - beforeSize);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+        Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+    }
+
+    @Test
+    public void theSecondTimeLoadJsonSubscriptionGroupManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addGroupLoadJsonSubscriptionGroupManager();
+        jsonSubscriptionGroupManager.stop();
+        // Drop the JSON manager so the next step creates a fresh one and loads it again.
+        jsonSubscriptionGroupManager = null;
+        addForbiddenGroupLoadJsonSubscriptionGroupManager();
+        jsonSubscriptionGroupManager.stop();
+        jsonSubscriptionGroupManager = new SubscriptionGroupManager(brokerController);
+        jsonSubscriptionGroupManager.load();
+
+        DataVersion dataVersion = jsonSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(2L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, jsonSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        Assert.assertNotEquals(0, jsonSubscriptionGroupManager.getForbiddenTable().size());
+    }
+
+    @Test
+    public void theSecondTimeLoadRocksdbTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addGroupLoadRocksdbSubscriptionGroupManager();
+        rocksDBSubscriptionGroupManager.stop();
+        // Drop the RocksDB manager so the next step creates a fresh one and loads it again.
+        rocksDBSubscriptionGroupManager = null;
+        addForbiddenLoadRocksdbSubscriptionGroupManager();
+        rocksDBSubscriptionGroupManager.stop();
+        rocksDBSubscriptionGroupManager = new RocksDBSubscriptionGroupManager(brokerController);
+        rocksDBSubscriptionGroupManager.load();
+
+        DataVersion dataVersion = rocksDBSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(2L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getForbiddenTable().size());
+    }
+
+    /**
+     * Writes a group and a forbidden entry through the JSON manager, then loads the RocksDB
+     * manager twice over the same directory and expects it to report counter 3 and the same
+     * table sizes as the JSON manager each time.
+     */
+    @Test
+    public void jsonUpgradeToRocksdb() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addGroupLoadJsonSubscriptionGroupManager();
+        addForbiddenGroupLoadJsonSubscriptionGroupManager();
+        initRocksDBSubscriptionGroupManager();
+        assertRocksDBMatchesJson(3L);
+
+        rocksDBSubscriptionGroupManager.stop();
+        rocksDBSubscriptionGroupManager = new RocksDBSubscriptionGroupManager(brokerController);
+        rocksDBSubscriptionGroupManager.load();
+        assertRocksDBMatchesJson(3L);
+    }
+
+    /** Builds a group config with a timestamp-suffixed name and an empty attribute map. */
+    private SubscriptionGroupConfig newGroupConfig() {
+        Map<String, String> attributes = new HashMap<>();
+        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName("testAddGroupConfig-" + System.currentTimeMillis());
+        subscriptionGroupConfig.setAttributes(attributes);
+        return subscriptionGroupConfig;
+    }
+
+    /** Checks the RocksDB manager's data version and that its tables are non-empty and JSON-sized. */
+    private void assertRocksDBMatchesJson(long expectedCounter) {
+        DataVersion dataVersion = rocksDBSubscriptionGroupManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(expectedCounter, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getForbiddenTable().size());
+        Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        Assert.assertEquals(rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size(),
+            jsonSubscriptionGroupManager.getSubscriptionGroupTable().size());
+        Assert.assertEquals(rocksDBSubscriptionGroupManager.getForbiddenTable().size(),
+            jsonSubscriptionGroupManager.getForbiddenTable().size());
+    }
+
+    /** Returns {@code MixAll.isMac()}; when true, fixtures and tests do nothing. */
+    private boolean notToBeExecuted() {
+        return MixAll.isMac();
+    }
+
+}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
index 3c829437cf..3ed4ac11a4 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
@@ -18,7 +18,11 @@
package org.apache.rocketmq.broker.subscription;
import com.google.common.collect.ImmutableMap;
+
+import java.nio.file.Paths;
import java.util.Map;
+import java.util.UUID;
+
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SubscriptionGroupAttributes;
@@ -30,36 +34,42 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SubscriptionGroupManagerTest {
private String group = "group";
+
+ private final String basePath = Paths.get(System.getProperty("user.home"),
+ "unit-test-store", UUID.randomUUID().toString().substring(0,
16).toUpperCase()).toString();
@Mock
private BrokerController brokerControllerMock;
private SubscriptionGroupManager subscriptionGroupManager;
@Before
public void before() {
+ if (notToBeExecuted()) {
+ return;
+ }
SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
"test",
false,
false
));
subscriptionGroupManager = spy(new
SubscriptionGroupManager(brokerControllerMock));
- when(brokerControllerMock.getMessageStore()).thenReturn(null);
- doNothing().when(subscriptionGroupManager).persist();
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setStorePathRootDir(basePath);
+
Mockito.lenient().when(brokerControllerMock.getMessageStoreConfig()).thenReturn(messageStoreConfig);
}
@After
public void destroy() {
- if (MixAll.isMac()) {
+ if (notToBeExecuted()) {
return;
}
if (subscriptionGroupManager != null) {
@@ -69,18 +79,18 @@ public class SubscriptionGroupManagerTest {
@Test
public void testUpdateAndCreateSubscriptionGroupInRocksdb() {
- if (MixAll.isMac()) {
+ if (notToBeExecuted()) {
return;
}
- when(brokerControllerMock.getMessageStoreConfig()).thenReturn(new
MessageStoreConfig());
- subscriptionGroupManager = spy(new
RocksDBSubscriptionGroupManager(brokerControllerMock));
- subscriptionGroupManager.load();
group += System.currentTimeMillis();
updateSubscriptionGroupConfig();
}
@Test
public void updateSubscriptionGroupConfig() {
+ if (notToBeExecuted()) {
+ return;
+ }
SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(group);
Map<String, String> attr = ImmutableMap.of("+test", "true");
@@ -99,4 +109,8 @@ public class SubscriptionGroupManagerTest {
assertThatThrownBy(() ->
subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig1))
.isInstanceOf(RuntimeException.class).hasMessage("attempt to
update an unchangeable attribute. key: test");
}
+
+ private boolean notToBeExecuted() {
+ return MixAll.isMac();
+ }
}
\ No newline at end of file
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
index ed71a3313a..b0e0d05736 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.rocketmq.broker.topic;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
+
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
@@ -39,6 +42,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static com.google.common.collect.Sets.newHashSet;
@@ -47,6 +51,10 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class RocksdbTopicConfigManagerTest {
+
+ private final String basePath = Paths.get(System.getProperty("user.home"),
+ "unit-test-store", UUID.randomUUID().toString().substring(0,
16).toUpperCase()).toString();
+
private RocksDBTopicConfigManager topicConfigManager;
@Mock
private BrokerController brokerController;
@@ -62,9 +70,11 @@ public class RocksdbTopicConfigManagerTest {
BrokerConfig brokerConfig = new BrokerConfig();
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setStorePathRootDir(basePath);
+ messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
- when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
+
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+
Mockito.lenient().when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
topicConfigManager = new RocksDBTopicConfigManager(brokerController);
topicConfigManager.load();
}
@@ -197,7 +207,6 @@ public class RocksdbTopicConfigManagerTest {
TopicConfig existingTopicConfig =
topicConfigManager.getTopicConfigTable().get(topic);
Assert.assertEquals("enum-2",
existingTopicConfig.getAttributes().get("enum.key"));
Assert.assertEquals("16",
existingTopicConfig.getAttributes().get("long.range.key"));
- // assert file
}
@Test
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
new file mode 100644
index 0000000000..2a72709098
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Loads, updates and reloads topic metadata through the JSON-backed {@link TopicConfigManager}
+ * and the RocksDB-backed {@link RocksDBTopicConfigManager}, both rooted at a per-instance
+ * directory under {@code ~/unit-test-store}, with the {@code transferMetadataJsonToRocksdb}
+ * store option enabled.
+ *
+ * <p>Every test and both fixtures return immediately when {@link #notToBeExecuted()} is true.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RocksdbTopicConfigTransferTest {
+
+    private final String basePath = Paths.get(System.getProperty("user.home"),
+        "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString();
+
+    private RocksDBTopicConfigManager rocksdbTopicConfigManager;
+
+    private TopicConfigManager jsonTopicConfigManager;
+
+    @Mock
+    private BrokerController brokerController;
+
+    @Mock
+    private DefaultMessageStore defaultMessageStore;
+
+    /** Wires the mocked broker controller to a store config rooted at {@link #basePath}. */
+    @Before
+    public void init() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        BrokerConfig brokerConfig = new BrokerConfig();
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(basePath);
+        messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
+        Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+        when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
+    }
+
+    /** Stops the RocksDB manager, then removes everything under {@link #basePath} (best effort). */
+    @After
+    public void destroy() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        // Stop first so the manager is no longer using the directory while it is being deleted.
+        if (rocksdbTopicConfigManager != null) {
+            rocksdbTopicConfigManager.stop();
+        }
+        // Files.walk keeps directory handles open until the stream is closed.
+        try (java.util.stream.Stream<Path> paths = Files.walk(Paths.get(basePath))) {
+            // Reverse order deletes children before their parent directories.
+            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
+                try {
+                    Files.delete(path);
+                } catch (IOException ignored) {
+                    // best-effort cleanup of a temporary test directory
+                }
+            });
+        } catch (IOException ignored) {
+            // best-effort cleanup; the directory may never have been created
+        }
+    }
+
+    /** Creates and loads the RocksDB-backed manager unless one is already held. */
+    public void initRocksdbTopicConfigManager() {
+        if (rocksdbTopicConfigManager == null) {
+            rocksdbTopicConfigManager = new RocksDBTopicConfigManager(brokerController);
+            rocksdbTopicConfigManager.load();
+        }
+    }
+
+    /** Creates and loads the JSON-backed manager unless one is already held. */
+    public void initJsonTopicConfigManager() {
+        if (jsonTopicConfigManager == null) {
+            jsonTopicConfigManager = new TopicConfigManager(brokerController);
+            jsonTopicConfigManager.load();
+        }
+    }
+
+    @Test
+    public void theFirstTimeLoadJsonTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonTopicConfigManager();
+        DataVersion dataVersion = jsonTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(0L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, jsonTopicConfigManager.getTopicConfigTable().size());
+    }
+
+    @Test
+    public void theFirstTimeLoadRocksdbTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksdbTopicConfigManager();
+        DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(0L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, rocksdbTopicConfigManager.getTopicConfigTable().size());
+    }
+
+    @Test
+    public void addTopicLoadJsonTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initJsonTopicConfigManager();
+        DataVersion beforeDataVersion = jsonTopicConfigManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        jsonTopicConfigManager.updateTopicConfig(newTopicConfig());
+
+        DataVersion afterDataVersion = jsonTopicConfigManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(0, beforeDataVersionCounter);
+        Assert.assertEquals(1, afterDataVersionCounter);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void addTopicLoadRocksdbTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        initRocksdbTopicConfigManager();
+        DataVersion beforeDataVersion = rocksdbTopicConfigManager.getDataVersion();
+        long beforeDataVersionCounter = beforeDataVersion.getCounter().get();
+        long beforeTimestamp = beforeDataVersion.getTimestamp();
+
+        rocksdbTopicConfigManager.updateTopicConfig(newTopicConfig());
+
+        DataVersion afterDataVersion = rocksdbTopicConfigManager.getDataVersion();
+        long afterDataVersionCounter = afterDataVersion.getCounter().get();
+        long afterTimestamp = afterDataVersion.getTimestamp();
+
+        Assert.assertEquals(0, beforeDataVersionCounter);
+        Assert.assertEquals(1, afterDataVersionCounter);
+        Assert.assertTrue(afterTimestamp >= beforeTimestamp);
+    }
+
+    @Test
+    public void theSecondTimeLoadJsonTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addTopicLoadJsonTopicConfigManager();
+        jsonTopicConfigManager.stop();
+        jsonTopicConfigManager = new TopicConfigManager(brokerController);
+        jsonTopicConfigManager.load();
+
+        DataVersion dataVersion = jsonTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(1L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, jsonTopicConfigManager.getTopicConfigTable().size());
+    }
+
+    @Test
+    public void theSecondTimeLoadRocksdbTopicConfigManager() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addTopicLoadRocksdbTopicConfigManager();
+        rocksdbTopicConfigManager.stop();
+        rocksdbTopicConfigManager = new RocksDBTopicConfigManager(brokerController);
+        rocksdbTopicConfigManager.load();
+
+        DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(1L, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, rocksdbTopicConfigManager.getTopicConfigTable().size());
+    }
+
+    /**
+     * Writes a topic through the JSON manager, then loads the RocksDB manager twice over the
+     * same directory and expects it to report counter 2 and the same topic-table size as the
+     * JSON manager each time.
+     */
+    @Test
+    public void jsonUpgradeToRocksdb() {
+        if (notToBeExecuted()) {
+            return;
+        }
+        addTopicLoadJsonTopicConfigManager();
+        initRocksdbTopicConfigManager();
+        assertRocksdbMatchesJson(2L);
+
+        rocksdbTopicConfigManager.stop();
+        rocksdbTopicConfigManager = new RocksDBTopicConfigManager(brokerController);
+        rocksdbTopicConfigManager.load();
+        // Compare against the JSON manager; comparing the RocksDB table with itself proves nothing.
+        assertRocksdbMatchesJson(2L);
+    }
+
+    /** Builds a topic config with a timestamp-suffixed name and an empty attribute map. */
+    private TopicConfig newTopicConfig() {
+        Map<String, String> attributes = new HashMap<>();
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("testAddTopicConfig-" + System.currentTimeMillis());
+        topicConfig.setAttributes(attributes);
+        return topicConfig;
+    }
+
+    /** Checks the RocksDB manager's data version and that its topic table is non-empty and JSON-sized. */
+    private void assertRocksdbMatchesJson(long expectedCounter) {
+        DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion();
+        Assert.assertNotNull(dataVersion);
+        Assert.assertEquals(expectedCounter, dataVersion.getCounter().get());
+        Assert.assertEquals(0L, dataVersion.getStateVersion());
+        Assert.assertNotEquals(0, rocksdbTopicConfigManager.getTopicConfigTable().size());
+        Assert.assertEquals(rocksdbTopicConfigManager.getTopicConfigTable().size(),
+            jsonTopicConfigManager.getTopicConfigTable().size());
+    }
+
+    /** Returns {@code MixAll.isMac()}; when true, fixtures and tests do nothing. */
+    private boolean notToBeExecuted() {
+        return MixAll.isMac();
+    }
+
+}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index 6052a79d41..3fd1d14c3a 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.rocketmq.broker.topic;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
+
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicAttributes;
@@ -37,6 +40,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static com.google.common.collect.Sets.newHashSet;
@@ -45,6 +49,9 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TopicConfigManagerTest {
+
+ private final String basePath = Paths.get(System.getProperty("user.home"),
+ "unit-test-store", UUID.randomUUID().toString().substring(0,
16).toUpperCase()).toString();
private TopicConfigManager topicConfigManager;
@Mock
private BrokerController brokerController;
@@ -57,8 +64,9 @@ public class TopicConfigManagerTest {
BrokerConfig brokerConfig = new BrokerConfig();
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setStorePathRootDir(basePath);
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
topicConfigManager = new TopicConfigManager(brokerController);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
index 099f0d8d56..3fcf466fd7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
@@ -16,11 +16,9 @@
*/
package org.apache.rocketmq.common;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.rocksdb.Statistics;
import java.io.IOException;
import java.util.Map;
@@ -28,8 +26,6 @@ import java.util.Map;
public abstract class ConfigManager {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
- protected RocksDBConfigManager rocksDBConfigManager;
-
public boolean load() {
String fileName = null;
try {
@@ -89,10 +85,6 @@ public abstract class ConfigManager {
}
}
- protected void decode0(final byte[] key, final byte[] body) {
-
- }
-
public boolean stop() {
return true;
}
@@ -104,8 +96,4 @@ public abstract class ConfigManager {
public abstract String encode(final boolean prettyFormat);
public abstract void decode(final String jsonString);
-
- public Statistics getStatistics() {
- return rocksDBConfigManager == null ? null :
rocksDBConfigManager.getStatistics();
- }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index ed3a12dc24..f88b8e198b 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -16,18 +16,7 @@
*/
package org.apache.rocketmq.common.config;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.collect.Maps;
-
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
@@ -51,6 +40,16 @@ import org.rocksdb.Status;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import static org.rocksdb.RocksDB.NOT_FOUND;
public abstract class AbstractRocksDBStorage {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index b40f8046e8..f657d9cf2d 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.common.config;
import java.io.File;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -54,6 +55,15 @@ import org.rocksdb.util.SizeUnit;
public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
+ private static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME =
"kvDataVersion".getBytes(StandardCharsets.UTF_8); // name of the extra column family added to the descriptor list at load time
+ private static final byte[] KV_DATA_VERSION_KEY =
"kvDataVersionKey".getBytes(StandardCharsets.UTF_8); // the single fixed key used by getKvDataVersion()/updateKvDataVersion()
+ protected ColumnFamilyHandle kvDataVersionFamilyHandle; // assigned from cfHandles.get(1) once open() has succeeded
+
+ private static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME =
"forbidden".getBytes(StandardCharsets.UTF_8); // name of the second extra column family added to the descriptor list at load time
+ protected ColumnFamilyHandle forbiddenFamilyHandle; // assigned from cfHandles.get(2) once open() has succeeded
+
+
+
public ConfigRocksDBStorage(final String dbPath) {
super();
this.dbPath = dbPath;
@@ -115,11 +125,15 @@ public class ConfigRocksDBStorage extends
AbstractRocksDBStorage {
ColumnFamilyOptions defaultOptions = createConfigOptions();
this.cfOptions.add(defaultOptions);
cfDescriptors.add(new
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
-
+ cfDescriptors.add(new
ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions));
+ cfDescriptors.add(new
ColumnFamilyDescriptor(FORBIDDEN_COLUMN_FAMILY_NAME, defaultOptions));
final List<ColumnFamilyHandle> cfHandles = new ArrayList();
open(cfDescriptors, cfHandles);
this.defaultCFHandle = cfHandles.get(0);
+ this.kvDataVersionFamilyHandle = cfHandles.get(1);
+ this.forbiddenFamilyHandle = cfHandles.get(2);
+
} catch (final Exception e) {
AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}",
this.dbPath, e);
return false;
@@ -129,7 +143,8 @@ public class ConfigRocksDBStorage extends
AbstractRocksDBStorage {
@Override
protected void preShutdown() {
-
+ this.kvDataVersionFamilyHandle.close(); // release the kvDataVersion column-family handle; NOTE(review): no null check, and the handle is only assigned after open() succeeds - confirm this hook is never reached after a failed load
+ this.forbiddenFamilyHandle.close(); // release the forbidden column-family handle (same caveat as the line above)
}
private ColumnFamilyOptions createConfigOptions() {
@@ -225,6 +240,22 @@ public class ConfigRocksDBStorage extends
AbstractRocksDBStorage {
return get(this.defaultCFHandle, this.totalOrderReadOptions, keyBytes);
}
+ public void updateKvDataVersion(final byte[] valueBytes) throws Exception { // stores valueBytes under the fixed KV_DATA_VERSION_KEY in the kvDataVersion column family
+ put(this.kvDataVersionFamilyHandle, this.ableWalWriteOptions,
KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length, valueBytes, valueBytes.length); // valueBytes.length is dereferenced, so valueBytes must not be null
+ }
+
+ public byte[] getKvDataVersion() throws Exception { // reads the bytes stored under KV_DATA_VERSION_KEY in the kvDataVersion column family
+ return get(this.kvDataVersionFamilyHandle, this.totalOrderReadOptions,
KV_DATA_VERSION_KEY); // NOTE(review): what get() yields for a missing key is not visible here - presumably null, confirm before dereferencing the result
+ }
+
+ public void updateForbidden(final byte[] keyBytes, final byte[]
valueBytes) throws Exception { // writes one key/value pair into the forbidden column family
+ put(this.forbiddenFamilyHandle, this.ableWalWriteOptions, keyBytes,
keyBytes.length, valueBytes, valueBytes.length); // both array lengths are dereferenced, so neither argument may be null
+ }
+
+ public byte[] getForbidden(final byte[] keyBytes) throws Exception { // looks up keyBytes in the forbidden column family and returns the stored bytes
+ return get(this.forbiddenFamilyHandle, this.totalOrderReadOptions,
keyBytes); // NOTE(review): result for an absent key is decided by get(), which is outside this view - confirm
+ }
+
public void delete(final byte[] keyBytes) throws Exception {
delete(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes);
}
@@ -246,6 +277,10 @@ public class ConfigRocksDBStorage extends
AbstractRocksDBStorage {
return this.db.newIterator(this.defaultCFHandle,
this.totalOrderReadOptions);
}
+ public RocksIterator forbiddenIterator() { // creates a new iterator over the forbidden column family on every call
+ return this.db.newIterator(this.forbiddenFamilyHandle,
this.totalOrderReadOptions); // NOTE(review): nothing here closes the iterator - presumably the caller owns it, confirm
+ }
+
public void rangeDelete(final byte[] startKey, final byte[] endKey) throws
RocksDBException {
rangeDelete(this.defaultCFHandle, this.writeOptions, startKey, endKey);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 5b2a1931b3..0b45d92418 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -105,6 +105,9 @@ public class MessageStoreConfig {
// default, defaultRocksDB
@ImportantField
private String storeType = StoreType.DEFAULT.getStoreType();
+
+ private boolean transferMetadataJsonToRocksdb = false; // off by default; NOTE(review): presumably enables migrating JSON metadata into RocksDB - the code that reads this flag is outside this view, confirm
+
// ConsumeQueue file size,default is 30W
private int mappedFileSizeConsumeQueue = 300000 *
ConsumeQueue.CQ_STORE_UNIT_SIZE;
// enable consume queue ext
@@ -1842,4 +1845,13 @@ public class MessageStoreConfig {
public void setPutConsumeQueueDataByFileChannel(boolean
putConsumeQueueDataByFileChannel) {
this.putConsumeQueueDataByFileChannel =
putConsumeQueueDataByFileChannel;
}
+
+ public boolean isTransferMetadataJsonToRocksdb() { // accessor for the transferMetadataJsonToRocksdb flag (initialised to false)
+ return transferMetadataJsonToRocksdb;
+ }
+
+ public void setTransferMetadataJsonToRocksdb(boolean
transferMetadataJsonToRocksdb) { // mutator for the transferMetadataJsonToRocksdb flag
+ this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb;
+ }
+
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index e785934ba3..d56ed05326 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -92,6 +92,7 @@ import
org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
import org.apache.rocketmq.tools.command.message.SendMessageCommand;
+import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand;
import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand;
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
@@ -300,6 +301,7 @@ public class MQAdminStartup {
initCommand(new GetAclSubCommand());
initCommand(new ListAclSubCommand());
initCommand(new CopyAclsSubCommand());
+ initCommand(new RocksDBConfigToJsonCommand());
}
private static void printHelp() {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index b987ad873b..1d81287ac7 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -14,18 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.tools.command.metadata;
+import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
+import org.rocksdb.RocksIterator;
import java.io.File;
import java.util.HashMap;
@@ -47,8 +50,8 @@ public class RocksDBConfigToJsonCommand implements SubCommand
{
@Override
public Options buildCommandlineOptions(Options options) {
- Option pathOption = new Option("p", "path", true,
- "Absolute path to the metadata directory");
+ Option pathOption = new Option("p", "configPath", true,
+ "Absolute path to the metadata config directory");
pathOption.setRequired(true);
options.addOption(pathOption);
@@ -62,57 +65,50 @@ public class RocksDBConfigToJsonCommand implements
SubCommand {
@Override
public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
- String path = commandLine.getOptionValue("path").trim();
+ String path = commandLine.getOptionValue("configPath").trim(); // root of the metadata config directory; the configType sub-directory is appended below
if (StringUtils.isEmpty(path) || !new File(path).exists()) {
System.out.print("Rocksdb path is invalid.\n");
return;
}
String configType =
commandLine.getOptionValue("configType").trim().toLowerCase();
+ if (!path.endsWith("/")) {
+ path += "/";
+ }
+ path += configType; // the store that gets opened lives at <configPath>/<configType>
+
+ ConfigRocksDBStorage configRocksDBStorage = new
ConfigRocksDBStorage(path, true); // NOTE(review): the second argument is presumably a read-only switch - confirm against the constructor
+ configRocksDBStorage.start(); // NOTE(review): any result of start() is ignored, and this runs outside the try below, so a failure here skips shutdown()
+ RocksIterator iterator = configRocksDBStorage.iterator(); // NOTE(review): this iterator is never closed anywhere in the method
- final long memTableFlushInterval = 60 * 60 * 1000L;
- RocksDBConfigManager kvConfigManager = new
RocksDBConfigManager(memTableFlushInterval);
try {
- if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
- // for topics.json
- final Map<String, JSONObject> topicsJsonConfig = new
HashMap<>();
- final Map<String, JSONObject> topicConfigTable = new
HashMap<>();
- boolean isLoad = kvConfigManager.load(path, (key, value) -> {
- final String topic = new String(key,
DataConverter.CHARSET_UTF8);
- final String topicConfig = new String(value,
DataConverter.CHARSET_UTF8);
- final JSONObject jsonObject =
JSONObject.parseObject(topicConfig);
- topicConfigTable.put(topic, jsonObject);
- });
+ final Map<String, JSONObject> configMap = new HashMap<>(); // top-level document that is printed at the end
+ final Map<String, JSONObject> configTable = new HashMap<>(); // entry name -> parsed JSON value, filled by the loop below
+ iterator.seekToFirst();
+ while (iterator.isValid()) { // walk every entry the iterator yields
+ final byte[] key = iterator.key();
+ final byte[] value = iterator.value();
+ final String name = new String(key,
DataConverter.CHARSET_UTF8);
+ final String config = new String(value,
DataConverter.CHARSET_UTF8);
+ final JSONObject jsonObject = JSONObject.parseObject(config);
+ configTable.put(name, jsonObject);
+ iterator.next();
+ }
+ byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion(); // NOTE(review): not null-checked before being passed to new String(...) just below
+ configMap.put("dataVersion",
+ JSONObject.parseObject(new String(kvDataVersion,
DataConverter.CHARSET_UTF8)));
- if (isLoad) {
- topicsJsonConfig.put("topicConfigTable", (JSONObject)
JSONObject.toJSON(topicConfigTable));
- final String topicsJsonStr =
JSONObject.toJSONString(topicsJsonConfig, true);
- System.out.print(topicsJsonStr + "\n");
- return;
- }
+ if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
+ configMap.put("topicConfigTable",
JSON.parseObject(JSONObject.toJSONString(configTable))); // same table, published under the key chosen by configType
}
if
(SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
- // for subscriptionGroup.json
- final Map<String, JSONObject> subscriptionGroupJsonConfig =
new HashMap<>();
- final Map<String, JSONObject> subscriptionGroupTable = new
HashMap<>();
- boolean isLoad = kvConfigManager.load(path, (key, value) -> {
- final String subscriptionGroup = new String(key,
DataConverter.CHARSET_UTF8);
- final String subscriptionGroupConfig = new String(value,
DataConverter.CHARSET_UTF8);
- final JSONObject jsonObject =
JSONObject.parseObject(subscriptionGroupConfig);
- subscriptionGroupTable.put(subscriptionGroup, jsonObject);
- });
-
- if (isLoad) {
- subscriptionGroupJsonConfig.put("subscriptionGroupTable",
- (JSONObject)
JSONObject.toJSON(subscriptionGroupTable));
- final String subscriptionGroupJsonStr =
JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
- System.out.print(subscriptionGroupJsonStr + "\n");
- return;
- }
+ configMap.put("subscriptionGroupTable",
JSON.parseObject(JSONObject.toJSONString(configTable)));
}
- System.out.print("Config type was not recognized, configType=" +
configType + "\n");
+ System.out.print(JSONObject.toJSONString(configMap, true) + "\n"); // the assembled document is written to stdout
+ } catch (Exception e) {
+ System.out.print("Error occurred while converting RocksDB kv
config to json, " + "configType=" + configType + ", " + e.getMessage() + "\n");
} finally {
- kvConfigManager.stop();
+ configRocksDBStorage.shutdown(); // runs on both the success and the error path of the try block
}
}
-}
+}
\ No newline at end of file