This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e984023d31 [ISSUE #9816] Fix calculate consumer lag with opentelemetry 
(#9873)
e984023d31 is described below

commit e984023d31ee31d62d2d17d8e7e1e2bba58071ae
Author: lizhimins <[email protected]>
AuthorDate: Tue Nov 25 19:52:21 2025 +0800

    [ISSUE #9816] Fix calculate consumer lag with opentelemetry (#9873)
---
 .../broker/longpolling/PopCommandCallback.java     | 16 +++++-----
 .../broker/metrics/BrokerMetricsManager.java       | 10 ++-----
 .../broker/metrics/ConsumerLagCalculator.java      | 34 ++++++++++++++++++++--
 3 files changed, 40 insertions(+), 20 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
index 2e190e20f9..ef541a0678 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
@@ -17,33 +17,31 @@
 
 package org.apache.rocketmq.broker.longpolling;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 import org.apache.rocketmq.broker.metrics.ConsumerLagCalculator;
 import org.apache.rocketmq.remoting.CommandCallback;
 
 public class PopCommandCallback implements CommandCallback {
 
     private final BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
-        Consumer<ConsumerLagCalculator.CalculateLagResult>> biConsumer;
-
+        CompletableFuture<ConsumerLagCalculator.CalculateLagResult>> 
biConsumer;
     private final ConsumerLagCalculator.ProcessGroupInfo info;
-    private final Consumer<ConsumerLagCalculator.CalculateLagResult> 
lagRecorder;
-
+    private final CompletableFuture<ConsumerLagCalculator.CalculateLagResult> 
future;
 
     public PopCommandCallback(
         BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
-                    Consumer<ConsumerLagCalculator.CalculateLagResult>> 
biConsumer,
+            CompletableFuture<ConsumerLagCalculator.CalculateLagResult>> 
biConsumer,
         ConsumerLagCalculator.ProcessGroupInfo info,
-        Consumer<ConsumerLagCalculator.CalculateLagResult> lagRecorder) {
+        CompletableFuture<ConsumerLagCalculator.CalculateLagResult> future) {
 
         this.biConsumer = biConsumer;
         this.info = info;
-        this.lagRecorder = lagRecorder;
+        this.future = future;
     }
 
     @Override
     public void accept() {
-        biConsumer.accept(info, lagRecorder);
+        biConsumer.accept(info, future);
     }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 960c1dd250..fe6c180e45 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -673,14 +673,8 @@ public class BrokerMetricsManager {
         consumerLagMessages = 
brokerMeter.gaugeBuilder(GAUGE_CONSUMER_LAG_MESSAGES)
             .setDescription("Consumer lag messages")
             .ofLongs()
-            .buildWithCallback(measurement ->
-                consumerLagCalculator.calculateLag(result -> {
-                    // Note: 'record' method uses HashMap which may cause
-                    // concurrent access issues when Pull thread executes Pop 
callbacks.
-                    synchronized (this) {
-                        measurement.record(result.lag, 
buildLagAttributes(result));
-                    }
-                }));
+            .buildWithCallback(measurement -> 
consumerLagCalculator.calculateLag(result ->
+                measurement.record(result.lag, buildLagAttributes(result))));
 
         consumerLagLatency = 
brokerMeter.gaugeBuilder(GAUGE_CONSUMER_LAG_LATENCY)
             .setDescription("Consumer lag time")
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 35519c1d1c..d42c8f0ff6 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -16,8 +16,13 @@
  */
 package org.apache.rocketmq.broker.metrics;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.broker.BrokerController;
@@ -211,20 +216,43 @@ public class ConsumerLagCalculator {
     }
 
     public void calculateLag(Consumer<CalculateLagResult> lagRecorder) {
+
+        List<CompletableFuture<CalculateLagResult>> futures = new 
ArrayList<>();
+
+        BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
+            CompletableFuture<ConsumerLagCalculator.CalculateLagResult>> 
biConsumer =
+                (info, future) -> calculate(info, future::complete);
+
         processAllGroup(info -> {
             if (info.group == null || info.topic == null) {
                 return;
             }
-
+            CompletableFuture<CalculateLagResult> future = new 
CompletableFuture<>();
             if (info.isPop && 
brokerConfig.isEnableNotifyBeforePopCalculateLag()) {
                 if (popLongPollingService.notifyMessageArriving(info.topic, 
-1, info.group,
-                    true, null, 0, null, null, new 
PopCommandCallback(this::calculate, info, lagRecorder))) {
+                    true, null, 0, null, null,
+                    new PopCommandCallback(biConsumer, info, future))) {
+                    futures.add(future);
                     return;
                 }
             }
-
             calculate(info, lagRecorder);
         });
+
+        // Set the maximum wait time to 10 seconds to avoid indefinite blocking
+        // in case of a fast fail that causes the future to not complete its 
execution.
+        try {
+            CompletableFuture.allOf(futures.toArray(
+                new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
+
+            futures.forEach(future -> {
+                if (future.isDone() && !future.isCompletedExceptionally()) {
+                    lagRecorder.accept(future.join());
+                }
+            });
+        } catch (Exception e) {
+            LOGGER.error("Calculate lag timeout after 10 seconds", e);
+        }
     }
 
     public void calculate(ProcessGroupInfo info, Consumer<CalculateLagResult> 
lagRecorder) {

Reply via email to