This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 23cc24cc6e [ISSUE #8947] Notify pop request before calculate consumer lag (#8949)
23cc24cc6e is described below
commit 23cc24cc6e2fa33b9be2c434c42dee3a54e726a4
Author: lizhimins <[email protected]>
AuthorDate: Wed Nov 20 14:57:35 2024 +0800
[ISSUE #8947] Notify pop request before calculate consumer lag (#8949)
---
.../longpolling/NotifyMessageArrivingListener.java | 4 +-
.../broker/longpolling/PopCommandCallback.java | 49 +++++++++++++++++
.../broker/longpolling/PopLongPollingService.java | 49 +++++++++++++----
.../broker/metrics/ConsumerLagCalculator.java | 62 ++++++++++++++--------
.../broker/processor/NotificationProcessor.java | 4 +-
.../broker/processor/PopMessageProcessor.java | 18 +++++--
.../longpolling/PopLongPollingServiceTest.java | 11 ++--
.../org/apache/rocketmq/common/BrokerConfig.java | 19 +++++++
.../apache/rocketmq/remoting/CommandCallback.java | 22 ++++++++
.../remoting/protocol/RemotingCommand.java | 11 ++++
10 files changed, 206 insertions(+), 43 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index 1ddb9f4f8e..9c0ee89e4d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -40,8 +40,8 @@ public class NotifyMessageArrivingListener implements
MessageArrivingListener {
this.pullRequestHoldService.notifyMessageArriving(
topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap,
properties);
this.popMessageProcessor.notifyMessageArriving(
- topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+ topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap,
properties);
this.notificationProcessor.notifyMessageArriving(
- topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+ topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap,
properties);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
new file mode 100644
index 0000000000..2e190e20f9
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.longpolling;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.rocketmq.broker.metrics.ConsumerLagCalculator;
+import org.apache.rocketmq.remoting.CommandCallback;
+
+public class PopCommandCallback implements CommandCallback {
+
+ private final BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
+ Consumer<ConsumerLagCalculator.CalculateLagResult>> biConsumer;
+
+ private final ConsumerLagCalculator.ProcessGroupInfo info;
+ private final Consumer<ConsumerLagCalculator.CalculateLagResult>
lagRecorder;
+
+
+ public PopCommandCallback(
+ BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
+ Consumer<ConsumerLagCalculator.CalculateLagResult>>
biConsumer,
+ ConsumerLagCalculator.ProcessGroupInfo info,
+ Consumer<ConsumerLagCalculator.CalculateLagResult> lagRecorder) {
+
+ this.biConsumer = biConsumer;
+ this.info = info;
+ this.lagRecorder = lagRecorder;
+ }
+
+ @Override
+ public void accept() {
+ biConsumer.accept(info, lagRecorder);
+ }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index b5179114f3..91185fbe94 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.longpolling;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.CommandCallback;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
@@ -45,6 +47,7 @@ import static
org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_SUC;
import static
org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_TIMEOUT;
public class PopLongPollingService extends ServiceThread {
+
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
@@ -150,10 +153,10 @@ public class PopLongPollingService extends ServiceThread {
}
public void notifyMessageArrivingWithRetryTopic(final String topic, final
int queueId) {
- this.notifyMessageArrivingWithRetryTopic(topic, queueId, null, 0L,
null, null);
+ this.notifyMessageArrivingWithRetryTopic(topic, queueId, -1L, null,
0L, null, null);
}
- public void notifyMessageArrivingWithRetryTopic(final String topic, final
int queueId,
+ public void notifyMessageArrivingWithRetryTopic(final String topic, final
int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
String notifyTopic;
if (KeyBuilder.isPopRetryTopicV2(topic)) {
@@ -161,25 +164,37 @@ public class PopLongPollingService extends ServiceThread {
} else {
notifyTopic = topic;
}
- notifyMessageArriving(notifyTopic, queueId, tagsCode, msgStoreTime,
filterBitMap, properties);
+ notifyMessageArriving(notifyTopic, queueId, offset, tagsCode,
msgStoreTime, filterBitMap, properties);
}
- public void notifyMessageArriving(final String topic, final int queueId,
+ public void notifyMessageArriving(final String topic, final int queueId,
long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
if (cids == null) {
return;
}
+ long interval =
brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval();
+ boolean force = interval > 0L && offset % interval == 0L;
for (Map.Entry<String, Byte> cid : cids.entrySet()) {
if (queueId >= 0) {
- notifyMessageArriving(topic, -1, cid.getKey(), tagsCode,
msgStoreTime, filterBitMap, properties);
+ notifyMessageArriving(topic, -1, cid.getKey(), force,
tagsCode, msgStoreTime, filterBitMap, properties);
}
- notifyMessageArriving(topic, queueId, cid.getKey(), tagsCode,
msgStoreTime, filterBitMap, properties);
+ notifyMessageArriving(topic, queueId, cid.getKey(), force,
tagsCode, msgStoreTime, filterBitMap, properties);
}
}
public boolean notifyMessageArriving(final String topic, final int
queueId, final String cid,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
+ return notifyMessageArriving(topic, queueId, cid, false, tagsCode,
msgStoreTime, filterBitMap, properties, null);
+ }
+
+ public boolean notifyMessageArriving(final String topic, final int
queueId, final String cid, boolean force,
+ Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
+ return notifyMessageArriving(topic, queueId, cid, force, tagsCode,
msgStoreTime, filterBitMap, properties, null);
+ }
+
+ public boolean notifyMessageArriving(final String topic, final int
queueId, final String cid, boolean force,
+ Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties, CommandCallback callback) {
ConcurrentSkipListSet<PopRequest> remotingCommands =
pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
if (remotingCommands == null || remotingCommands.isEmpty()) {
return false;
@@ -190,7 +205,7 @@ public class PopLongPollingService extends ServiceThread {
return false;
}
- if (popRequest.getMessageFilter() != null &&
popRequest.getSubscriptionData() != null) {
+ if (!force && popRequest.getMessageFilter() != null &&
popRequest.getSubscriptionData() != null) {
boolean match =
popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime,
filterBitMap));
if (match && properties != null) {
@@ -206,16 +221,30 @@ public class PopLongPollingService extends ServiceThread {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}",
popRequest);
}
- return wakeUp(popRequest);
+
+ return wakeUp(popRequest, callback);
}
public boolean wakeUp(final PopRequest request) {
+ return wakeUp(request, null);
+ }
+
+ public boolean wakeUp(final PopRequest request, CommandCallback callback) {
if (request == null || !request.complete()) {
return false;
}
+
+ if (callback != null && request.getRemotingCommand() != null) {
+ if (request.getRemotingCommand().getCallbackList() == null) {
+ request.getRemotingCommand().setCallbackList(new
ArrayList<>());
+ }
+ request.getRemotingCommand().getCallbackList().add(callback);
+ }
+
if (!request.getCtx().channel().isActive()) {
return false;
}
+
Runnable run = () -> {
try {
final RemotingCommand response =
processor.processRequest(request.getCtx(), request.getRemotingCommand());
@@ -234,7 +263,9 @@ public class PopLongPollingService extends ServiceThread {
POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1);
}
};
- this.brokerController.getPullMessageExecutor().submit(new
RequestTask(run, request.getChannel(), request.getRemotingCommand()));
+
+ this.brokerController.getPullMessageExecutor().submit(
+ new RequestTask(run, request.getChannel(),
request.getRemotingCommand()));
return true;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 3ac6528b2a..1b898f95de 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -26,6 +26,8 @@ import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.longpolling.PopCommandCallback;
+import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.processor.PopBufferMergeService;
import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
@@ -51,6 +53,7 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
public class ConsumerLagCalculator {
+
private final BrokerConfig brokerConfig;
private final TopicConfigManager topicConfigManager;
private final ConsumerManager consumerManager;
@@ -59,6 +62,7 @@ public class ConsumerLagCalculator {
private final SubscriptionGroupManager subscriptionGroupManager;
private final MessageStore messageStore;
private final PopBufferMergeService popBufferMergeService;
+ private final PopLongPollingService popLongPollingService;
private final PopInflightMessageCounter popInflightMessageCounter;
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -72,10 +76,11 @@ public class ConsumerLagCalculator {
this.subscriptionGroupManager =
brokerController.getSubscriptionGroupManager();
this.messageStore = brokerController.getMessageStore();
this.popBufferMergeService =
brokerController.getPopMessageProcessor().getPopBufferMergeService();
+ this.popLongPollingService =
brokerController.getPopMessageProcessor().getPopLongPollingService();
this.popInflightMessageCounter =
brokerController.getPopInflightMessageCounter();
}
- private static class ProcessGroupInfo {
+ public static class ProcessGroupInfo {
public String group;
public String topic;
public boolean isPop;
@@ -211,34 +216,44 @@ public class ConsumerLagCalculator {
return;
}
- CalculateLagResult result = new CalculateLagResult(info.group,
info.topic, false);
+ if (info.isPop &&
brokerConfig.isEnableNotifyBeforePopCalculateLag()) {
+ if (popLongPollingService.notifyMessageArriving(info.topic,
-1, info.group,
+ true, null, 0, null, null, new
PopCommandCallback(this::calculate, info, lagRecorder))) {
+ return;
+ }
+ }
+
+ calculate(info, lagRecorder);
+ });
+ }
+ public void calculate(ProcessGroupInfo info, Consumer<CalculateLagResult>
lagRecorder) {
+ CalculateLagResult result = new CalculateLagResult(info.group,
info.topic, false);
+ try {
+ Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic,
info.isPop);
+ if (lag != null) {
+ result.lag = lag.getObject1();
+ result.earliestUnconsumedTimestamp = lag.getObject2();
+ }
+ lagRecorder.accept(result);
+ } catch (ConsumeQueueException e) {
+ LOGGER.error("Failed to get lag stats", e);
+ }
+
+ if (info.isPop) {
try {
- Pair<Long, Long> lag = getConsumerLagStats(info.group,
info.topic, info.isPop);
- if (lag != null) {
- result.lag = lag.getObject1();
- result.earliestUnconsumedTimestamp = lag.getObject2();
+ Pair<Long, Long> retryLag = getConsumerLagStats(info.group,
info.retryTopic, true);
+
+ result = new CalculateLagResult(info.group, info.topic, true);
+ if (retryLag != null) {
+ result.lag = retryLag.getObject1();
+ result.earliestUnconsumedTimestamp = retryLag.getObject2();
}
lagRecorder.accept(result);
} catch (ConsumeQueueException e) {
LOGGER.error("Failed to get lag stats", e);
}
-
- if (info.isPop) {
- try {
- Pair<Long, Long> retryLag =
getConsumerLagStats(info.group, info.retryTopic, true);
-
- result = new CalculateLagResult(info.group, info.topic,
true);
- if (retryLag != null) {
- result.lag = retryLag.getObject1();
- result.earliestUnconsumedTimestamp =
retryLag.getObject2();
- }
- lagRecorder.accept(result);
- } catch (ConsumeQueueException e) {
- LOGGER.error("Failed to get lag stats", e);
- }
- }
- });
+ }
}
public void calculateInflight(Consumer<CalculateInflightResult>
inflightRecorder) {
@@ -320,6 +335,9 @@ public class ConsumerLagCalculator {
earliestUnconsumedTimestamp = 0L;
}
+ LOGGER.debug("GetConsumerLagStats, topic={}, group={}, lag={},
latency={}", topic, group, total,
+ earliestUnconsumedTimestamp > 0 ? System.currentTimeMillis() -
earliestUnconsumedTimestamp : 0);
+
return new Pair<>(total, earliestUnconsumedTimestamp);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 75c77b6d79..b4ebd9c4a9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -62,10 +62,10 @@ public class NotificationProcessor implements
NettyRequestProcessor {
// When a new message is written to CommitLog, this method would be called.
// Suspended long polling will receive notification and be wakeup.
- public void notifyMessageArriving(final String topic, final int queueId,
+ public void notifyMessageArriving(final String topic, final int queueId,
long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
this.popLongPollingService.notifyMessageArrivingWithRetryTopic(
- topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+ topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap,
properties);
}
public void notifyMessageArriving(final String topic, final int queueId) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index fe8ccb03dc..e0454afa3c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -65,6 +65,7 @@ import
org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.CommandCallback;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
@@ -97,8 +98,10 @@ import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL
import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
public class PopMessageProcessor implements NettyRequestProcessor {
+
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+
private final BrokerController brokerController;
private final Random random = new Random(System.currentTimeMillis());
String reviveTopic;
@@ -196,15 +199,15 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
}
- public void notifyMessageArriving(final String topic, final int queueId,
+ public void notifyMessageArriving(final String topic, final int queueId,
long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
popLongPollingService.notifyMessageArrivingWithRetryTopic(
- topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+ topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap,
properties);
}
public void notifyMessageArriving(final String topic, final int queueId,
final String cid) {
popLongPollingService.notifyMessageArriving(
- topic, queueId, cid, null, 0L, null, null);
+ topic, queueId, cid, false, null, 0L, null, null);
}
@Override
@@ -419,6 +422,15 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
final RemotingCommand finalResponse = response;
SubscriptionData finalSubscriptionData = subscriptionData;
getMessageFuture.thenApply(restNum -> {
+ try {
+ if (request.getCallbackList() != null) {
+ request.getCallbackList().forEach(CommandCallback::accept);
+ request.getCallbackList().clear();
+ }
+ } catch (Throwable t) {
+ POP_LOGGER.error("PopProcessor execute callback error", t);
+ }
+
if (!getMessageResult.getMessageBufferList().isEmpty()) {
finalResponse.setCode(ResponseCode.SUCCESS);
getMessageResult.setStatus(GetMessageStatus.FOUND);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
index 6527beeb68..1f064ec05d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -80,21 +80,22 @@ public class PopLongPollingServiceTest {
@Test
public void testNotifyMessageArrivingWithRetryTopic() {
int queueId = 0;
-
doNothing().when(popLongPollingService).notifyMessageArrivingWithRetryTopic(defaultTopic,
queueId, null, 0L, null, null);
+
doNothing().when(popLongPollingService).notifyMessageArrivingWithRetryTopic(defaultTopic,
queueId, -1L, null, 0L, null, null);
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic,
queueId);
- verify(popLongPollingService,
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, null, 0L,
null, null);
+ verify(popLongPollingService,
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null,
0L, null, null);
}
@Test
public void testNotifyMessageArriving() {
int queueId = 0;
Long tagsCode = 123L;
+ long offset = 123L;
long msgStoreTime = System.currentTimeMillis();
byte[] filterBitMap = new byte[]{0x01};
Map<String, String> properties = new ConcurrentHashMap<>();
-
doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic,
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
-
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic,
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
- verify(popLongPollingService).notifyMessageArriving(defaultTopic,
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+
doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic,
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
+
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic,
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
+ verify(popLongPollingService).notifyMessageArriving(defaultTopic,
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
}
@Test
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index c651047661..f459abf0db 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -227,6 +227,9 @@ public class BrokerConfig extends BrokerIdentity {
private int popCkMaxBufferSize = 200000;
private int popCkOffsetMaxQueueSize = 20000;
private boolean enablePopBatchAck = false;
+ // set the interval to the maxFilterMessageSize in MessageStoreConfig divided by the cq unit size
+ private long popLongPollingForceNotifyInterval = 800;
+ private boolean enableNotifyBeforePopCalculateLag = true;
private boolean enableNotifyAfterPopOrderLockRelease = true;
private boolean initPopOffsetByCheckMsgInMem = true;
// read message from pop retry topic v1, for the compatibility, will be
removed in the future version
@@ -1326,6 +1329,22 @@ public class BrokerConfig extends BrokerIdentity {
this.enableNetWorkFlowControl = enableNetWorkFlowControl;
}
+ public long getPopLongPollingForceNotifyInterval() {
+ return popLongPollingForceNotifyInterval;
+ }
+
+ public void setPopLongPollingForceNotifyInterval(long
popLongPollingForceNotifyInterval) {
+ this.popLongPollingForceNotifyInterval =
popLongPollingForceNotifyInterval;
+ }
+
+ public boolean isEnableNotifyBeforePopCalculateLag() {
+ return enableNotifyBeforePopCalculateLag;
+ }
+
+ public void setEnableNotifyBeforePopCalculateLag(boolean
enableNotifyBeforePopCalculateLag) {
+ this.enableNotifyBeforePopCalculateLag =
enableNotifyBeforePopCalculateLag;
+ }
+
public boolean isEnableNotifyAfterPopOrderLockRelease() {
return enableNotifyAfterPopOrderLockRelease;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java
new file mode 100644
index 0000000000..884f3d9e5d
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting;
+
+public interface CommandCallback {
+
+ void accept();
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 5de48350cf..9b2b0f07b4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.CommandCallback;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -96,6 +98,7 @@ public class RemotingCommand {
private transient byte[] body;
private boolean suspended;
private transient Stopwatch processTimer;
+ private transient List<CommandCallback> callbackList;
protected RemotingCommand() {
}
@@ -639,4 +642,12 @@ public class RemotingCommand {
public void setProcessTimer(Stopwatch processTimer) {
this.processTimer = processTimer;
}
+
+ public List<CommandCallback> getCallbackList() {
+ return callbackList;
+ }
+
+ public void setCallbackList(List<CommandCallback> callbackList) {
+ this.callbackList = callbackList;
+ }
}