This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e2abbc31d9 [ISSUE #8740] fix rocksDBConfigToJson command (#8738)
e2abbc31d9 is described below
commit e2abbc31d9feb59d04879071f3e123564374d9c4
Author: yuz10 <[email protected]>
AuthorDate: Wed Sep 25 10:14:38 2024 +0800
[ISSUE #8740] fix rocksDBConfigToJson command (#8738)
* fix rocksDBConfigToJson command
* fix
---
.../metadata/RocksDBConfigToJsonCommand.java | 70 ++++++++++++++++++----
1 file changed, 57 insertions(+), 13 deletions(-)
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index 1d81287ac7..f2803b0cbb 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.tools.command.metadata;
-import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -33,10 +32,13 @@ import org.rocksdb.RocksIterator;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class RocksDBConfigToJsonCommand implements SubCommand {
private static final String TOPICS_JSON_CONFIG = "topics";
private static final String SUBSCRIPTION_GROUP_JSON_CONFIG =
"subscriptionGroups";
+ private static final String CONSUMER_OFFSETS_JSON_CONFIG =
"consumerOffsets";
@Override
public String commandName() {
@@ -45,7 +47,7 @@ public class RocksDBConfigToJsonCommand implements SubCommand
{
@Override
public String commandDesc() {
- return "Convert RocksDB kv config (topics/subscriptionGroups) to json";
+ return "Convert RocksDB kv config
(topics/subscriptionGroups/consumerOffsets) to json";
}
@Override
@@ -56,7 +58,7 @@ public class RocksDBConfigToJsonCommand implements SubCommand
{
options.addOption(pathOption);
Option configTypeOption = new Option("t", "configType", true, "Name of
kv config, e.g. " +
- "topics/subscriptionGroups");
+ "topics/subscriptionGroups/consumerOffsets");
configTypeOption.setRequired(true);
options.addOption(configTypeOption);
@@ -71,19 +73,21 @@ public class RocksDBConfigToJsonCommand implements
SubCommand {
return;
}
- String configType =
commandLine.getOptionValue("configType").trim().toLowerCase();
+ String configType = commandLine.getOptionValue("configType").trim();
if (!path.endsWith("/")) {
path += "/";
}
path += configType;
-
+ if (CONSUMER_OFFSETS_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ printConsumerOffsets(path);
+ return;
+ }
ConfigRocksDBStorage configRocksDBStorage = new
ConfigRocksDBStorage(path, true);
configRocksDBStorage.start();
RocksIterator iterator = configRocksDBStorage.iterator();
-
try {
final Map<String, JSONObject> configMap = new HashMap<>();
- final Map<String, JSONObject> configTable = new HashMap<>();
+ final JSONObject configTable = new JSONObject();
iterator.seekToFirst();
while (iterator.isValid()) {
final byte[] key = iterator.key();
@@ -95,14 +99,16 @@ public class RocksDBConfigToJsonCommand implements
SubCommand {
iterator.next();
}
byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion();
- configMap.put("dataVersion",
- JSONObject.parseObject(new String(kvDataVersion,
DataConverter.CHARSET_UTF8)));
+ if (kvDataVersion != null) {
+ configMap.put("dataVersion",
+ JSONObject.parseObject(new String(kvDataVersion,
DataConverter.CHARSET_UTF8)));
+ }
- if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
- configMap.put("topicConfigTable",
JSON.parseObject(JSONObject.toJSONString(configTable)));
+ if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ configMap.put("topicConfigTable", configTable);
}
- if
(SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
- configMap.put("subscriptionGroupTable",
JSON.parseObject(JSONObject.toJSONString(configTable)));
+ if (SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ configMap.put("subscriptionGroupTable", configTable);
}
System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
} catch (Exception e) {
@@ -111,4 +117,42 @@ public class RocksDBConfigToJsonCommand implements
SubCommand {
configRocksDBStorage.shutdown();
}
}
+
+ private void printConsumerOffsets(String path) {
+ ConfigRocksDBStorage configRocksDBStorage = new
ConfigRocksDBStorage(path, true);
+ configRocksDBStorage.start();
+ RocksIterator iterator = configRocksDBStorage.iterator();
+ try {
+ final Map<String, JSONObject> configMap = new HashMap<>();
+ final JSONObject configTable = new JSONObject();
+ iterator.seekToFirst();
+ while (iterator.isValid()) {
+ final byte[] key = iterator.key();
+ final byte[] value = iterator.value();
+ final String name = new String(key,
DataConverter.CHARSET_UTF8);
+ final String config = new String(value,
DataConverter.CHARSET_UTF8);
+ final RocksDBOffsetSerializeWrapper jsonObject =
JSONObject.parseObject(config, RocksDBOffsetSerializeWrapper.class);
+ configTable.put(name, jsonObject.getOffsetTable());
+ iterator.next();
+ }
+ configMap.put("offsetTable", configTable);
+ System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
+ } catch (Exception e) {
+ System.out.print("Error occurred while converting RocksDB kv
config to json, " + "configType=consumerOffsets, " + e.getMessage() + "\n");
+ } finally {
+ configRocksDBStorage.shutdown();
+ }
+ }
+
+ static class RocksDBOffsetSerializeWrapper {
+ private ConcurrentMap<Integer, Long> offsetTable = new
ConcurrentHashMap<>(16);
+
+ public ConcurrentMap<Integer, Long> getOffsetTable() {
+ return offsetTable;
+ }
+
+ public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
+ this.offsetTable = offsetTable;
+ }
+ }
}
\ No newline at end of file