This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 81e3648e3 Introduce retry topic and lag estimation for lag calculation. (#5702)
81e3648e3 is described below
commit 81e3648e3fcb1995897db3960a4561d6047b696c
Author: SSpirits <[email protected]>
AuthorDate: Fri Dec 16 16:48:56 2022 +0800
Introduce retry topic and lag estimation for lag calculation. (#5702)
---
.../broker/metrics/BrokerMetricsConstant.java | 2 +-
.../broker/metrics/BrokerMetricsManager.java | 25 +--
.../broker/metrics/ConsumerLagCalculator.java | 188 ++++++++++++++++-----
.../processor/AbstractSendMessageProcessor.java | 15 +-
.../broker/processor/PopMessageProcessor.java | 13 ++
.../broker/processor/SendMessageProcessor.java | 10 +-
.../store/metrics/DefaultStoreMetricsConstant.java | 1 -
7 files changed, 186 insertions(+), 68 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
index c7b6aa8e1..73b40f6ba 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
@@ -36,7 +36,7 @@ public class BrokerMetricsConstant {
public static final String GAUGE_CONSUMER_INFLIGHT_MESSAGES =
"rocketmq_consumer_inflight_messages";
public static final String GAUGE_CONSUMER_QUEUEING_LATENCY =
"rocketmq_consumer_queueing_latency";
public static final String GAUGE_CONSUMER_READY_MESSAGES =
"rocketmq_consumer_ready_messages";
- public static final String GAUGE_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL =
"rocketmq_send_to_dlq_messages_total";
+ public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL =
"rocketmq_send_to_dlq_messages_total";
public static final String LABEL_CLUSTER_NAME = "cluster";
public static final String LABEL_NODE_TYPE = "node_type";
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index a1d4f5917..5fb8085a5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -65,6 +65,7 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
@@ -76,7 +77,6 @@ import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CON
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
@@ -84,6 +84,7 @@ import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGG
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID;
@@ -104,7 +105,6 @@ public class BrokerMetricsManager {
private final BrokerController brokerController;
private final ConsumerLagCalculator consumerLagCalculator;
private final static Map<String, String> LABEL_MAP = new HashMap<>();
- private final Map<String, Long> dlqOffsetMap = new HashMap<>();
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
@@ -131,7 +131,7 @@ public class BrokerMetricsManager {
public static ObservableLongGauge consumerInflightMessages = new
NopObservableLongGauge();
public static ObservableLongGauge consumerQueueingLatency = new
NopObservableLongGauge();
public static ObservableLongGauge consumerReadyMessages = new
NopObservableLongGauge();
- public static ObservableLongGauge sendToDlqMessages = new
NopObservableLongGauge();
+ public static LongCounter sendToDlqMessages = new NopLongCounter();
public BrokerMetricsManager(BrokerController brokerController) {
this.brokerController = brokerController;
@@ -151,6 +151,7 @@ public class BrokerMetricsManager {
AttributesBuilder attributesBuilder = newAttributesBuilder();
attributesBuilder.put(LABEL_CONSUMER_GROUP, result.group);
attributesBuilder.put(LABEL_TOPIC, result.topic);
+ attributesBuilder.put(LABEL_IS_RETRY, result.isRetry);
attributesBuilder.put(LABEL_IS_SYSTEM, isSystem(result.topic,
result.group));
return attributesBuilder.build();
}
@@ -487,23 +488,9 @@ public class BrokerMetricsManager {
.buildWithCallback(measurement ->
consumerLagCalculator.calculateAvailable(result ->
measurement.record(result.available, buildLagAttributes(result))));
- sendToDlqMessages =
brokerMeter.gaugeBuilder(GAUGE_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL)
+ sendToDlqMessages =
brokerMeter.counterBuilder(COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL)
.setDescription("Consumer send to DLQ messages")
- .ofLongs()
- .buildWithCallback(measurement ->
consumerLagCalculator.calculateSendToDLQ(result -> {
- long val = result.dlqMessageCount;
- if (brokerConfig.isMetricsInDelta()) {
- String key = result.group + "%%" + result.topic;
- Long lastOffset = dlqOffsetMap.computeIfAbsent(key, k ->
result.dlqMessageCount);
- dlqOffsetMap.put(key, result.dlqMessageCount);
- val -= lastOffset;
- }
- AttributesBuilder attributesBuilder = newAttributesBuilder();
- attributesBuilder.put(LABEL_CONSUMER_GROUP, result.group);
- attributesBuilder.put(LABEL_TOPIC, result.topic);
- attributesBuilder.put(LABEL_IS_SYSTEM, isSystem(result.topic,
result.group));
- measurement.record(val, attributesBuilder.build());
- }));
+ .build();
}
private void initOtherMetrics() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index befcf6e53..4b8767de5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -22,22 +22,29 @@ import java.util.function.Consumer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.processor.PopBufferMergeService;
+import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.DefaultMessageFilter;
import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
public class ConsumerLagCalculator {
private final BrokerConfig brokerConfig;
@@ -48,6 +55,7 @@ public class ConsumerLagCalculator {
private final SubscriptionGroupManager subscriptionGroupManager;
private final MessageStore messageStore;
private final PopBufferMergeService popBufferMergeService;
+ private final PopInflightMessageCounter popInflightMessageCounter;
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -60,25 +68,33 @@ public class ConsumerLagCalculator {
this.subscriptionGroupManager =
brokerController.getSubscriptionGroupManager();
this.messageStore = brokerController.getMessageStore();
this.popBufferMergeService =
brokerController.getPopMessageProcessor().getPopBufferMergeService();
+ this.popInflightMessageCounter =
brokerController.getPopInflightMessageCounter();
}
private static class ProcessGroupInfo {
public String group;
public String topic;
+ public boolean isPop;
+ public String retryTopic;
- public ProcessGroupInfo(String group, String topic) {
+ public ProcessGroupInfo(String group, String topic, boolean isPop,
+ String retryTopic) {
this.group = group;
this.topic = topic;
+ this.isPop = isPop;
+ this.retryTopic = retryTopic;
}
}
public static class BaseCalculateResult {
public String group;
public String topic;
+ public boolean isRetry;
- public BaseCalculateResult(String group, String topic) {
+ public BaseCalculateResult(String group, String topic, boolean
isRetry) {
this.group = group;
this.topic = topic;
+ this.isRetry = isRetry;
}
}
@@ -86,8 +102,8 @@ public class ConsumerLagCalculator {
public long lag;
public long earliestUnconsumedTimestamp;
- public CalculateLagResult(String group, String topic) {
- super(group, topic);
+ public CalculateLagResult(String group, String topic, boolean isRetry)
{
+ super(group, topic, isRetry);
}
}
@@ -95,16 +111,16 @@ public class ConsumerLagCalculator {
public long inFlight;
public long earliestUnPulledTimestamp;
- public CalculateInflightResult(String group, String topic) {
- super(group, topic);
+ public CalculateInflightResult(String group, String topic, boolean
isRetry) {
+ super(group, topic, isRetry);
}
}
public static class CalculateAvailableResult extends BaseCalculateResult {
public long available;
- public CalculateAvailableResult(String group, String topic) {
- super(group, topic);
+ public CalculateAvailableResult(String group, String topic, boolean
isRetry) {
+ super(group, topic, isRetry);
}
}
@@ -112,7 +128,7 @@ public class ConsumerLagCalculator {
public long dlqMessageCount;
public CalculateSendToDLQResult(String group, String topic) {
- super(group, topic);
+ super(group, topic, false);
}
}
@@ -122,10 +138,11 @@ public class ConsumerLagCalculator {
String group = subscriptionEntry.getKey();
SubscriptionGroupConfig subscriptionGroupConfig =
subscriptionEntry.getValue();
- ConsumerGroupInfo consumerGroupInfo =
consumerManager.getConsumerGroupInfo(group);
+ ConsumerGroupInfo consumerGroupInfo =
consumerManager.getConsumerGroupInfo(group, true);
if (consumerGroupInfo == null) {
continue;
}
+ boolean isPop = consumerGroupInfo.getConsumeType() ==
ConsumeType.CONSUME_POP;
Set<String> topics = consumerGroupInfo.getSubscribeTopics();
if (null == topics || topics.isEmpty()) {
continue;
@@ -148,73 +165,94 @@ public class ConsumerLagCalculator {
continue;
}
- consumer.accept(new ProcessGroupInfo(group, topic));
+ if (isPop) {
+ String retryTopic = KeyBuilder.buildPopRetryTopic(topic,
group);
+ TopicConfig retryTopicConfig =
topicConfigManager.selectTopicConfig(retryTopic);
+ int retryTopicPerm = topicConfig.getPerm() &
brokerConfig.getBrokerPermission();
+ if (PermName.isReadable(retryTopicPerm) ||
PermName.isWriteable(retryTopicPerm)) {
+ consumer.accept(new ProcessGroupInfo(group, topic,
true, retryTopic));
+ } else {
+ consumer.accept(new ProcessGroupInfo(group, topic,
true, null));
+ }
+ } else {
+ consumer.accept(new ProcessGroupInfo(group, topic, false,
null));
+ }
}
}
}
public void calculateLag(Consumer<CalculateLagResult> lagRecorder) {
processAllGroup(info -> {
- CalculateLagResult result = new CalculateLagResult(info.group,
info.topic);
+ CalculateLagResult result = new CalculateLagResult(info.group,
info.topic, false);
- Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic);
+ Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic,
info.isPop);
if (lag != null) {
result.lag = lag.getObject1();
result.earliestUnconsumedTimestamp = lag.getObject2();
}
lagRecorder.accept(result);
+
+ if (info.isPop) {
+ Pair<Long, Long> retryLag = getConsumerLagStats(info.group,
info.retryTopic, true);
+
+ result = new CalculateLagResult(info.group, info.topic, true);
+ if (retryLag != null) {
+ result.lag = retryLag.getObject1();
+ result.earliestUnconsumedTimestamp = retryLag.getObject2();
+ }
+ lagRecorder.accept(result);
+ }
});
}
public void calculateInflight(Consumer<CalculateInflightResult>
inflightRecorder) {
processAllGroup(info -> {
- CalculateInflightResult result = new
CalculateInflightResult(info.group, info.topic);
- Pair<Long, Long> inFlight = getInFlightMsgStats(info.group,
info.topic);
+ CalculateInflightResult result = new
CalculateInflightResult(info.group, info.topic, false);
+ Pair<Long, Long> inFlight = getInFlightMsgStats(info.group,
info.topic, info.isPop);
if (inFlight != null) {
result.inFlight = inFlight.getObject1();
result.earliestUnPulledTimestamp = inFlight.getObject2();
}
inflightRecorder.accept(result);
+
+ if (info.isPop) {
+ Pair<Long, Long> retryInFlight =
getInFlightMsgStats(info.group, info.retryTopic, true);
+
+ result = new CalculateInflightResult(info.group, info.topic,
true);
+ if (retryInFlight != null) {
+ result.inFlight = retryInFlight.getObject1();
+ result.earliestUnPulledTimestamp =
retryInFlight.getObject2();
+ }
+ inflightRecorder.accept(result);
+ }
});
}
public void calculateAvailable(Consumer<CalculateAvailableResult>
availableRecorder) {
processAllGroup(info -> {
- CalculateAvailableResult result = new
CalculateAvailableResult(info.group, info.topic);
+ CalculateAvailableResult result = new
CalculateAvailableResult(info.group, info.topic, false);
- result.available = getAvailableMsgCount(info.group, info.topic);
+ result.available = getAvailableMsgCount(info.group, info.topic,
info.isPop);
availableRecorder.accept(result);
- });
- }
-
- public void calculateSendToDLQ(Consumer<CalculateSendToDLQResult>
dlqRecorder) {
- processAllGroup(info -> {
- CalculateSendToDLQResult result = new
CalculateSendToDLQResult(info.group, info.topic);
- String dlqTopic = MixAll.DLQ_GROUP_TOPIC_PREFIX + info.group;
- TopicConfig topicConfig =
topicConfigManager.selectTopicConfig(dlqTopic);
- if (topicConfig == null) {
- return;
- }
+ if (info.isPop) {
+ long retryAvailable = getAvailableMsgCount(info.group,
info.retryTopic, true);
- ConsumeQueueInterface consumeQueue =
messageStore.getConsumeQueue(dlqTopic, 0);
- if (consumeQueue == null) {
- return;
+ result = new CalculateAvailableResult(info.group, info.topic,
true);
+ result.available = retryAvailable;
+ availableRecorder.accept(result);
}
-
- result.dlqMessageCount = consumeQueue.getMaxOffsetInQueue();
- dlqRecorder.accept(result);
});
}
- public Pair<Long, Long> getConsumerLagStats(String group, String topic) {
+ public Pair<Long, Long> getConsumerLagStats(String group, String topic,
boolean isPop) {
long total = 0L;
long earliestUnconsumedTimestamp = Long.MAX_VALUE;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
if (topicConfig != null) {
for (int queueId = 0; queueId < topicConfig.getWriteQueueNums();
queueId++) {
- Pair<Long, Long> pair = getConsumerLagStats(group, topic,
queueId);
+ Pair<Long, Long> pair = getConsumerLagStats(group, topic,
queueId, isPop);
total += pair.getObject1();
earliestUnconsumedTimestamp =
Math.min(earliestUnconsumedTimestamp, pair.getObject2());
}
@@ -229,12 +267,27 @@ public class ConsumerLagCalculator {
return new Pair<>(total, earliestUnconsumedTimestamp);
}
- public Pair<Long, Long> getConsumerLagStats(String group, String topic,
int queueId) {
+ public Pair<Long, Long> getConsumerLagStats(String group, String topic,
int queueId, boolean isPop) {
long brokerOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
if (brokerOffset < 0) {
brokerOffset = 0;
}
+ if (isPop) {
+ long pullOffset = popBufferMergeService.getLatestOffset(topic,
group, queueId);
+ if (pullOffset < 0) {
+ pullOffset = offsetManager.queryOffset(group, topic, queueId);
+ }
+ if (pullOffset < 0) {
+ pullOffset = brokerOffset;
+ }
+ long inFlightNum =
popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId);
+ long lag = calculateMessageCount(group, topic, queueId,
pullOffset, brokerOffset) + inFlightNum;
+ long consumerOffset = pullOffset - inFlightNum;
+ long consumerStoreTimeStamp = getStoreTimeStamp(topic, queueId,
consumerOffset);
+ return new Pair<>(lag, consumerStoreTimeStamp);
+ }
+
long consumerOffset = offsetManager.queryOffset(group, topic, queueId);
if (consumerOffset < 0) {
consumerOffset = brokerOffset;
@@ -245,14 +298,14 @@ public class ConsumerLagCalculator {
return new Pair<>(lag, consumerStoreTimeStamp);
}
- public Pair<Long, Long> getInFlightMsgStats(String group, String topic) {
+ public Pair<Long, Long> getInFlightMsgStats(String group, String topic,
boolean isPop) {
long total = 0L;
long earliestUnPulledTimestamp = Long.MAX_VALUE;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
if (topicConfig != null) {
for (int queueId = 0; queueId < topicConfig.getWriteQueueNums();
queueId++) {
- Pair<Long, Long> pair = getInFlightMsgStats(group, topic,
queueId);
+ Pair<Long, Long> pair = getInFlightMsgStats(group, topic,
queueId, isPop);
total += pair.getObject1();
earliestUnPulledTimestamp =
Math.min(earliestUnPulledTimestamp, pair.getObject2());
}
@@ -267,7 +320,20 @@ public class ConsumerLagCalculator {
return new Pair<>(total, earliestUnPulledTimestamp);
}
- public Pair<Long, Long> getInFlightMsgStats(String group, String topic,
int queueId) {
+ public Pair<Long, Long> getInFlightMsgStats(String group, String topic,
int queueId, boolean isPop) {
+ if (isPop) {
+ long inflight =
popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId);
+ long pullOffset = popBufferMergeService.getLatestOffset(topic,
group, queueId);
+ if (pullOffset < 0) {
+ pullOffset = offsetManager.queryOffset(group, topic, queueId);
+ }
+ if (pullOffset < 0) {
+ pullOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
+ }
+ long pullStoreTimeStamp = getStoreTimeStamp(topic, queueId,
pullOffset);
+ return new Pair<>(inflight, pullStoreTimeStamp);
+ }
+
long pullOffset = offsetManager.queryPullOffset(group, topic, queueId);
if (pullOffset < 0) {
pullOffset = 0;
@@ -283,13 +349,13 @@ public class ConsumerLagCalculator {
return new Pair<>(inflight, pullStoreTimeStamp);
}
- public long getAvailableMsgCount(String group, String topic) {
+ public long getAvailableMsgCount(String group, String topic, boolean
isPop) {
long total = 0L;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
if (topicConfig != null) {
for (int queueId = 0; queueId < topicConfig.getWriteQueueNums();
queueId++) {
- total += getAvailableMsgCount(group, topic, queueId);
+ total += getAvailableMsgCount(group, topic, queueId, isPop);
}
} else {
LOGGER.warn("failed to get config of topic {}", topic);
@@ -298,13 +364,24 @@ public class ConsumerLagCalculator {
return total;
}
- public long getAvailableMsgCount(String group, String topic, int queueId) {
+ public long getAvailableMsgCount(String group, String topic, int queueId,
boolean isPop) {
long brokerOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
if (brokerOffset < 0) {
brokerOffset = 0;
}
- long pullOffset = offsetManager.queryPullOffset(group, topic, queueId);
+ long pullOffset;
+ if (isPop) {
+ pullOffset = popBufferMergeService.getLatestOffset(topic, group,
queueId);
+ if (pullOffset < 0) {
+ pullOffset = offsetManager.queryOffset(group, topic, queueId);
+ }
+ if (pullOffset < 0) {
+ pullOffset = brokerOffset;
+ }
+ } else {
+ pullOffset = offsetManager.queryPullOffset(group, topic, queueId);
+ }
if (pullOffset < 0) {
pullOffset = brokerOffset;
}
@@ -323,6 +400,27 @@ public class ConsumerLagCalculator {
public long calculateMessageCount(String group, String topic, int queueId,
long from, long to) {
long count = to - from;
+
+ if (brokerConfig.isEstimateAccumulation() && to > from) {
+ SubscriptionData subscriptionData = null;
+ ConsumerGroupInfo consumerGroupInfo =
consumerManager.getConsumerGroupInfo(group, true);
+ if (consumerGroupInfo != null) {
+ subscriptionData =
consumerGroupInfo.findSubscriptionData(topic);
+ }
+ if (null != subscriptionData &&
+
ExpressionType.TAG.equalsIgnoreCase(subscriptionData.getExpressionType()) &&
+
!SubscriptionData.SUB_ALL.equals(subscriptionData.getSubString())) {
+ count = messageStore.estimateMessageCount(topic, queueId,
from, to,
+ new DefaultMessageFilter(subscriptionData));
+ } else if (null != subscriptionData &&
+
ExpressionType.SQL92.equalsIgnoreCase(subscriptionData.getExpressionType())) {
+ ConsumerFilterData consumerFilterData =
consumerFilterManager.get(topic, group);
+ count = messageStore.estimateMessageCount(topic, queueId,
from, to,
+ new ExpressionMessageFilter(subscriptionData,
+ consumerFilterData,
+ consumerFilterManager));
+ }
+ }
return count < 0 ? 0 : count;
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 9022f66ec..933a8984b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -17,17 +17,19 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.api.common.Attributes;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.AbortProcessException;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
+import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
@@ -63,6 +65,10 @@ import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
public abstract class AbstractSendMessageProcessor implements
NettyRequestProcessor {
protected static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected static final Logger DLQ_LOG =
LoggerFactory.getLogger(LoggerName.DLQ_LOGGER_NAME);
@@ -184,6 +190,13 @@ public abstract class AbstractSendMessageProcessor
implements NettyRequestProces
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
+ Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_CONSUMER_GROUP, requestHeader.getGroup())
+ .put(LABEL_TOPIC, requestHeader.getOriginTopic())
+ .put(LABEL_IS_SYSTEM,
BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(),
requestHeader.getGroup()))
+ .build();
+ BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
+
isDLQ = true;
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 5bb81df5a..26adbb094 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -369,6 +369,19 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
response.setRemark("parse the consumer's subscription failed");
return response;
}
+ } else {
+ try {
+ SubscriptionData subscriptionData =
FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG);
+
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
+ requestHeader.getTopic(), subscriptionData);
+
+ String retryTopic =
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
+ SubscriptionData retrySubscriptionData =
FilterAPI.build(retryTopic, "*", ExpressionType.TAG);
+
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
+ retryTopic, retrySubscriptionData);
+ } catch (Exception e) {
+ POP_LOGGER.warn("Build default subscription error, group: {}",
requestHeader.getConsumerGroup());
+ }
}
int randomQ = random.nextInt(100);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 6a2af0ddb..45517e1bb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -26,8 +26,8 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
-import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -67,6 +67,7 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
@@ -185,6 +186,13 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0
: requestHeader.getReconsumeTimes();
// Using '>' instead of '>=' to compatible with the case that
reconsumeTimes here are increased by client.
if (reconsumeTimes > maxReconsumeTimes) {
+ Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_CONSUMER_GROUP,
requestHeader.getProducerGroup())
+ .put(LABEL_TOPIC, requestHeader.getTopic())
+ .put(LABEL_IS_SYSTEM,
BrokerMetricsManager.isSystem(requestHeader.getTopic(),
requestHeader.getProducerGroup()))
+ .build();
+ BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
+
properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
index bcaf5b01c..b5993222c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
@@ -26,5 +26,4 @@ public class DefaultStoreMetricsConstant {
public static final String DEFAULT_STORAGE_TYPE = "local";
public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
public static final String DEFAULT_STORAGE_MEDIUM = "disk";
- public static final String LABEL_TOPIC = "topic";
}