This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new aa6a6738536 Adds metric to emit consumer semaphore acquire time
(#16278)
aa6a6738536 is described below
commit aa6a67385363d44e97596f80edb4c000f5767218
Author: NOOB <[email protected]>
AuthorDate: Mon Jul 7 03:25:35 2025 +0530
Adds metric to emit consumer semaphore acquire time (#16278)
---
.../org/apache/pinot/common/metrics/ServerGauge.java | 4 +++-
.../core/data/manager/realtime/ConsumerCoordinator.java | 17 +++++++++++++----
2 files changed, 16 insertions(+), 5 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 5d7d8d03d0a..151f054f28e 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -124,7 +124,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
// how many message are there in the server's message queue in helix
HELIX_MESSAGES_COUNT("count", true),
STARTUP_STATUS_CHECK_IN_PROGRESS("state", true,
- "Indicates whether the server startup status check is currently in
progress");
+ "Indicates whether the server startup status check is currently in
progress"),
+ CONSUMER_LOCK_WAIT_TIME_MS("milliseconds", false,
+ "Indicates the time consumer spends while waiting on the consumer
lock.");
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
index d72d6afe7ef..cc99a15e167 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -94,10 +95,18 @@ public class ConsumerCoordinator {
}
long startTimeMs = System.currentTimeMillis();
- while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
- LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in:
{}ms. Retrying.", segmentName,
- System.currentTimeMillis() - startTimeMs);
- checkSegmentStatus(segmentName);
+ try {
+ while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
+ checkSegmentStatus(segmentName);
+ long waitTimeMs = System.currentTimeMillis() - startTimeMs;
+ LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in:
{}ms. Retrying.", segmentName,
+ waitTimeMs);
+
_serverMetrics.setValueOfPartitionGauge(_realtimeTableDataManager.getTableName(),
+ llcSegmentName.getPartitionGroupId(),
ServerGauge.CONSUMER_LOCK_WAIT_TIME_MS, waitTimeMs);
+ }
+ } finally {
+
_serverMetrics.setValueOfPartitionGauge(_realtimeTableDataManager.getTableName(),
+ llcSegmentName.getPartitionGroupId(),
ServerGauge.CONSUMER_LOCK_WAIT_TIME_MS, 0);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]