This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b30afe8184 [ISSUE #9111] Support export broker RocksDB Config to json
file (#9114)
b30afe8184 is described below
commit b30afe81840a0aacac730556432ebcdb276bbe85
Author: qianye <[email protected]>
AuthorDate: Tue Jan 14 10:29:40 2025 +0800
[ISSUE #9111] Support export broker RocksDB Config to json file (#9114)
* [ISSUE #9111] Broker Support export RocksDB Config to json file and
enhance admin tools
---
.../config/v1/RocksDBConsumerOffsetManager.java | 13 +-
.../config/v1/RocksDBSubscriptionGroupManager.java | 7 +-
.../config/v1/RocksDBTopicConfigManager.java | 7 +-
.../broker/processor/AdminBrokerProcessor.java | 54 ++++-
.../rocketmq/client/impl/MQClientAPIImpl.java | 16 ++
.../rocketmq/remoting/protocol/RequestCode.java | 1 +
.../ExportRocksDBConfigToJsonRequestHeader.java | 100 +++++++++
...ExportRocksDBConfigToJsonRequestHeaderTest.java | 51 +++++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 8 +
.../tools/admin/DefaultMQAdminExtImpl.java | 8 +
.../apache/rocketmq/tools/admin/MQAdminExt.java | 5 +
.../export/ExportMetadataInRocksDBCommand.java | 11 +-
.../metadata/RocksDBConfigToJsonCommand.java | 224 ++++++++++++++++++---
13 files changed, 463 insertions(+), 42 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
index 824fc0fee3..963c5046f2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
@@ -38,7 +38,7 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- protected RocksDBConfigManager rocksDBConfigManager;
+ protected transient RocksDBConfigManager rocksDBConfigManager;
public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
@@ -100,7 +100,7 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
byte[] keyBytes =
topicAtGroup.getBytes(DataConverter.CHARSET_UTF8);
this.rocksDBConfigManager.delete(keyBytes);
} catch (Exception e) {
- LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
+ log.error("kv remove consumerOffset Failed, {}", topicAtGroup);
}
}
@@ -109,7 +109,7 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body,
RocksDBOffsetSerializeWrapper.class);
this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
- LOG.info("load exist local offset, {}, {}", topicAtGroup,
wrapper.getOffsetTable());
+ log.info("load exist local offset, {}, {}", topicAtGroup,
wrapper.getOffsetTable());
}
public String rocksdbConfigFilePath() {
@@ -132,12 +132,17 @@ public class RocksDBConsumerOffsetManager extends
ConsumerOffsetManager {
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
this.rocksDBConfigManager.flushWAL();
} catch (Exception e) {
- LOG.error("consumer offset persist Failed", e);
+ log.error("consumer offset persist Failed", e);
} finally {
writeBatch.close();
}
}
+ /**
+ * Exports the consumer offsets to a json file, as stated by the log message below.
+ * Logs the export and then explicitly invokes the superclass implementation of
+ * {@code persist()}. The method is {@code synchronized} on this manager instance.
+ */
+ public synchronized void exportToJson() {
+ log.info("RocksDBConsumerOffsetManager export consumer offset to json
file");
+ // NOTE(review): presumably the inherited persist() is what writes the json file -- confirm in the superclass.
+ super.persist();
+ }
+
private void putWriteBatch(final WriteBatch writeBatch, final String
topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
byte[] keyBytes = topicGroupName.getBytes(DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = new
RocksDBOffsetSerializeWrapper();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index 8fc7a4d6ed..ff47152569 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -37,7 +37,7 @@ import org.rocksdb.RocksIterator;
public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
- protected RocksDBConfigManager rocksDBConfigManager;
+ protected transient RocksDBConfigManager rocksDBConfigManager;
public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
super(brokerController, false);
@@ -184,6 +184,11 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
}
}
+ /**
+ * Exports the subscription groups to a json file, as stated by the log message below.
+ * Logs the export and then explicitly invokes the superclass implementation of
+ * {@code persist()}. The method is {@code synchronized} on this manager instance.
+ */
+ public synchronized void exportToJson() {
+ log.info("RocksDBSubscriptionGroupManager export subscription group to
json file");
+ // NOTE(review): presumably the inherited persist() is what writes the json file -- confirm in the superclass.
+ super.persist();
+ }
+
public String rocksdbConfigFilePath() {
return
this.brokerController.getMessageStoreConfig().getStorePathRootDir() +
File.separator + "config" + File.separator + "subscriptionGroups" +
File.separator;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index 18e633d348..d64f808067 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -32,7 +32,7 @@ import org.rocksdb.CompressionType;
public class RocksDBTopicConfigManager extends TopicConfigManager {
- protected RocksDBConfigManager rocksDBConfigManager;
+ protected transient RocksDBConfigManager rocksDBConfigManager;
public RocksDBTopicConfigManager(BrokerController brokerController) {
super(brokerController, false);
@@ -139,6 +139,11 @@ public class RocksDBTopicConfigManager extends
TopicConfigManager {
}
}
+ /**
+ * Exports the topic config to a json file, as stated by the log message below.
+ * Logs the export and then explicitly invokes the superclass implementation of
+ * {@code persist()}. The method is {@code synchronized} on this manager instance.
+ */
+ public synchronized void exportToJson() {
+ log.info("RocksDBTopicConfigManager export topic config to json file");
+ // NOTE(review): presumably the inherited persist() is what writes the json file -- confirm in the superclass.
+ super.persist();
+ }
+
public String rocksdbConfigFilePath() {
return
this.brokerController.getMessageStoreConfig().getStorePathRootDir() +
File.separator + "config" + File.separator + "topics" + File.separator;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 6fb7584aa9..a9b913192f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -60,6 +61,9 @@ import org.apache.rocketmq.broker.auth.converter.AclConverter;
import org.apache.rocketmq.broker.auth.converter.UserConverter;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
@@ -159,6 +163,7 @@ import
org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
@@ -239,7 +244,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final BrokerController brokerController;
protected Set<String> configBlackList = new HashSet<>();
- private final ExecutorService asyncExecuteWorker = new
ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+ private final ExecutorService asyncExecuteWorker = new
ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
@@ -356,6 +361,8 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return queryConsumeQueue(ctx, request);
case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS:
return this.checkRocksdbCqWriteProgress(ctx, request);
+ case RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON:
+ return this.exportRocksDBConfigToJson(ctx, request);
case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN:
return this.updateAndGetGroupForbidden(ctx, request);
case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG:
@@ -495,6 +502,51 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
+    /**
+     * Handles {@code RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON}.
+     * <p>
+     * For every config type named in the request header, the matching manager's
+     * {@code exportToJson()} is scheduled on {@code asyncExecuteWorker}, but only when that
+     * manager is the RocksDB-backed implementation; other implementations are skipped
+     * silently. The method then blocks until all scheduled exports have finished.
+     *
+     * @param ctx     channel context of the request (not used here)
+     * @param request the incoming command carrying an {@link ExportRocksDBConfigToJsonRequestHeader}
+     * @return a {@code SUCCESS} response with remark "export done.", or a {@code SYSTEM_ERROR}
+     *         response whose remark is the failure when an export throws or cannot be scheduled
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    private RemotingCommand exportRocksDBConfigToJson(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        ExportRocksDBConfigToJsonRequestHeader requestHeader =
+            request.decodeCommandCustomHeader(ExportRocksDBConfigToJsonRequestHeader.class);
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configTypes = requestHeader.fetchConfigType();
+        List<CompletableFuture<Void>> futureList = new ArrayList<>(configTypes.size());
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        try {
+            // Scheduling happens inside the try block on purpose: asyncExecuteWorker is a bounded
+            // pool fed by a SynchronousQueue, so runAsync() throws RejectedExecutionException
+            // synchronously when every worker thread is busy.
+            for (ExportRocksDBConfigToJsonRequestHeader.ConfigType type : configTypes) {
+                switch (type) {
+                    case TOPICS:
+                        if (this.brokerController.getTopicConfigManager() instanceof RocksDBTopicConfigManager) {
+                            RocksDBTopicConfigManager rocksDBTopicConfigManager =
+                                (RocksDBTopicConfigManager) this.brokerController.getTopicConfigManager();
+                            futureList.add(CompletableFuture.runAsync(rocksDBTopicConfigManager::exportToJson, asyncExecuteWorker));
+                        }
+                        break;
+                    case SUBSCRIPTION_GROUPS:
+                        if (this.brokerController.getSubscriptionGroupManager() instanceof RocksDBSubscriptionGroupManager) {
+                            RocksDBSubscriptionGroupManager rocksDBSubscriptionGroupManager =
+                                (RocksDBSubscriptionGroupManager) this.brokerController.getSubscriptionGroupManager();
+                            futureList.add(CompletableFuture.runAsync(rocksDBSubscriptionGroupManager::exportToJson, asyncExecuteWorker));
+                        }
+                        break;
+                    case CONSUMER_OFFSETS:
+                        if (this.brokerController.getConsumerOffsetManager() instanceof RocksDBConsumerOffsetManager) {
+                            RocksDBConsumerOffsetManager rocksDBConsumerOffsetManager =
+                                (RocksDBConsumerOffsetManager) this.brokerController.getConsumerOffsetManager();
+                            futureList.add(CompletableFuture.runAsync(rocksDBConsumerOffsetManager::exportToJson, asyncExecuteWorker));
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
+        } catch (CompletionException | java.util.concurrent.RejectedExecutionException e) {
+            // CompletionException: an export task threw. RejectedExecutionException: the pool was
+            // saturated; exports that were already scheduled keep running in the background.
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.valueOf(e));
+            return response;
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark("export done.");
+        return response;
+    }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7d4b51cfc5..114093e350 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -164,6 +164,7 @@ import
org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupReque
import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
@@ -3036,6 +3037,21 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback, StartAndShutdo
throw new MQClientException(response.getCode(), response.getRemark());
}
+    /**
+     * Sends an {@code EXPORT_ROCKSDB_CONFIG_TO_JSON} request to a broker and waits for the reply.
+     *
+     * @param brokerAddr    address the request is sent to
+     * @param configType    config types to export; encoded into the request header
+     * @param timeoutMillis timeout of the synchronous invocation, in milliseconds
+     * @throws MQClientException if the broker answers with any code other than {@code SUCCESS};
+     *                           the exception carries the response code and remark
+     */
+    public void exportRocksDBConfigToJson(final String brokerAddr,
+        final List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configType,
+        final long timeoutMillis) throws InterruptedException,
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+        final ExportRocksDBConfigToJsonRequestHeader requestHeader = new ExportRocksDBConfigToJsonRequestHeader();
+        requestHeader.updateConfigType(configType);
+
+        final RemotingCommand exportRequest =
+            RemotingCommand.createRequestCommand(RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, requestHeader);
+        final RemotingCommand brokerReply = this.remotingClient.invokeSync(brokerAddr, exportRequest, timeoutMillis);
+        assert brokerReply != null;
+
+        if (ResponseCode.SUCCESS == brokerReply.getCode()) {
+            return;
+        }
+        throw new MQClientException(brokerReply.getCode(), brokerReply.getRemark());
+    }
+
public void checkClientInBroker(final String brokerAddr, final String
consumerGroup,
final String clientId, final SubscriptionData subscriptionData,
final long timeoutMillis)
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 623f5748d5..e3b180a537 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -219,6 +219,7 @@ public class RequestCode {
public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352;
public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353;
public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354;
+ public static final int EXPORT_ROCKSDB_CONFIG_TO_JSON = 355;
public static final int LITE_PULL_MESSAGE = 361;
public static final int RECALL_MESSAGE = 370;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
new file mode 100644
index 0000000000..7b1f9470e1
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+
+/**
+ * Custom header of the {@code RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON} request.
+ * <p>
+ * Its single field, {@code configType}, carries the config types to export as a
+ * ';'-separated list of type names, e.g. {@code "topics;subscriptionGroups;"}.
+ * Use {@link #updateConfigType(List)} and {@link #fetchConfigType()} to convert between
+ * that string form and a list of {@link ConfigType}.
+ */
+@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET)
+public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader {
+    private static final String CONFIG_TYPE_SEPARATOR = ";";
+
+    /** Kinds of config that can be exported, each with the name used in the string form. */
+    public enum ConfigType {
+        TOPICS("topics"),
+        SUBSCRIPTION_GROUPS("subscriptionGroups"),
+        CONSUMER_OFFSETS("consumerOffsets");
+
+        private final String typeName;
+
+        ConfigType(String typeName) {
+            this.typeName = typeName;
+        }
+
+        /**
+         * Looks up a config type by name, ignoring case and surrounding whitespace.
+         *
+         * @param typeName name to resolve
+         * @return the matching config type
+         * @throws IllegalArgumentException if {@code typeName} is null or matches no config type
+         */
+        public static ConfigType getConfigTypeByName(String typeName) {
+            if (typeName != null) {
+                // Trim once instead of on every loop iteration.
+                String normalized = typeName.trim();
+                for (ConfigType configType : ConfigType.values()) {
+                    if (configType.getTypeName().equalsIgnoreCase(normalized)) {
+                        return configType;
+                    }
+                }
+            }
+            throw new IllegalArgumentException("Unknown config type: " + typeName);
+        }
+
+        /**
+         * Parses a ';'-separated list of config type names.
+         * Blank tokens (for example the one after a trailing "; ") are ignored.
+         *
+         * @param ordinal the separated list; may be null
+         * @return the parsed config types in input order; empty when {@code ordinal} is null
+         *         or contains no names
+         * @throws IllegalArgumentException if a non-blank token matches no config type
+         */
+        public static List<ConfigType> fromString(String ordinal) {
+            List<ConfigType> configTypes = new ArrayList<>();
+            // StringUtils.split returns null for a null input, so guard before iterating.
+            String[] configTypeNames = StringUtils.split(ordinal, CONFIG_TYPE_SEPARATOR);
+            if (configTypeNames == null) {
+                return configTypes;
+            }
+            for (String configTypeName : configTypeNames) {
+                // isNotBlank rather than isNotEmpty: a whitespace-only token is not a type name.
+                if (StringUtils.isNotBlank(configTypeName)) {
+                    configTypes.add(getConfigTypeByName(configTypeName));
+                }
+            }
+            return configTypes;
+        }
+
+        /**
+         * Builds the string form of the given config types: every type name followed by ';'.
+         *
+         * @param configTypes config types to encode
+         * @return the separated list, empty for an empty input
+         */
+        public static String toString(List<ConfigType> configTypes) {
+            StringBuilder sb = new StringBuilder();
+            for (ConfigType configType : configTypes) {
+                sb.append(configType.getTypeName()).append(CONFIG_TYPE_SEPARATOR);
+            }
+            return sb.toString();
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+    }
+
+    @CFNotNull
+    private String configType;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    /** Returns the config types encoded in {@code configType}, see {@link ConfigType#fromString(String)}. */
+    public List<ConfigType> fetchConfigType() {
+        return ConfigType.fromString(configType);
+    }
+
+    /** Stores the given config types in their string form, see {@link ConfigType#toString(List)}. */
+    public void updateConfigType(List<ConfigType> configType) {
+        this.configType = ConfigType.toString(configType);
+    }
+
+    public String getConfigType() {
+        return configType;
+    }
+
+    public void setConfigType(String configType) {
+        this.configType = configType;
+    }
+}
\ No newline at end of file
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java
new file mode 100644
index 0000000000..bbe625a42a
--- /dev/null
+++
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExportRocksDBConfigToJsonRequestHeaderTest {
+    /**
+     * Checks the string round trip of {@code ConfigType}, lenient parsing (case and
+     * surrounding whitespace) and rejection of unknown names.
+     * Uses {@code org.junit.Assert} instead of the {@code assert} keyword so the checks
+     * still run when the JVM is started without {@code -ea}.
+     */
+    @Test
+    public void configTypeTest() {
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configTypes = new ArrayList<>();
+        configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS);
+        configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS);
+
+        String string = ExportRocksDBConfigToJsonRequestHeader.ConfigType.toString(configTypes);
+
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> newConfigTypes =
+            ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString(string);
+        Assert.assertEquals(2, newConfigTypes.size());
+        Assert.assertEquals(configTypes, newConfigTypes);
+
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> topics =
+            ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("topics");
+        Assert.assertEquals(1, topics.size());
+        Assert.assertEquals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS, topics.get(0));
+
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> mix =
+            ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("toPics; subScriptiongroups");
+        Assert.assertEquals(2, mix.size());
+        Assert.assertEquals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS, mix.get(0));
+        Assert.assertEquals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS, mix.get(1));
+
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("topics; subscription");
+        });
+    }
+}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 4b97e14866..f224f749cb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -65,6 +65,7 @@ import
org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -778,6 +779,13 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
return
this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic,
checkStoreTime);
}
+ /**
+ * Delegates to {@code defaultMQAdminExtImpl.exportRocksDBConfigToJson} with the same arguments.
+ *
+ * @param brokerAddr address of the broker, passed through unchanged
+ * @param configType config types to export, passed through unchanged
+ */
+ @Override
+ public void exportRocksDBConfigToJson(String brokerAddr,
+ List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configType)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException {
+ this.defaultMQAdminExtImpl.exportRocksDBConfigToJson(brokerAddr,
configType);
+ }
+
@Override
public boolean resumeCheckHalfMessage(String topic,
String msgId)
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 2523013af0..5be99606dc 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -103,6 +103,7 @@ import
org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
@@ -1824,6 +1825,13 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
return
this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr,
topic, checkStoreTime, timeoutMillis);
}
+ /**
+ * Forwards the call to {@code MQClientAPIImpl.exportRocksDBConfigToJson}, supplying this
+ * instance's {@code timeoutMillis} as the third argument.
+ *
+ * @param brokerAddr address of the broker, passed through unchanged
+ * @param configType config types to export, passed through unchanged
+ */
+ @Override
+ public void exportRocksDBConfigToJson(String brokerAddr,
+ List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configType)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException {
+
this.mqClientInstance.getMQClientAPIImpl().exportRocksDBConfigToJson(brokerAddr,
configType, timeoutMillis);
+ }
+
@Override
public boolean resumeCheckHalfMessage(final String topic,
final String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 69a0821864..2f01b6cba8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -61,6 +61,7 @@ import
org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -392,6 +393,10 @@ public interface MQAdminExt extends MQAdmin {
final long index, final int count, final String consumerGroup)
throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException;
+ /**
+ * Requests an export of RocksDB config to json for the given config types.
+ *
+ * @param brokerAddr address of the broker the request is for
+ * @param configType config types to export
+ */
+ void exportRocksDBConfigToJson(String brokerAddr,
+ List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configType)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException;
+
boolean resumeCheckHalfMessage(final String topic,
final String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
index d5726985e3..438d17d668 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
@@ -18,6 +18,10 @@
package org.apache.rocketmq.tools.command.export;
import com.alibaba.fastjson.JSONObject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -30,11 +34,6 @@ import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.rocksdb.RocksIterator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-
public class ExportMetadataInRocksDBCommand implements SubCommand {
private static final String TOPICS_JSON_CONFIG = "topics";
private static final String SUBSCRIPTION_GROUP_JSON_CONFIG =
"subscriptionGroups";
@@ -46,7 +45,7 @@ public class ExportMetadataInRocksDBCommand implements
SubCommand {
@Override
public String commandDesc() {
- return "export RocksDB kv config (topics/subscriptionGroups)";
+ return "export RocksDB kv config (topics/subscriptionGroups).
Recommend to use [mqadmin rocksDBConfigToJson]";
}
@Override
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index f2803b0cbb..48bc163678 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -18,27 +18,38 @@
package org.apache.rocketmq.tools.command.metadata;
import com.alibaba.fastjson.JSONObject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.rocksdb.RocksIterator;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
public class RocksDBConfigToJsonCommand implements SubCommand {
- private static final String TOPICS_JSON_CONFIG = "topics";
- private static final String SUBSCRIPTION_GROUP_JSON_CONFIG =
"subscriptionGroups";
- private static final String CONSUMER_OFFSETS_JSON_CONFIG =
"consumerOffsets";
@Override
public String commandName() {
@@ -47,41 +58,140 @@ public class RocksDBConfigToJsonCommand implements
SubCommand {
@Override
public String commandDesc() {
- return "Convert RocksDB kv config
(topics/subscriptionGroups/consumerOffsets) to json";
+ return "Convert RocksDB kv config
(topics/subscriptionGroups/consumerOffsets) to json. " +
+ "[rpc mode] Use [-n, -c, -b, -t] to send Request to broker (
version >= 5.3.2 ) or [local mode] use [-p, -t, -j, -e] to load RocksDB. " +
+ "If -e is provided, tools will export json file instead of std
print";
}
@Override
public Options buildCommandlineOptions(Options options) {
+ // None of the options below is marked required any more (the setRequired(true)
+ // calls are removed by this change); execute() validates the per-mode
+ // requirements itself through checkRequiredArgsProvided().
+ // -t is shared by both modes, so it is registered first.
+ Option configTypeOption = new Option("t", "configType", true, "Name of
kv config, e.g. " +
+ "topics/subscriptionGroups/consumerOffsets. Required in local mode
and default all in rpc mode.");
+ options.addOption(configTypeOption);
+
+ // [local mode] options
Option pathOption = new Option("p", "configPath", true,
- "Absolute path to the metadata config directory");
- pathOption.setRequired(true);
+ "[local mode] Absolute path to the metadata config directory");
options.addOption(pathOption);
- Option configTypeOption = new Option("t", "configType", true, "Name of
kv config, e.g. " +
- "topics/subscriptionGroups/consumerOffsets");
- configTypeOption.setRequired(true);
- options.addOption(configTypeOption);
+ Option exportPathOption = new Option("e", "exportFile", true,
+ "[local mode] Absolute file path for exporting, auto backup
existing file, not directory. If exportFile is provided, will export Json file
and ignore [-j].");
+ options.addOption(exportPathOption);
+
+ Option jsonEnableOption = new Option("j", "jsonEnable", true,
+ "[local mode] Json format enable, Default: true. If exportFile is
provided, will export Json file and ignore [-j].");
+ options.addOption(jsonEnableOption);
+
+ // [rpc mode] options
+ Option nameserverOption = new Option("n", "nameserverAddr", true,
+ "[rpc mode] nameserverAddr. If nameserverAddr and clusterName are
provided, will ignore [-p, -e, -j, -b] args");
+ options.addOption(nameserverOption);
+
+ Option clusterOption = new Option("c", "cluster", true,
+ "[rpc mode] Cluster name. If nameserverAddr and clusterName are
provided, will ignore [-p, -e, -j, -b] args");
+ options.addOption(clusterOption);
+
+ Option brokerAddrOption = new Option("b", "brokerAddr", true,
+ "[rpc mode] Broker address. If brokerAddr is provided, will ignore
[-p, -e, -j] args");
+ options.addOption(brokerAddrOption);
return options;
}
+    /**
+     * Selects the working mode from the options that were passed and runs it.
+     * <ul>
+     * <li>{@code nameserverAddr} present: rpc mode, {@code cluster} is required.</li>
+     * <li>otherwise {@code brokerAddr} present: rpc mode.</li>
+     * <li>otherwise {@code configPath} present: local mode, {@code configType} is required.</li>
+     * <li>none of them: prints {@link #commandDesc()}.</li>
+     * </ul>
+     *
+     * @param commandLine parsed command line
+     * @param options     declared options (not read here)
+     * @param rpcHook     hook handed to the admin client in rpc mode
+     * @throws SubCommandException if {@code configType} cannot be parsed or an option
+     *                             required by the selected mode is missing
+     */
@Override
public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> typeList = getConfigTypeList(commandLine);
+        if (typeList == null) {
+            // getConfigTypeList() has already printed why -t was rejected. Fail here so that
+            // rpc mode never sends a null type list to a broker and local mode does not run
+            // into a NullPointerException when it parses the option again.
+            throw new SubCommandException("Invalid configType");
+        }
+
+        if (commandLine.hasOption("nameserverAddr")) {
+            // [rpc mode] call all brokers in cluster to export to json file
+            System.out.print("Use [rpc mode] call all brokers in cluster to export to json file \n");
+            checkRequiredArgsProvided(commandLine, "rpc mode", "cluster");
+            handleRpcMode(commandLine, rpcHook, typeList);
+        } else if (commandLine.hasOption("brokerAddr")) {
+            // [rpc mode] call broker to export to json file
+            System.out.print("Use [rpc mode] call broker to export to json file \n");
+            handleRpcMode(commandLine, rpcHook, typeList);
+        } else if (commandLine.hasOption("configPath")) {
+            // [local mode] load rocksdb to print or export file
+            System.out.print("Use [local mode] load rocksdb to print or export file \n");
+            checkRequiredArgsProvided(commandLine, "local mode", "configType");
+            handleLocalMode(commandLine);
+        } else {
+            System.out.print(commandDesc() + "\n");
+        }
+    }
+
+ /**
+ * Local mode: opens the RocksDB directory under configPath and prints or exports
+ * one config type.
+ *
+ * Only the first element of the parsed configType list is used. The RocksDB
+ * directory that is read is configPath joined with type.toString(). Nothing is
+ * printed or written when getConfigMapFromRocksDB returns null.
+ *
+ * NOTE(review): Objects.requireNonNull throws NullPointerException here when
+ * getConfigTypeList returns null (invalid -t value).
+ */
+ private void handleLocalMode(CommandLine commandLine) {
+ ExportRocksDBConfigToJsonRequestHeader.ConfigType type =
Objects.requireNonNull(getConfigTypeList(commandLine)).get(0);
String path = commandLine.getOptionValue("configPath").trim();
if (StringUtils.isEmpty(path) || !new File(path).exists()) {
System.out.print("Rocksdb path is invalid.\n");
return;
}
+ path = Paths.get(path, type.toString()).toString();
+ // exportFile == null means "print to stdout"; otherwise -j is ignored and json is written to the file.
+ String exportFile = commandLine.hasOption("exportFile") ?
commandLine.getOptionValue("exportFile").trim() : null;
+ Map<String, JSONObject> configMap = getConfigMapFromRocksDB(path,
type);
+ if (configMap != null) {
+ if (exportFile == null) {
+ // Plain key/value output only when -j is explicitly "false"; json is the default.
+ if (commandLine.hasOption("jsonEnable") &&
"false".equalsIgnoreCase(commandLine.getOptionValue("jsonEnable").trim())) {
+ printConfigMapJsonDisable(configMap);
+ } else {
+ System.out.print(JSONObject.toJSONString(configMap, true)
+ "\n");
+ }
+ } else {
+ String jsonString = JSONObject.toJSONString(configMap, true);
+ try {
+ // The -e option text promises a backup of an existing file; that behaviour
+ // lives in MixAll.string2File and is not visible here - TODO confirm.
+ MixAll.string2File(jsonString, exportFile);
+ } catch (IOException e) {
+ System.out.print("persist file " + exportFile + "
exception" + e);
+ }
+ }
+ }
+ }
- String configType = commandLine.getOptionValue("configType").trim();
- if (!path.endsWith("/")) {
- path += "/";
+ /**
+ * Verifies that every option named in args is present on the command line.
+ *
+ * On the first missing option it prints the mode label together with the full
+ * comma-joined list of required options, then aborts.
+ *
+ * @param commandLine parsed command line
+ * @param mode label used only in the printed message (e.g. "rpc mode")
+ * @param args option names that must all be present
+ * @throws SubCommandException if any of the options is missing
+ */
+ private void checkRequiredArgsProvided(CommandLine commandLine, String
mode,
+ String... args) throws SubCommandException {
+ for (String arg : args) {
+ if (!commandLine.hasOption(arg)) {
+ System.out.printf("%s Invalid args, please input %s\n", mode,
String.join(",", args));
+ throw new SubCommandException("Invalid args");
+ }
}
- path += configType;
- if (CONSUMER_OFFSETS_JSON_CONFIG.equalsIgnoreCase(configType)) {
- printConsumerOffsets(path);
- return;
+ }
+
+ /**
+ * Resolves the -t/configType option into a list of config types.
+ *
+ * When the option is absent, every ConfigType value is returned. When it is
+ * present, the trimmed value is parsed by ConfigType.fromString.
+ *
+ * @param commandLine parsed command line
+ * @return the selected types, or null when fromString rejects the value with an
+ * IllegalArgumentException (a message is printed first); callers must handle null
+ */
+ private List<ExportRocksDBConfigToJsonRequestHeader.ConfigType>
getConfigTypeList(CommandLine commandLine) {
+ List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> typeList = new
ArrayList<>();
+ if (commandLine.hasOption("configType")) {
+ String configType =
commandLine.getOptionValue("configType").trim();
+ try {
+
typeList.addAll(ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString(configType));
+ } catch (IllegalArgumentException e) {
+ System.out.print("Invalid configType: " + configType + "
please input topics/subscriptionGroups/consumerOffsets \n");
+ // null (not an empty list) is the "invalid input" signal to the caller.
+ return null;
+ }
+ } else {
+ // No -t given: default to all known config types.
+
typeList.addAll(Arrays.asList(ExportRocksDBConfigToJsonRequestHeader.ConfigType.values()));
}
+ return typeList;
+ }
+
+    /**
+     * Prints the config map as plain text instead of json: one {@code type: <key>} header
+     * line per top-level entry, followed by one numbered {@code Key/Value} line for each
+     * field of that entry's json object.
+     *
+     * @param configMap top-level config key to json object, as built from RocksDB
+     */
+    private static void printConfigMapJsonDisable(Map<String, JSONObject> configMap) {
+        // One counter for the whole map, so row numbers keep increasing across sections.
+        AtomicLong count = new AtomicLong(0);
+        for (Map.Entry<String, JSONObject> entry : configMap.entrySet()) {
+            // Terminate the header line; without %n the first numbered row was glued onto it.
+            System.out.printf("type: %s%n", entry.getKey());
+            JSONObject jsonObject = entry.getValue();
+            jsonObject.forEach((k, v) ->
+                System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), k, v));
+        }
+    }
+
+ private static Map<String, JSONObject> getConfigMapFromRocksDB(String path,
+ ExportRocksDBConfigToJsonRequestHeader.ConfigType configType) {
+
+ if
(ExportRocksDBConfigToJsonRequestHeader.ConfigType.CONSUMER_OFFSETS.equals(configType))
{
+ return loadConsumerOffsets(path);
+ }
+
ConfigRocksDBStorage configRocksDBStorage = new
ConfigRocksDBStorage(path, true);
configRocksDBStorage.start();
RocksIterator iterator = configRocksDBStorage.iterator();
@@ -101,24 +211,79 @@ public class RocksDBConfigToJsonCommand implements
SubCommand {
byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion();
if (kvDataVersion != null) {
configMap.put("dataVersion",
- JSONObject.parseObject(new String(kvDataVersion,
DataConverter.CHARSET_UTF8)));
+ JSONObject.parseObject(new String(kvDataVersion,
DataConverter.CHARSET_UTF8)));
}
- if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ if
(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS.equals(configType)) {
configMap.put("topicConfigTable", configTable);
}
- if (SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ if
(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS.equals(configType))
{
configMap.put("subscriptionGroupTable", configTable);
}
- System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
+ return configMap;
} catch (Exception e) {
System.out.print("Error occurred while converting RocksDB kv
config to json, " + "configType=" + configType + ", " + e.getMessage() + "\n");
} finally {
configRocksDBStorage.shutdown();
}
+ return null;
+ }
+
+    /**
+     * Rpc mode: asks brokers to export their RocksDB config to json.
+     * <p>
+     * When a cluster name is given, the cluster info is fetched from the name server and a
+     * request is sent to each broker that belongs to that cluster, using the address
+     * registered under broker id 0 (the master id in RocketMQ). Brokers without such an
+     * address are reported and skipped. Otherwise, when a broker address is given, a single
+     * request is sent to it. The method blocks until every request has completed.
+     *
+     * @param commandLine parsed command line; reads the n, b and c options
+     * @param rpcHook     hook handed to the admin client
+     * @param type        config types the brokers should export
+     * @throws RuntimeException wrapping any failure of the admin client or of a broker request
+     */
+    private void handleRpcMode(CommandLine commandLine, RPCHook rpcHook,
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> type) {
+        String nameserverAddr = commandLine.hasOption('n') ? commandLine.getOptionValue("nameserverAddr").trim() : null;
+        String inputBrokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null;
+        String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null;
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook, 30 * 1000);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        defaultMQAdminExt.setNamesrvAddr(nameserverAddr);
+
+        List<CompletableFuture<Void>> futureList = new ArrayList<>();
+
+        try {
+            defaultMQAdminExt.start();
+            if (clusterName != null) {
+                ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+                Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+                Map<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
+                Set<String> brokerNames = clusterAddrTable.get(clusterName);
+                if (brokerNames == null) {
+                    System.out.print("clusterAddrTable is empty");
+                    return;
+                }
+                // Only brokers of the requested cluster: the name server may serve several
+                // clusters, and iterating the whole brokerAddrTable would export on all of them.
+                for (String brokerName : brokerNames) {
+                    BrokerData brokerData = brokerAddrTable.get(brokerName);
+                    String brokerAddr = brokerData == null ? null : brokerData.getBrokerAddrs().get(0L);
+                    if (brokerAddr == null) {
+                        // No address under id 0: nothing to call, and a null address would
+                        // only fail the whole run.
+                        System.out.print(brokerName + " has no master address, skip\n");
+                        continue;
+                    }
+                    futureList.add(sendRequest(type, defaultMQAdminExt, brokerAddr, brokerName));
+                }
+            } else if (inputBrokerAddr != null) {
+                futureList.add(sendRequest(type, defaultMQAdminExt, inputBrokerAddr, null));
+            }
+            // join() rethrows the first broker failure; it is wrapped by the catch below.
+            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete(
+                (v, t) -> System.out.print("broker export done.")
+            ).join();
+        } catch (Exception e) {
+            throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Asynchronously asks one broker to export the given config types to json.
+     *
+     * @param type              config types to export
+     * @param defaultMQAdminExt started admin client used to send the request
+     * @param brokerAddr        address the request is sent to
+     * @param brokerName        broker name used in the error message; may be null, in which
+     *                          case the address is printed instead
+     * @return a future that completes with null on success and completes exceptionally
+     *         with a {@link CompletionException} when the request fails
+     */
+    private CompletableFuture<Void> sendRequest(List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> type,
+        DefaultMQAdminExt defaultMQAdminExt, String brokerAddr, String brokerName) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                defaultMQAdminExt.exportRocksDBConfigToJson(brokerAddr, type);
+            } catch (Throwable t) {
+                // Parenthesise the ternary: '+' binds tighter than '?:', so the suffix used
+                // to be appended to brokerAddr only and was lost whenever brokerName was set.
+                // The trailing newline keeps messages from several brokers on separate lines.
+                System.out.print(((brokerName != null) ? brokerName : brokerAddr) + " export error\n");
+                throw new CompletionException(this.getClass().getSimpleName() + " command failed", t);
+            }
+            return null;
+        });
}
- private void printConsumerOffsets(String path) {
+ private static Map<String, JSONObject> loadConsumerOffsets(String path) {
ConfigRocksDBStorage configRocksDBStorage = new
ConfigRocksDBStorage(path, true);
configRocksDBStorage.start();
RocksIterator iterator = configRocksDBStorage.iterator();
@@ -136,12 +301,13 @@ public class RocksDBConfigToJsonCommand implements
SubCommand {
iterator.next();
}
configMap.put("offsetTable", configTable);
- System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
+ return configMap;
} catch (Exception e) {
System.out.print("Error occurred while converting RocksDB kv
config to json, " + "configType=consumerOffsets, " + e.getMessage() + "\n");
} finally {
configRocksDBStorage.shutdown();
}
+ return null;
}
static class RocksDBOffsetSerializeWrapper {