This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 19393e0475 [ISSUE #8974] Add feature switch of recalling, disable by default (#9067)
19393e0475 is described below
commit 19393e047515db3d65e898bb254c1f16d62ffcd3
Author: imzs <[email protected]>
AuthorDate: Fri Dec 27 13:56:09 2024 +0800
[ISSUE #8974] Add feature switch of recalling, disable by default (#9067)
---
.../rocketmq/broker/processor/RecallMessageProcessor.java | 6 ++++++
.../rocketmq/broker/processor/RecallMessageProcessorTest.java | 9 +++++++++
.../src/main/java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++
.../org/apache/rocketmq/test/base/IntegrationTestBase.java | 1 +
4 files changed, 26 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
index 7a652f4315..372db0d36e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
@@ -57,6 +57,12 @@ public class RecallMessageProcessor implements NettyRequestProcessor {
final RecallMessageRequestHeader requestHeader = request.decodeCommandCustomHeader(RecallMessageRequestHeader.class);
+ if (!brokerController.getBrokerConfig().isRecallMessageEnable()) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark("recall failed, operation is forbidden");
+ return response;
+ }
+
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
response.setRemark("recall failed, broker service not available");
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
index 7bd260cc2c..d28eb2f1df 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
@@ -89,6 +89,7 @@ public class RecallMessageProcessorTest {
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerConfig.getBrokerName()).thenReturn(BROKER_NAME);
+ when(brokerConfig.isRecallMessageEnable()).thenReturn(true);
when(brokerController.getBrokerStatsManager()).thenReturn(brokerStatsManager);
when(handlerContext.channel()).thenReturn(channel);
recallMessageProcessor = new RecallMessageProcessor(brokerController);
@@ -134,6 +135,14 @@ public class RecallMessageProcessorTest {
}
}
+ @Test
+ public void testProcessRequest_notEnable() throws RemotingCommandException {
+ when(brokerConfig.isRecallMessageEnable()).thenReturn(false);
+ RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
+ RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request);
+ Assert.assertEquals(ResponseCode.NO_PERMISSION, response.getCode());
+ }
+
@Test
public void testProcessRequest_invalidStatus() throws RemotingCommandException {
RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index b5dc1899e9..dd34544935 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -453,6 +453,8 @@ public class BrokerConfig extends BrokerIdentity {
private boolean allowRecallWhenBrokerNotWriteable = true;
+ private boolean recallMessageEnable = false;
+
public String getConfigBlackList() {
return configBlackList;
}
@@ -1996,4 +1998,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setAllowRecallWhenBrokerNotWriteable(boolean allowRecallWhenBrokerNotWriteable) {
this.allowRecallWhenBrokerNotWriteable = allowRecallWhenBrokerNotWriteable;
}
+
+ public boolean isRecallMessageEnable() {
+ return recallMessageEnable;
+ }
+
+ public void setRecallMessageEnable(boolean recallMessageEnable) {
+ this.recallMessageEnable = recallMessageEnable;
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index fde991ad13..287e54d561 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -138,6 +138,7 @@ public class IntegrationTestBase {
brokerConfig.setEnableCalcFilterBitMap(true);
brokerConfig.setAppendAckAsync(true);
brokerConfig.setAppendCkAsync(true);
+ brokerConfig.setRecallMessageEnable(true);
storeConfig.setEnableConsumeQueueExt(true);
brokerConfig.setLoadBalancePollNameServerInterval(500);
storeConfig.setStorePathRootDir(baseDir);