This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e984023d31 [ISSUE #9816] Fix calculate consumer lag with opentelemetry (#9873)
e984023d31 is described below
commit e984023d31ee31d62d2d17d8e7e1e2bba58071ae
Author: lizhimins <[email protected]>
AuthorDate: Tue Nov 25 19:52:21 2025 +0800
[ISSUE #9816] Fix calculate consumer lag with opentelemetry (#9873)
---
.../broker/longpolling/PopCommandCallback.java | 16 +++++-----
.../broker/metrics/BrokerMetricsManager.java | 10 ++-----
.../broker/metrics/ConsumerLagCalculator.java | 34 ++++++++++++++++++++--
3 files changed, 40 insertions(+), 20 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
index 2e190e20f9..ef541a0678 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
@@ -17,33 +17,31 @@
package org.apache.rocketmq.broker.longpolling;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
-import java.util.function.Consumer;
import org.apache.rocketmq.broker.metrics.ConsumerLagCalculator;
import org.apache.rocketmq.remoting.CommandCallback;
public class PopCommandCallback implements CommandCallback {
private final BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
- Consumer<ConsumerLagCalculator.CalculateLagResult>> biConsumer;
-
+ CompletableFuture<ConsumerLagCalculator.CalculateLagResult>>
biConsumer;
private final ConsumerLagCalculator.ProcessGroupInfo info;
- private final Consumer<ConsumerLagCalculator.CalculateLagResult>
lagRecorder;
-
+ private final CompletableFuture<ConsumerLagCalculator.CalculateLagResult>
future;
public PopCommandCallback(
BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
- Consumer<ConsumerLagCalculator.CalculateLagResult>>
biConsumer,
+ CompletableFuture<ConsumerLagCalculator.CalculateLagResult>>
biConsumer,
ConsumerLagCalculator.ProcessGroupInfo info,
- Consumer<ConsumerLagCalculator.CalculateLagResult> lagRecorder) {
+ CompletableFuture<ConsumerLagCalculator.CalculateLagResult> future) {
this.biConsumer = biConsumer;
this.info = info;
- this.lagRecorder = lagRecorder;
+ this.future = future;
}
@Override
public void accept() {
- biConsumer.accept(info, lagRecorder);
+ biConsumer.accept(info, future);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 960c1dd250..fe6c180e45 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -673,14 +673,8 @@ public class BrokerMetricsManager {
consumerLagMessages =
brokerMeter.gaugeBuilder(GAUGE_CONSUMER_LAG_MESSAGES)
.setDescription("Consumer lag messages")
.ofLongs()
- .buildWithCallback(measurement ->
- consumerLagCalculator.calculateLag(result -> {
- // Note: 'record' method uses HashMap which may cause
- // concurrent access issues when Pull thread executes Pop
callbacks.
- synchronized (this) {
- measurement.record(result.lag,
buildLagAttributes(result));
- }
- }));
+ .buildWithCallback(measurement ->
consumerLagCalculator.calculateLag(result ->
+ measurement.record(result.lag, buildLagAttributes(result))));
consumerLagLatency =
brokerMeter.gaugeBuilder(GAUGE_CONSUMER_LAG_LATENCY)
.setDescription("Consumer lag time")
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 35519c1d1c..d42c8f0ff6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -16,8 +16,13 @@
*/
package org.apache.rocketmq.broker.metrics;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.BrokerController;
@@ -211,20 +216,43 @@ public class ConsumerLagCalculator {
}
public void calculateLag(Consumer<CalculateLagResult> lagRecorder) {
+
+ List<CompletableFuture<CalculateLagResult>> futures = new
ArrayList<>();
+
+ BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
+ CompletableFuture<ConsumerLagCalculator.CalculateLagResult>>
biConsumer =
+ (info, future) -> calculate(info, future::complete);
+
processAllGroup(info -> {
if (info.group == null || info.topic == null) {
return;
}
-
+ CompletableFuture<CalculateLagResult> future = new
CompletableFuture<>();
if (info.isPop &&
brokerConfig.isEnableNotifyBeforePopCalculateLag()) {
if (popLongPollingService.notifyMessageArriving(info.topic,
-1, info.group,
- true, null, 0, null, null, new
PopCommandCallback(this::calculate, info, lagRecorder))) {
+ true, null, 0, null, null,
+ new PopCommandCallback(biConsumer, info, future))) {
+ futures.add(future);
return;
}
}
-
calculate(info, lagRecorder);
});
+
+ // Set the maximum wait time to 10 seconds to avoid indefinite blocking
+ // in case of a fast fail that causes the future to not complete its
execution.
+ try {
+ CompletableFuture.allOf(futures.toArray(
+ new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
+
+ futures.forEach(future -> {
+ if (future.isDone() && !future.isCompletedExceptionally()) {
+ lagRecorder.accept(future.join());
+ }
+ });
+ } catch (Exception e) {
+ LOGGER.error("Calculate lag timeout after 10 seconds", e);
+ }
}
public void calculate(ProcessGroupInfo info, Consumer<CalculateLagResult>
lagRecorder) {