This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new d18d787 Add the check logic for admin process update-and-create
static topic
d18d787 is described below
commit d18d787f92eb44020a482205e27bae4b9c725957
Author: dongeforever <[email protected]>
AuthorDate: Fri Nov 19 20:33:22 2021 +0800
Add the check logic for admin process update-and-create static topic
---
.../broker/processor/AdminBrokerProcessor.java | 19 +++++++---
.../broker/topic/TopicQueueMappingManager.java | 44 +++++++++++++++++++++-
.../apache/rocketmq/client/impl/MQAdminImpl.java | 4 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 4 +-
.../protocol/header/CreateTopicRequestHeader.java | 12 ++++++
.../common/statictopic/TopicQueueMappingUtils.java | 2 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 4 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 4 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 2 +-
.../topic/RemappingStaticTopicSubCommand.java | 12 +++---
.../command/topic/UpdateStaticTopicSubCommand.java | 2 +-
11 files changed, 85 insertions(+), 24 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 2f0aafb..bab484d 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -323,6 +323,10 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
if (TopicValidator.isSystemTopic(topic, response)) {
return response;
}
+ boolean force = false;
+ if (requestHeader.getForce() != null && requestHeader.getForce()) {
+ force = true;
+ }
TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
@@ -331,13 +335,18 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ?
0 : requestHeader.getTopicSysFlag());
-
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-
-
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail());
+ try {
+
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.registerIncrementBrokerData(topicConfig,
this.brokerController.getTopicConfigManager().getDataVersion());
+
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(),
force);
- response.setCode(ResponseCode.SUCCESS);
+ this.brokerController.registerIncrementBrokerData(topicConfig,
this.brokerController.getTopicConfigManager().getDataVersion());
+ response.setCode(ResponseCode.SUCCESS);
+ } catch (Exception e) {
+ log.error("Update static failed for [{}]", request, e);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(e.getMessage());
+ }
return response;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 13f0857..e2b46fe 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import
org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
@@ -56,8 +57,47 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController;
}
- public void updateTopicQueueMapping(TopicQueueMappingDetail
topicQueueMappingDetail) {
- topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(),
topicQueueMappingDetail);
+ public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail,
boolean force) {
+ lock.lock();
+ try {
+ if (newDetail == null) {
+ return;
+ }
+ newDetail.getHostedQueues().forEach((queueId, items) -> {
+ TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
+ });
+
+ TopicQueueMappingDetail oldDetail =
topicQueueMappingTable.get(newDetail.getTopic());
+ if (oldDetail == null) {
+ topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+ return;
+ }
+ if (force) {
+ oldDetail.getHostedQueues().forEach( (queueId, items) -> {
+ newDetail.getHostedQueues().putIfAbsent(queueId, items);
+ });
+ topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+ return;
+ }
+ //do more check
+ if (newDetail.getEpoch() <= oldDetail.getEpoch()) {
+ throw new RuntimeException(String.format("Can't accept data
with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
+ }
+ for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
+ ImmutableList<LogicQueueMappingItem> oldItems =
oldDetail.getHostedQueues().get(globalId);
+ ImmutableList<LogicQueueMappingItem> newItems =
newDetail.getHostedQueues().get(globalId);
+ if (newItems == null) {
+ //keep the old
+ newDetail.getHostedQueues().put(globalId, oldItems);
+ } else {
+
TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems,
newItems);
+ }
+ }
+ topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+ } finally {
+ lock.unlock();
+ }
+
}
public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 4df7b9e..7d539cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -80,11 +80,11 @@ public class MQAdminImpl {
this.timeoutMillis = timeoutMillis;
}
- public void createStaticTopic(final String addr, final String
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail
mappingDetail) throws MQClientException {
+ public void createStaticTopic(final String addr, final String
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail
mappingDetail, boolean force) throws MQClientException {
MQClientException exception = null;
for (int i = 0; i < 3; i++) {
try {
-
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic,
topicConfig, mappingDetail, timeoutMillis);
+
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic,
topicConfig, mappingDetail, force, timeoutMillis);
break;
} catch (Exception e) {
if (2 == i) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d93f690..050fc30 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -2726,7 +2726,7 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public void createStaticTopic(final String addr, final String
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail
topicQueueMappingDetail,
+ public void createStaticTopic(final String addr, final String
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail
topicQueueMappingDetail, boolean force,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
CreateTopicRequestHeader requestHeader = new
CreateTopicRequestHeader();
@@ -2738,7 +2738,7 @@ public class MQClientAPIImpl {
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
-
+ requestHeader.setForce(force);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC,
requestHeader);
request.setBody(topicQueueMappingDetail.encode());
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
index 8894d0b..290ec4c 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -23,6 +23,7 @@ package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class CreateTopicRequestHeader implements CommandCustomHeader {
@@ -42,6 +43,9 @@ public class CreateTopicRequestHeader implements
CommandCustomHeader {
@CFNotNull
private Boolean order = false;
+ @CFNullable
+ private Boolean force = false;
+
@Override
public void checkFields() throws RemotingCommandException {
try {
@@ -118,4 +122,12 @@ public class CreateTopicRequestHeader implements
CommandCustomHeader {
public void setOrder(Boolean order) {
this.order = order;
}
+
+ public Boolean getForce() {
+ return force;
+ }
+
+ public void setForce(Boolean force) {
+ this.force = force;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 370f0ec..eb82cad 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -162,7 +162,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
- public static void
checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> oldItems,
ImmutableList<LogicQueueMappingItem> newItems) {
+ public static void
makeSureLogicQueueMappingItemImmutable(ImmutableList<LogicQueueMappingItem>
oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
if (oldItems == null || oldItems.isEmpty()) {
return;
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6b0f5a6..7030056 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -661,8 +661,8 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
- public void createStaticTopic(String addr, String defaultTopic,
TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) throws
RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic,
topicConfig, mappingDetail);
+ public void createStaticTopic(String addr, String defaultTopic,
TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
+ this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic,
topicConfig, mappingDetail, force);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 811ee26..06b456f 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1096,8 +1096,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
}
@Override
- public void createStaticTopic(final String addr, final String
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail
mappingDetail) throws MQClientException {
- this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr,
defaultTopic, topicConfig, mappingDetail);
+ public void createStaticTopic(final String addr, final String
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail
mappingDetail, final boolean force) throws MQClientException {
+ this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr,
defaultTopic, topicConfig, mappingDetail, force);
}
@Override
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 5339709..888dad8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -337,7 +337,7 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException,
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException;
- void createStaticTopic(final String addr, final String defaultTopic, final
TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws
RemotingException, MQBrokerException,
+ void createStaticTopic(final String addr, final String defaultTopic, final
TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final
boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 3b1b27a..c46c28a 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -111,7 +111,7 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
- doRemapping(topic, wrapper.getBrokerToMapIn(),
wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata,
defaultMQAdminExt);
+ doRemapping(topic, wrapper.getBrokerToMapIn(),
wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata,
defaultMQAdminExt, false);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
@@ -123,19 +123,19 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
public void doRemapping(String topic, Set<String> brokersToMapIn,
Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping>
brokerConfigMap,
- ClientMetadata clientMetadata, DefaultMQAdminExt
defaultMQAdminExt) throws Exception {
+ ClientMetadata clientMetadata, DefaultMQAdminExt
defaultMQAdminExt, boolean force) throws Exception {
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping =
brokerConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail());
+ defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping =
brokerConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail());
+ defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
@@ -171,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping =
brokerConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail());
+ defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), false);
}
}
@@ -353,7 +353,7 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
System.out.println("The old mapping data is written to file "
+ newMappingDataFile);
}
- doRemapping(topic, brokersToMapIn, brokersToMapOut,
brokerConfigMap, clientMetadata, defaultMQAdminExt);
+ doRemapping(topic, brokersToMapIn, brokersToMapOut,
brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 6184290..7c6abdc 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -127,7 +127,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
- defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail());
+ defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), false);
}
}