This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0b6563b58 Add getControllerConfig and updateControllerConfig command
for admin CLI (#4793)
0b6563b58 is described below
commit 0b6563b58fbd3a16e96cdb1691e2d453fbf9b555
Author: rongtong <[email protected]>
AuthorDate: Sun Aug 7 15:28:51 2022 +0800
Add getControllerConfig and updateControllerConfig command for admin CLI
(#4793)
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 73 +++++++++++++++++--
.../client/impl/factory/MQClientInstance.java | 2 +
.../rocketmq/common/protocol/RequestCode.java | 10 +++
.../org/apache/rocketmq/controller/Controller.java | 2 +-
.../rocketmq/controller/ControllerManager.java | 2 +
.../processor/ControllerRequestProcessor.java | 81 +++++++++++++++++++---
.../cluster-3n-independent/controller-n0.conf | 30 ++++----
.../cluster-3n-independent/controller-n1.conf | 30 ++++----
.../cluster-3n-independent/controller-n2.conf | 30 ++++----
.../conf/controller/controller-standalone.conf | 30 ++++----
.../conf/controller/quick-start/broker-n0.conf | 30 ++++----
.../conf/controller/quick-start/broker-n1.conf | 30 ++++----
.../conf/controller/quick-start/namesrv.conf | 30 ++++----
style/rmq_checkstyle.xml | 2 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 14 ++++
.../tools/admin/DefaultMQAdminExtImpl.java | 31 +++++++--
.../apache/rocketmq/tools/admin/MQAdminExt.java | 21 ++++++
.../rocketmq/tools/command/MQAdminStartup.java | 15 ++--
...hCommand.java => GetBrokerEpochSubCommand.java} | 2 +-
...and.java => GetControllerConfigSubCommand.java} | 51 +++++++++-----
...d.java => GetControllerMetaDataSubCommand.java} | 2 +-
....java => UpdateControllerConfigSubCommand.java} | 62 ++++++++++++-----
22 files changed, 400 insertions(+), 180 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index a503d81cd..c425dba29 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -445,7 +445,8 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public void updateGlobalWhiteAddrsConfig(final String addr, final String
globalWhiteAddrs, String aclFileFullPath, final long timeoutMillis)
+ public void updateGlobalWhiteAddrsConfig(final String addr, final String
globalWhiteAddrs, String aclFileFullPath,
+ final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new
UpdateGlobalWhiteAddrsConfigRequestHeader();
requestHeader.setGlobalWhiteAddrs(globalWhiteAddrs);
@@ -1175,7 +1176,8 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
throw new MQBrokerException(response.getCode(), response.getRemark(),
addr);
}
- public long searchOffset(final String addr, final MessageQueue
messageQueue, final long timestamp, final long timeoutMillis)
+ public long searchOffset(final String addr, final MessageQueue
messageQueue, final long timestamp,
+ final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new
SearchOffsetRequestHeader();
requestHeader.setTopic(messageQueue.getTopic());
@@ -1597,14 +1599,14 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
}
public ProducerTableInfo getAllProducerInfo(final String addr, final long
timeoutMillis)
- throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException,
- MQBrokerException {
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException,
+ MQBrokerException {
GetAllProducerInfoRequestHeader requestHeader = new
GetAllProducerInfoRequestHeader();
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_PRODUCER_INFO,
requestHeader);
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return ProducerTableInfo.decode(response.getBody(),
ProducerTableInfo.class);
@@ -2921,7 +2923,8 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public GetMetaDataResponseHeader getControllerMetaData(final String
controllerAddress) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
RemotingCommandException, MQBrokerException {
+ public GetMetaDataResponseHeader getControllerMetaData(
+ final String controllerAddress) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
RemotingCommandException, MQBrokerException {
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_METADATA_INFO,
null);
final RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
@@ -2954,7 +2957,8 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public EpochEntryCache getBrokerEpochCache(String brokerAddr) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
+ public EpochEntryCache getBrokerEpochCache(
+ String brokerAddr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_EPOCH_CACHE, null);
final RemotingCommand response =
this.remotingClient.invokeSync(brokerAddr, request, 3000);
assert response != null;
@@ -2967,4 +2971,59 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+
+ public Map<String, Properties> getControllerConfig(final List<String>
controllerServers,
+ final long timeoutMillis) throws InterruptedException,
RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
MQClientException, UnsupportedEncodingException {
+ List<String> invokeControllerServers = (controllerServers == null ||
controllerServers.isEmpty()) ?
+ this.remotingClient.getNameServerAddressList() : controllerServers;
+ if (invokeControllerServers == null ||
invokeControllerServers.isEmpty()) {
+ return null;
+ }
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_CONTROLLER_CONFIG, null);
+
+ Map<String, Properties> configMap = new HashMap<String, Properties>(4);
+ for (String controller : invokeControllerServers) {
+ RemotingCommand response =
this.remotingClient.invokeSync(controller, request, timeoutMillis);
+
+ assert response != null;
+
+ if (ResponseCode.SUCCESS == response.getCode()) {
+ configMap.put(controller, MixAll.string2Properties(new
String(response.getBody(), MixAll.DEFAULT_CHARSET)));
+ } else {
+ throw new MQClientException(response.getCode(),
response.getRemark());
+ }
+ }
+ return configMap;
+ }
+
+ public void updateControllerConfig(final Properties properties, final
List<String> controllers,
+ final long timeoutMillis) throws InterruptedException,
RemotingConnectException, UnsupportedEncodingException,
+ RemotingSendRequestException, RemotingTimeoutException,
MQClientException {
+ String str = MixAll.properties2String(properties);
+ if (str.length() < 1 || controllers == null || controllers.isEmpty()) {
+ return;
+ }
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONTROLLER_CONFIG,
null);
+ request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET));
+
+ RemotingCommand errResponse = null;
+ for (String controller : controllers) {
+ RemotingCommand response =
this.remotingClient.invokeSync(controller, request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ break;
+ }
+ default:
+ errResponse = response;
+ }
+ }
+
+ if (errResponse != null) {
+ throw new MQClientException(errResponse.getCode(),
errResponse.getRemark());
+ }
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 5fc6f069b..56c7edf60 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1325,4 +1325,6 @@ public class MQClientInstance {
}
return data;
}
+
+
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 4b010a811..9441e4fd0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -259,4 +259,14 @@ public class RequestCode {
public static final int GET_BROKER_EPOCH_CACHE = 1007;
public static final int NOTIFY_BROKER_ROLE_CHANGED = 1008;
+
+ /**
+ * update the config of controller
+ */
+ public static final int UPDATE_CONTROLLER_CONFIG = 1009;
+
+ /**
+ * get config from controller
+ */
+ public static final int GET_CONTROLLER_CONFIG = 1010;
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index 51f228725..1cd724307 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -48,7 +48,7 @@ public interface Controller {
void startScheduling();
/**
- * Stop scheduling controller events, this function only will be triggered
when the controller shutdown leaderShip.
+ * Stop scheduling controller events, this function only will be triggered
when the controller lose leadership.
*/
void stopScheduling();
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 22bc5a9cf..4f0e1e75d 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -181,6 +181,8 @@ public class ControllerManager {
controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO,
controllerRequestProcessor, this.controllerRequestExecutor);
controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA,
controllerRequestProcessor, this.controllerRequestExecutor);
controllerRemotingServer.registerProcessor(RequestCode.BROKER_HEARTBEAT,
controllerRequestProcessor, this.controllerRequestExecutor);
+
controllerRemotingServer.registerProcessor(RequestCode.UPDATE_CONTROLLER_CONFIG,
controllerRequestProcessor, this.controllerRequestExecutor);
+
controllerRemotingServer.registerProcessor(RequestCode.GET_CONTROLLER_CONFIG,
controllerRequestProcessor, this.controllerRequestExecutor);
}
public void start() {
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 1d7f6be90..12824d7c3 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -17,9 +17,12 @@
package org.apache.rocketmq.controller.processor;
import io.netty.channel.ChannelHandlerContext;
+import java.io.UnsupportedEncodingException;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
@@ -30,7 +33,6 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBro
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
-import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -46,6 +48,8 @@ import static
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_MET
import static
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO;
import static
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA;
import static
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
+import static
org.apache.rocketmq.common.protocol.RequestCode.GET_CONTROLLER_CONFIG;
+import static
org.apache.rocketmq.common.protocol.RequestCode.UPDATE_CONTROLLER_CONFIG;
/**
* Processor for controller request
@@ -53,11 +57,11 @@ import static
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTE
public class ControllerRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
private static final int WAIT_TIMEOUT_OUT = 5;
- private final Controller controller;
+ private final ControllerManager controllerManager;
private final BrokerHeartbeatManager heartbeatManager;
public ControllerRequestProcessor(final ControllerManager
controllerManager) {
- this.controller = controllerManager.getController();
+ this.controllerManager = controllerManager;
this.heartbeatManager = controllerManager.getHeartbeatManager();
}
@@ -73,7 +77,7 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
case CONTROLLER_ALTER_SYNC_STATE_SET: {
final AlterSyncStateSetRequestHeader controllerRequest =
(AlterSyncStateSetRequestHeader)
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
final SyncStateSet syncStateSet =
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
- final CompletableFuture<RemotingCommand> future =
this.controller.alterSyncStateSet(controllerRequest, syncStateSet);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().alterSyncStateSet(controllerRequest,
syncStateSet);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
@@ -81,7 +85,7 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
}
case CONTROLLER_ELECT_MASTER: {
final ElectMasterRequestHeader controllerRequest =
(ElectMasterRequestHeader)
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controller.electMaster(controllerRequest);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().electMaster(controllerRequest);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
@@ -89,7 +93,7 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
}
case CONTROLLER_REGISTER_BROKER: {
final RegisterBrokerToControllerRequestHeader
controllerRequest = (RegisterBrokerToControllerRequestHeader)
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controller.registerBroker(controllerRequest);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().registerBroker(controllerRequest);
if (future != null) {
final RemotingCommand response =
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
final RegisterBrokerToControllerResponseHeader
responseHeader = (RegisterBrokerToControllerResponseHeader)
response.readCustomHeader();
@@ -103,14 +107,14 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
}
case CONTROLLER_GET_REPLICA_INFO: {
final GetReplicaInfoRequestHeader controllerRequest =
(GetReplicaInfoRequestHeader)
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controller.getReplicaInfo(controllerRequest);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getReplicaInfo(controllerRequest);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
break;
}
case CONTROLLER_GET_METADATA_INFO: {
- return this.controller.getControllerMetadata();
+ return
this.controllerManager.getController().getControllerMetadata();
}
case BROKER_HEARTBEAT: {
final BrokerHeartbeatRequestHeader requestHeader =
(BrokerHeartbeatRequestHeader)
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
@@ -121,7 +125,7 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
if (request.getBody() != null) {
final List<String> brokerNames =
RemotingSerializable.decode(request.getBody(), List.class);
if (brokerNames != null && brokerNames.size() > 0) {
- final CompletableFuture<RemotingCommand> future =
this.controller.getSyncStateData(brokerNames);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getSyncStateData(brokerNames);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT,
TimeUnit.SECONDS);
}
@@ -129,6 +133,10 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
}
break;
}
+ case UPDATE_CONTROLLER_CONFIG:
+ return this.updateControllerConfig(ctx, request);
+ case GET_CONTROLLER_CONFIG:
+ return this.getControllerConfig(ctx, request);
default: {
final String error = " request type " + request.getCode() + "
not supported";
return
RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED,
error);
@@ -141,4 +149,59 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
public boolean rejectRequest() {
return false;
}
+
+ private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx,
RemotingCommand request) {
+ if (ctx != null) {
+ log.info("updateConfig called by {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ }
+
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
+
+ byte[] body = request.getBody();
+ if (body != null) {
+ String bodyStr;
+ try {
+ bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
+ } catch (UnsupportedEncodingException e) {
+ log.error("updateConfig byte array to string error: ", e);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("UnsupportedEncodingException " + e);
+ return response;
+ }
+
+ Properties properties = MixAll.string2Properties(bodyStr);
+ if (properties == null) {
+ log.error("updateConfig MixAll.string2Properties error {}",
bodyStr);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("string2Properties error");
+ return response;
+ }
+
+ this.controllerManager.getConfiguration().update(properties);
+ }
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
+ private RemotingCommand getControllerConfig(ChannelHandlerContext ctx,
RemotingCommand request) {
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
+
+ String content =
this.controllerManager.getConfiguration().getAllConfigsFormatString();
+ if (content != null && content.length() > 0) {
+ try {
+ response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
+ } catch (UnsupportedEncodingException e) {
+ log.error("getConfig error, ", e);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("UnsupportedEncodingException " + e);
+ return response;
+ }
+ }
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
}
diff --git
a/distribution/conf/controller/cluster-3n-independent/controller-n0.conf
b/distribution/conf/controller/cluster-3n-independent/controller-n0.conf
index 9031dd753..d57413792 100644
--- a/distribution/conf/controller/cluster-3n-independent/controller-n0.conf
+++ b/distribution/conf/controller/cluster-3n-independent/controller-n0.conf
@@ -1,19 +1,17 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
controllerDLegerGroup = group1
controllerDLegerPeers = n0-127.0.0.1:9878;n1-127.0.0.1:9868;n2-127.0.0.1:9858
diff --git
a/distribution/conf/controller/cluster-3n-independent/controller-n1.conf
b/distribution/conf/controller/cluster-3n-independent/controller-n1.conf
index 7ac7229ca..f6dec2235 100644
--- a/distribution/conf/controller/cluster-3n-independent/controller-n1.conf
+++ b/distribution/conf/controller/cluster-3n-independent/controller-n1.conf
@@ -1,19 +1,17 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
controllerDLegerGroup = group1
controllerDLegerPeers = n0-127.0.0.1:9878;n1-127.0.0.1:9868;n2-127.0.0.1:9858
diff --git
a/distribution/conf/controller/cluster-3n-independent/controller-n2.conf
b/distribution/conf/controller/cluster-3n-independent/controller-n2.conf
index 982200f72..aa45fa53c 100644
--- a/distribution/conf/controller/cluster-3n-independent/controller-n2.conf
+++ b/distribution/conf/controller/cluster-3n-independent/controller-n2.conf
@@ -1,19 +1,17 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
controllerDLegerGroup = group1
controllerDLegerPeers = n0-127.0.0.1:9878;n1-127.0.0.1:9868;n2-127.0.0.1:9858
diff --git a/distribution/conf/controller/controller-standalone.conf
b/distribution/conf/controller/controller-standalone.conf
index c9403b8bc..700908f34 100644
--- a/distribution/conf/controller/controller-standalone.conf
+++ b/distribution/conf/controller/controller-standalone.conf
@@ -1,19 +1,17 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
controllerDLegerGroup = group1
controllerDLegerPeers = n0-127.0.0.1:9878
diff --git a/distribution/conf/controller/quick-start/broker-n0.conf
b/distribution/conf/controller/quick-start/broker-n0.conf
index 49d9e4c02..c397689ab 100644
--- a/distribution/conf/controller/quick-start/broker-n0.conf
+++ b/distribution/conf/controller/quick-start/broker-n0.conf
@@ -1,19 +1,17 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
brokerClusterName = DefaultCluster
brokerName = broker-a
diff --git a/distribution/conf/controller/quick-start/broker-n1.conf
b/distribution/conf/controller/quick-start/broker-n1.conf
index 5e541367c..33bab6b38 100644
--- a/distribution/conf/controller/quick-start/broker-n1.conf
+++ b/distribution/conf/controller/quick-start/broker-n1.conf
@@ -1,19 +1,17 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
brokerClusterName = DefaultCluster
brokerName = broker-a
diff --git a/distribution/conf/controller/quick-start/namesrv.conf
b/distribution/conf/controller/quick-start/namesrv.conf
index 46b88566f..a7d81b0d8 100644
--- a/distribution/conf/controller/quick-start/namesrv.conf
+++ b/distribution/conf/controller/quick-start/namesrv.conf
@@ -1,19 +1,17 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
enableControllerInNamesrv = true
controllerDLegerGroup = group1
diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml
index e93b353aa..16078710e 100644
--- a/style/rmq_checkstyle.xml
+++ b/style/rmq_checkstyle.xml
@@ -73,7 +73,7 @@
</module>
<module name="FileLength">
- <property name="max" value="3000"/>
+ <property name="max" value="5000"/>
</module>
<module name="TreeWalker">
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 4a13d69a4..3356f94f7 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -787,6 +787,7 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
this.defaultMQAdminExtImpl.resetMasterFlushOffset(brokerAddr,
masterFlushOffset);
}
+
public QueryResult queryMessageByUniqKey(String topic, String key, int
maxNum, long begin, long end)
throws MQClientException, InterruptedException {
@@ -805,4 +806,17 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
InterruptedException, MQBrokerException {
return
this.defaultMQAdminExtImpl.updateAndGetGroupReadForbidden(brokerAddr,
groupName, topicName, readable);
}
+
+ @Override
+ public Map<String, Properties> getControllerConfig(List<String>
controllerServers) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
MQClientException,
+ UnsupportedEncodingException {
+ return
this.defaultMQAdminExtImpl.getControllerConfig(controllerServers);
+ }
+
+ @Override
+ public void updateControllerConfig(Properties properties,
+ List<String> controllers) throws InterruptedException,
RemotingConnectException, UnsupportedEncodingException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException,
MQBrokerException {
+ this.defaultMQAdminExtImpl.updateControllerConfig(properties,
controllers);
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index bb2982901..0efb9e1f7 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -278,7 +278,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
@Override
public void updateGlobalWhiteAddrConfig(String addr,
- String globalWhiteAddrs, String aclFileFullPath) throws
RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ String globalWhiteAddrs,
+ String aclFileFullPath) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().updateGlobalWhiteAddrsConfig(addr,
globalWhiteAddrs, aclFileFullPath, timeoutMillis);
}
@@ -453,7 +454,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
if (currentRoute.getTopicQueueMappingByBroker() == null
|| currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
//normal topic
- for (Map.Entry<MessageQueue, OffsetWrapper> entry:
result.getOffsetTable().entrySet()) {
+ for (Map.Entry<MessageQueue, OffsetWrapper> entry :
result.getOffsetTable().entrySet()) {
if (entry.getKey().getTopic().equals(currentTopic)) {
staticResult.getOffsetTable().put(entry.getKey(),
entry.getValue());
}
@@ -631,7 +632,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
}
@Override
- public ProducerTableInfo getAllProducerInfo(final String brokerAddr)
throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
+ public ProducerTableInfo getAllProducerInfo(
+ final String brokerAddr) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
return
this.mqClientInstance.getMQClientAPIImpl().getAllProducerInfo(brokerAddr,
timeoutMillis);
}
@@ -1174,7 +1176,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
}
@Override
- public boolean deleteExpiredCommitLog(String cluster) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
+ public boolean deleteExpiredCommitLog(
+ String cluster) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException,
InterruptedException {
boolean result = false;
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
@@ -1192,7 +1195,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
return result;
}
- public boolean deleteExpiredCommitLogByCluster(ClusterInfo clusterInfo,
String cluster) throws RemotingConnectException,
+ public boolean deleteExpiredCommitLogByCluster(ClusterInfo clusterInfo,
+ String cluster) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException,
MQClientException, InterruptedException {
boolean result = false;
String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
@@ -1203,7 +1207,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
}
@Override
- public boolean deleteExpiredCommitLogByAddr(String addr) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
+ public boolean deleteExpiredCommitLogByAddr(
+ String addr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException,
InterruptedException {
boolean result =
mqClientInstance.getMQClientAPIImpl().deleteExpiredCommitLog(addr,
timeoutMillis);
log.warn("Delete expired CommitLog on target " + addr + " broker " +
result);
return result;
@@ -1810,6 +1815,20 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
}
}
+ @Override
+ public Map<String, Properties> getControllerConfig(
+ List<String> controllerServers) throws InterruptedException,
RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
MQClientException,
+ UnsupportedEncodingException {
+ return
this.mqClientInstance.getMQClientAPIImpl().getControllerConfig(controllerServers,
timeoutMillis);
+ }
+
+ @Override public void updateControllerConfig(Properties properties,
+ List<String> controllers) throws InterruptedException,
RemotingConnectException, UnsupportedEncodingException,
+ RemotingSendRequestException, RemotingTimeoutException,
MQClientException, MQBrokerException {
+
this.mqClientInstance.getMQClientAPIImpl().updateControllerConfig(properties,
controllers, timeoutMillis);
+ }
+
public MQClientInstance getMqClientInstance() {
return mqClientInstance;
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 8cca43aaa..9fe6ea4fe 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -412,4 +412,25 @@ public interface MQAdminExt extends MQAdmin {
*/
void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset)
throws InterruptedException, MQBrokerException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException;
+
+
+ /**
+ * Get controller config.
+ * <br>
+ * Command Code : RequestCode.GET_CONTROLLER_CONFIG
+ *
+ * @return The fetched controller config
+ */
+ Map<String, Properties> getControllerConfig(List<String>
controllerServers) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
MQClientException, UnsupportedEncodingException;
+
+ /**
+ * Update controller config.
+ * <br>
+ * Command Code : RequestCode.UPDATE_CONTROLLER_CONFIG
+ */
+ void updateControllerConfig(final Properties properties,
+ final List<String> controllers) throws InterruptedException,
RemotingConnectException,
+ UnsupportedEncodingException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, MQBrokerException;
+
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index eb0820249..fc9226d8a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -43,7 +43,7 @@ import
org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
import
org.apache.rocketmq.tools.command.broker.DeleteExpiredCommitLogSubCommand;
import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand;
-import org.apache.rocketmq.tools.command.broker.GetBrokerEpochCommand;
+import org.apache.rocketmq.tools.command.broker.GetBrokerEpochSubCommand;
import
org.apache.rocketmq.tools.command.broker.ResetMasterFlushOffsetSubCommand;
import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand;
import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand;
@@ -60,7 +60,9 @@ import
org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand;
import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
-import
org.apache.rocketmq.tools.command.controller.GetControllerMetaDataCommand;
+import
org.apache.rocketmq.tools.command.controller.GetControllerConfigSubCommand;
+import
org.apache.rocketmq.tools.command.controller.GetControllerMetaDataSubCommand;
+import
org.apache.rocketmq.tools.command.controller.UpdateControllerConfigSubCommand;
import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
@@ -257,8 +259,11 @@ public class MQAdminStartup {
initCommand(new HAStatusSubCommand());
initCommand(new GetSyncStateSetSubCommand());
- initCommand(new GetBrokerEpochCommand());
- initCommand(new GetControllerMetaDataCommand());
+ initCommand(new GetBrokerEpochSubCommand());
+ initCommand(new GetControllerMetaDataSubCommand());
+
+ initCommand(new GetControllerConfigSubCommand());
+ initCommand(new UpdateControllerConfigSubCommand());
}
private static void initLogback() throws JoranException {
@@ -278,7 +283,7 @@ public class MQAdminStartup {
System.out.printf("The most commonly used mqadmin commands are:%n");
for (SubCommand cmd : subCommandList) {
- System.out.printf(" %-20s %s%n", cmd.commandName(),
cmd.commandDesc());
+ System.out.printf(" %-25s %s%n", cmd.commandName(),
cmd.commandDesc());
}
System.out.printf("%nSee 'mqadmin help <command>' for more information
on a specific command.%n");
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java
similarity index 98%
rename from
tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCommand.java
rename to
tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java
index ac1c44ff7..8a84e662e 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java
@@ -30,7 +30,7 @@ import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
-public class GetBrokerEpochCommand implements SubCommand {
+public class GetBrokerEpochSubCommand implements SubCommand {
@Override
public String commandName() {
return "getBrokerEpoch";
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerConfigSubCommand.java
similarity index 57%
copy from
tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
copy to
tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerConfigSubCommand.java
index 27d764661..ca3aa0daf 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerConfigSubCommand.java
@@ -14,53 +14,68 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+
package org.apache.rocketmq.tools.command.controller;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
-public class GetControllerMetaDataCommand implements SubCommand {
+public class GetControllerConfigSubCommand implements SubCommand {
@Override
public String commandName() {
- return "getControllerMetaData";
+ return "getControllerConfig";
}
@Override
public String commandDesc() {
- return "get controller cluster's metadata";
+ return "Get controller config.";
}
@Override
- public Options buildCommandlineOptions(Options options) {
- Option opt = new Option("a", "controllerAddress", true, "the address
of controller");
+ public Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("a", "controllerAddress", true, "Controller
address list, eg: 192.168.0.1:9878;192.168.0.2:9878");
opt.setRequired(true);
options.addOption(opt);
+
return options;
}
@Override
- public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
+ public void execute(final CommandLine commandLine, final Options options,
+ final RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
- String controllerAddress = commandLine.getOptionValue('a').trim();
try {
+ // servers
+ String servers = commandLine.getOptionValue('a');
+ List<String> serverList = null;
+ if (servers != null && servers.length() > 0) {
+ String[] serverArray = servers.trim().split(";");
+
+ if (serverArray.length > 0) {
+ serverList = Arrays.asList(serverArray);
+ }
+ }
+
defaultMQAdminExt.start();
- final GetMetaDataResponseHeader metaData =
defaultMQAdminExt.getControllerMetaData(controllerAddress);
- System.out.printf("\n#ControllerGroup\t%s", metaData.getGroup());
- System.out.printf("\n#ControllerLeaderId\t%s",
metaData.getControllerLeaderId());
- System.out.printf("\n#ControllerLeaderAddress\t%s",
metaData.getControllerLeaderAddress());
- final String peers = metaData.getPeers();
- if (StringUtils.isNotEmpty(peers)) {
- final String[] peerList = peers.split(";");
- for (String peer : peerList) {
- System.out.printf("\n#Peer:\t%s", peer);
+
+ Map<String, Properties> controllerConfigs =
defaultMQAdminExt.getControllerConfig(serverList);
+
+ for (Map.Entry<String, Properties> controllerConfigEntry :
controllerConfigs.entrySet()) {
+ System.out.printf("============%s============\n",
+ controllerConfigEntry.getKey());
+ for (Map.Entry<Object, Object> entry :
controllerConfigEntry.getValue().entrySet()) {
+ System.out.printf("%-50s= %s\n", entry.getKey(),
entry.getValue());
}
}
} catch (Exception e) {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java
similarity index 97%
copy from
tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
copy to
tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java
index 27d764661..e367943f3 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
-public class GetControllerMetaDataCommand implements SubCommand {
+public class GetControllerMetaDataSubCommand implements SubCommand {
@Override
public String commandName() {
return "getControllerMetaData";
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/UpdateControllerConfigSubCommand.java
similarity index 54%
rename from
tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
rename to
tools/src/main/java/org/apache/rocketmq/tools/command/controller/UpdateControllerConfigSubCommand.java
index 27d764661..90e1f5ec2 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/UpdateControllerConfigSubCommand.java
@@ -14,55 +14,79 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+
package org.apache.rocketmq.tools.command.controller;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
-public class GetControllerMetaDataCommand implements SubCommand {
+public class UpdateControllerConfigSubCommand implements SubCommand {
@Override
public String commandName() {
- return "getControllerMetaData";
+ return "updateControllerConfig";
}
@Override
public String commandDesc() {
- return "get controller cluster's metadata";
+ return "Update controller config.";
}
@Override
- public Options buildCommandlineOptions(Options options) {
- Option opt = new Option("a", "controllerAddress", true, "the address
of controller");
+ public Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("a", "controllerAddress", true, "Controller
address list, eg: 192.168.0.1:9878;192.168.0.2:9878");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("k", "key", true, "config key");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("v", "value", true, "config value");
opt.setRequired(true);
options.addOption(opt);
+
return options;
}
@Override
- public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
+ public void execute(final CommandLine commandLine, final Options options,
+ final RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
- String controllerAddress = commandLine.getOptionValue('a').trim();
try {
- defaultMQAdminExt.start();
- final GetMetaDataResponseHeader metaData =
defaultMQAdminExt.getControllerMetaData(controllerAddress);
- System.out.printf("\n#ControllerGroup\t%s", metaData.getGroup());
- System.out.printf("\n#ControllerLeaderId\t%s",
metaData.getControllerLeaderId());
- System.out.printf("\n#ControllerLeaderAddress\t%s",
metaData.getControllerLeaderAddress());
- final String peers = metaData.getPeers();
- if (StringUtils.isNotEmpty(peers)) {
- final String[] peerList = peers.split(";");
- for (String peer : peerList) {
- System.out.printf("\n#Peer:\t%s", peer);
+ // key name
+ String key = commandLine.getOptionValue('k').trim();
+ // config value
+ String value = commandLine.getOptionValue('v').trim();
+ Properties properties = new Properties();
+ properties.put(key, value);
+
+ // servers
+ String servers = commandLine.getOptionValue('a');
+ List<String> serverList = null;
+ if (servers != null && servers.length() > 0) {
+ String[] serverArray = servers.trim().split(";");
+
+ if (serverArray.length > 0) {
+ serverList = Arrays.asList(serverArray);
}
}
+
+ defaultMQAdminExt.start();
+
+ defaultMQAdminExt.updateControllerConfig(properties, serverList);
+
+ System.out.printf("update controller config success!%s\n%s : %s\n",
+ serverList == null ? "" : serverList, key, value);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
} finally {