This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3d357bb3ed [ISSUE #7772] Ensuring broker protection capabilities when
POP does not return ACK (#7773)
3d357bb3ed is described below
commit 3d357bb3ed4dcee38cdde9b1525b15226820321a
Author: Ji Juntao <[email protected]>
AuthorDate: Fri Jan 26 14:34:53 2024 +0800
[ISSUE #7772] Ensuring broker protection capabilities when POP does not
return ACK (#7773)
* add stop pop situation.
* refactor the imports
* modify the threadhold into threshold.
* checkstyle
---
.../broker/processor/PopMessageProcessor.java | 12 ++++++++++++
.../java/org/apache/rocketmq/common/BrokerConfig.java | 19 +++++++++++++++++++
2 files changed, 31 insertions(+)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 02e266a786..105e11643f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -519,6 +519,13 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return future;
}
+ if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId))
{
+ POP_LOGGER.warn("Too much msgs unacked, then stop poping.
topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(),
queueId);
+ restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) -
offset + restNum;
+ future.complete(restNum);
+ return future;
+ }
+
try {
future.whenComplete((result, throwable) ->
queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInitMode(),
@@ -667,6 +674,11 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
});
}
+ private boolean isPopShouldStop(String topic, String group, int queueId) {
+ return
brokerController.getBrokerConfig().isEnablePopMessageThreshold() &&
+
brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic,
group, queueId) >
brokerController.getBrokerConfig().getPopInflightMessageThreshold();
+ }
+
private long getPopOffset(String topic, String group, int queueId, int
initMode, boolean init, String lockKey,
boolean checkResetOffset) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 0a2c528f86..0a1bfa5d67 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -407,6 +407,9 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean enableSplitRegistration = false;
+ private long popInflightMessageThreshold = 10000;
+ private boolean enablePopMessageThreshold = false;
+
private int splitRegistrationSize = 800;
/**
@@ -1799,4 +1802,20 @@ public class BrokerConfig extends BrokerIdentity {
public void setTransactionMetricFlushInterval(long
transactionMetricFlushInterval) {
this.transactionMetricFlushInterval = transactionMetricFlushInterval;
}
+
+ public long getPopInflightMessageThreshold() {
+ return popInflightMessageThreshold;
+ }
+
+ public void setPopInflightMessageThreshold(long
popInflightMessageThreshold) {
+ this.popInflightMessageThreshold = popInflightMessageThreshold;
+ }
+
+ public boolean isEnablePopMessageThreshold() {
+ return enablePopMessageThreshold;
+ }
+
+ public void setEnablePopMessageThreshold(boolean
enablePopMessageThreshold) {
+ this.enablePopMessageThreshold = enablePopMessageThreshold;
+ }
}