This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ebbfca  reenable highest stream offset metrics (#8372)
3ebbfca is described below

commit 3ebbfcad987683b61aa23c2ee353f44db154a5a7
Author: Rong Rong <[email protected]>
AuthorDate: Thu Mar 24 11:04:17 2022 -0700

    reenable highest stream offset metrics (#8372)
---
 .../main/java/org/apache/pinot/common/metrics/ServerGauge.java |  3 ---
 .../data/manager/realtime/LLRealtimeSegmentDataManager.java    | 10 ++++++----
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index d9b9c5d..bf5e973 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -29,9 +29,6 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   DOCUMENT_COUNT("documents", false),
   SEGMENT_COUNT("segments", false),
   LLC_PARTITION_CONSUMING("state", false),
-  HIGHEST_KAFKA_OFFSET_CONSUMED("messages", false),
-  // Introducing a new stream agnostic metric to replace HIGHEST_KAFKA_OFFSET_CONSUMED.
-  // We can phase out HIGHEST_KAFKA_OFFSET_CONSUMED once we have collected sufficient metrics for the new one
   HIGHEST_STREAM_OFFSET_CONSUMED("messages", false),
   LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS("seconds", false),
   LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS("seconds", false),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0245099..7402b53 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -68,6 +68,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -419,10 +420,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         consecutiveIdleCount = 0;
         // We consumed something. Update the highest stream offset as well as partition-consuming metric.
         // TODO Issue 5359 Need to find a way to bump metrics without getting actual offset value.
-        //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
-        //_currentOffset.getOffset());
-        //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
-        //_currentOffset.getOffset());
+        if (_currentOffset instanceof LongMsgOffset) {
+          // TODO: only LongMsgOffset supplies long offset value.
+          _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
+              ((LongMsgOffset) _currentOffset).getOffset());
+        }
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
         lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
       } else if (messageBatch.getUnfilteredMessageCount() > 0) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to