This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new aa6a6738536 Adds metric to emit consumer semaphore acquire time 
(#16278)
aa6a6738536 is described below

commit aa6a67385363d44e97596f80edb4c000f5767218
Author: NOOB <[email protected]>
AuthorDate: Mon Jul 7 03:25:35 2025 +0530

    Adds metric to emit consumer semaphore acquire time (#16278)
---
 .../org/apache/pinot/common/metrics/ServerGauge.java    |  4 +++-
 .../core/data/manager/realtime/ConsumerCoordinator.java | 17 +++++++++++++----
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 5d7d8d03d0a..151f054f28e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -124,7 +124,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   // how many message are there in the server's message queue in helix
   HELIX_MESSAGES_COUNT("count", true),
   STARTUP_STATUS_CHECK_IN_PROGRESS("state", true,
-      "Indicates whether the server startup status check is currently in 
progress");
+      "Indicates whether the server startup status check is currently in 
progress"),
+  CONSUMER_LOCK_WAIT_TIME_MS("milliseconds", false,
+      "Indicates the time consumer spends while waiting on the consumer 
lock.");
 
   private final String _gaugeName;
   private final String _unit;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
index d72d6afe7ef..cc99a15e167 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -94,10 +95,18 @@ public class ConsumerCoordinator {
     }
 
     long startTimeMs = System.currentTimeMillis();
-    while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
-      LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: 
{}ms. Retrying.", segmentName,
-          System.currentTimeMillis() - startTimeMs);
-      checkSegmentStatus(segmentName);
+    try {
+      while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
+        checkSegmentStatus(segmentName);
+        long waitTimeMs = System.currentTimeMillis() - startTimeMs;
+        LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: 
{}ms. Retrying.", segmentName,
+            waitTimeMs);
+        
_serverMetrics.setValueOfPartitionGauge(_realtimeTableDataManager.getTableName(),
+            llcSegmentName.getPartitionGroupId(), 
ServerGauge.CONSUMER_LOCK_WAIT_TIME_MS, waitTimeMs);
+      }
+    } finally {
+      
_serverMetrics.setValueOfPartitionGauge(_realtimeTableDataManager.getTableName(),
+          llcSegmentName.getPartitionGroupId(), 
ServerGauge.CONSUMER_LOCK_WAIT_TIME_MS, 0);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to