This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2886e67434 adjust the llc partition consuming metric reporting logic
(#12627)
2886e67434 is described below
commit 2886e67434d036f8c920a75f1d76d646ce2d2cdc
Author: Haitao Zhang <[email protected]>
AuthorDate: Mon Mar 11 17:12:54 2024 -0700
adjust the llc partition consuming metric reporting logic (#12627)
---
.../pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 49cddb5574..9f9a457683 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -421,6 +421,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_segmentLogger.info("Starting consumption loop start offset {},
finalOffset {}", _currentOffset, _finalOffset);
while (!_shouldStop && !endCriteriaReached()) {
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
// Consume for the next readTime ms, or we get to final offset,
whichever happens earlier,
// Update _currentOffset upon return from this method
MessageBatch messageBatch;
@@ -472,7 +473,6 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
((LongMsgOffset) _currentOffset).getOffset());
}
- _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
lastUpdatedOffset =
_streamPartitionMsgOffsetFactory.create(_currentOffset);
} else if (endCriteriaReached) {
// At this point current offset has not moved because
processStreamEvents() has exited before processing a
@@ -497,9 +497,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
long timeSinceStreamLastCreatedOrConsumedMs =
_idleTimer.getTimeSinceStreamLastCreatedOrConsumedMs();
if (idleTimeoutMillis >= 0 && (timeSinceStreamLastCreatedOrConsumedMs
> idleTimeoutMillis)) {
- // Update the partition-consuming metric only if we have been idling
beyond idle timeout.
// Create a new stream consumer wrapper, in case we are stuck on
something.
- _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
recreateStreamConsumer(
String.format("Total idle time: %d ms exceeded idle timeout: %d
ms",
timeSinceStreamLastCreatedOrConsumedMs, idleTimeoutMillis));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]