This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3ae0139aa0 [ISSUE #8968] Introduce the clearRetryTopicWhenDeleteTopic
option to enable precise external deletion of topics (#8969)
3ae0139aa0 is described below
commit 3ae0139aa0cd75fe52d583e69f0c974d5ce2639c
Author: rongtong <[email protected]>
AuthorDate: Wed Nov 27 16:19:06 2024 +0800
[ISSUE #8968] Introduce the clearRetryTopicWhenDeleteTopic option to enable
precise external deletion of topics (#8969)
* Add the clearRetryTopicWhenDeleteTopic option to allow precise deletion
of topics externally without the need to traverse consumerOffset
* Fix check style
---
.../apache/rocketmq/broker/BrokerController.java | 7 ++-
.../broker/processor/AdminBrokerProcessor.java | 71 ++++++++++++----------
.../org/apache/rocketmq/common/BrokerConfig.java | 9 +++
3 files changed, 55 insertions(+), 32 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index b907489bbf..99e5b85d2e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -191,7 +191,7 @@ public class BrokerController {
private final NettyClientConfig nettyClientConfig;
protected final MessageStoreConfig messageStoreConfig;
private final AuthConfig authConfig;
- protected final ConsumerOffsetManager consumerOffsetManager;
+ protected ConsumerOffsetManager consumerOffsetManager;
protected final BroadcastOffsetManager broadcastOffsetManager;
protected final ConsumerManager consumerManager;
protected final ConsumerFilterManager consumerFilterManager;
@@ -1313,6 +1313,11 @@ public class BrokerController {
return consumerOffsetManager;
}
+ public void setConsumerOffsetManager(ConsumerOffsetManager
consumerOffsetManager) {
+ this.consumerOffsetManager = consumerOffsetManager;
+ }
+
+
public BroadcastOffsetManager getBroadcastOffsetManager() {
return broadcastOffsetManager;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ac882e94ab..cc70e69a46 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -490,6 +490,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
response.setBody(JSON.toJSONBytes(result));
return response;
}
+
@Override
public boolean rejectRequest() {
return false;
@@ -559,18 +560,17 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
return response;
- }
- finally {
+ } finally {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() ==
ResponseCode.SUCCESS ?
- InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
+ InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
- .put(LABEL_INVOCATION_STATUS, status.getName())
- .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
- .build();
+ .put(LABEL_INVOCATION_STATUS, status.getName())
+ .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
+ .build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime,
attributes);
}
- LOGGER.info("executionTime of create topic:{} is {} ms" , topic,
executionTime);
+ LOGGER.info("executionTime of create topic:{} is {} ms", topic,
executionTime);
return response;
}
@@ -637,8 +637,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
return response;
- }
- finally {
+ } finally {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() ==
ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
@@ -648,7 +647,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
.build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime,
attributes);
}
- LOGGER.info("executionTime of all topics:{} is {} ms" , topicNames,
executionTime);
+ LOGGER.info("executionTime of all topics:{} is {} ms", topicNames,
executionTime);
return response;
}
@@ -725,21 +724,28 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
}
}
- final Set<String> groups =
this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
- // delete pop retry topics first
- try {
+ List<String> topicsToClean = new ArrayList<>();
+ topicsToClean.add(topic);
+
+ if
(brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) {
+ final Set<String> groups =
this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
for (String group : groups) {
final String popRetryTopicV2 =
KeyBuilder.buildPopRetryTopic(topic, group, true);
if
(brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) !=
null) {
- deleteTopicInBroker(popRetryTopicV2);
+ topicsToClean.add(popRetryTopicV2);
}
final String popRetryTopicV1 =
KeyBuilder.buildPopRetryTopicV1(topic, group);
if
(brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) !=
null) {
- deleteTopicInBroker(popRetryTopicV1);
+ topicsToClean.add(popRetryTopicV1);
}
}
- // delete topic
- deleteTopicInBroker(topic);
+ }
+
+ try {
+ for (String topicToClean : topicsToClean) {
+ // delete topic
+ deleteTopicInBroker(topicToClean);
+ }
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
}
@@ -982,10 +988,10 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
String consumerGroup = String.valueOf(key);
Long threshold =
Long.valueOf(String.valueOf(value));
this.brokerController.getColdDataCgCtrService()
- .addOrUpdateGroupConfig(consumerGroup,
threshold);
+ .addOrUpdateGroupConfig(consumerGroup,
threshold);
} catch (Exception e) {
LOGGER.error("updateColdDataFlowCtrGroupConfig
properties on entry error, key: {}, val: {}",
- key, value, e);
+ key, value, e);
}
});
} else {
@@ -1598,12 +1604,12 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
long executionTime = System.currentTimeMillis() - startTime;
- LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms"
,config.getGroupName() ,executionTime);
+ LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms",
config.getGroupName(), executionTime);
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
- InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
+ InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
- .put(LABEL_INVOCATION_STATUS, status.getName())
- .build();
+ .put(LABEL_INVOCATION_STATUS, status.getName())
+ .build();
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime,
attributes);
return response;
}
@@ -2083,13 +2089,13 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
/**
* Reset consumer offset.
*
- * @param topic Required, not null.
- * @param group Required, not null.
- * @param queueId if target queue ID is negative, all message queues
will be reset; otherwise, only the target queue
- * would get reset.
+ * @param topic Required, not null.
+ * @param group Required, not null.
+ * @param queueId if target queue ID is negative, all message queues will
be reset; otherwise, only the target queue
+ * would get reset.
* @param timestamp if timestamp is negative, offset would be reset to
broker offset at the time being; otherwise,
- * binary search is performed to locate target offset.
- * @param offset Target offset to reset to if target queue ID is
properly provided.
+ * binary search is performed to locate target offset.
+ * @param offset Target offset to reset to if target queue ID is properly
provided.
* @return Affected queues and their new offset
*/
private RemotingCommand resetOffsetInner(String topic, String group, int
queueId, long timestamp, Long offset) {
@@ -3371,7 +3377,8 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return false;
}
- private CheckRocksdbCqWriteResult
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand
request) throws RemotingCommandException {
+ private CheckRocksdbCqWriteResult
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader =
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
String requestTopic = requestHeader.getTopic();
MessageStore messageStore = brokerController.getMessageStore();
@@ -3428,7 +3435,9 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return result;
}
- private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer,
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore
rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long
checkpointByStoreTime) {
+ private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer,
ConsumeQueueInterface> queueMap, String topic,
+ RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult,
boolean printDetail,
+ long checkpointByStoreTime) {
boolean processResult = true;
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry :
queueMap.entrySet()) {
Integer queueId = queueEntry.getKey();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9d8d913521..c0b557dfa1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -435,6 +435,7 @@ public class BrokerConfig extends BrokerIdentity {
private boolean appendCkAsync = false;
+ private boolean clearRetryTopicWhenDeleteTopic = true;
private boolean enableLmqStats = false;
@@ -1908,6 +1909,14 @@ public class BrokerConfig extends BrokerIdentity {
this.appendCkAsync = appendCkAsync;
}
+ public boolean isClearRetryTopicWhenDeleteTopic() {
+ return clearRetryTopicWhenDeleteTopic;
+ }
+
+ public void setClearRetryTopicWhenDeleteTopic(boolean
clearRetryTopicWhenDeleteTopic) {
+ this.clearRetryTopicWhenDeleteTopic = clearRetryTopicWhenDeleteTopic;
+ }
+
public boolean isEnableLmqStats() {
return enableLmqStats;
}