This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e9c30a386 Adding gauges for upstream and consuming offsets (#14600)
0e9c30a386 is described below

commit 0e9c30a386ec8a80c38df4a74c2beed4a757a1e4
Author: soumitra-st <[email protected]>
AuthorDate: Fri Dec 20 21:26:26 2024 -0800

    Adding gauges for upstream and consuming offsets (#14600)
---
 .../apache/pinot/common/metrics/ServerGauge.java   |  2 +
 .../prometheus/ServerPrometheusMetricsTest.java    |  3 +-
 .../manager/realtime/IngestionDelayTracker.java    | 46 ++++++++++++++++++++++
 .../realtime/IngestionDelayTrackerTest.java        | 10 ++++-
 4 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index b999e7b8e4..7c1826582a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -77,6 +77,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false),
   UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false),
   REALTIME_INGESTION_OFFSET_LAG("offsetLag", false),
+  REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false),
+  REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false),
   REALTIME_CONSUMER_DIR_USAGE("bytes", true);
 
   private final String _gaugeName;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
index 7ff5a99b87..4a4a3c1440 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
@@ -56,7 +56,8 @@ public abstract class ServerPrometheusMetricsTest extends PinotPrometheusMetrics
       List.of(ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT,
           ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.REALTIME_INGESTION_DELAY_MS,
           ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
-          ServerGauge.DEDUP_PRIMARY_KEYS_COUNT);
+          ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET,
+          ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
 
   private static final List<ServerGauge> GAUGES_ACCEPTING_RAW_TABLE_NAME =
       List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 048f7564b1..2b52b29f2d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -210,6 +210,8 @@ public class IngestionDelayTracker {
         _serverMetrics.removePartitionGauge(_metricName, partitionId,
             ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
         _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
+        _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
+        _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
       }
       return null;
     });
@@ -289,6 +291,16 @@ public class IngestionDelayTracker {
           _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
               () -> getPartitionIngestionOffsetLag(partitionId));
         }
+
+        if (currentOffset != null) {
+          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+              ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () -> getPartitionIngestionConsumingOffset(partitionId));
+        }
+
+        if (latestOffset != null) {
+          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+              ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> getPartitionIngestionUpstreamOffset(partitionId));
+        }
       }
       return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset);
     });
@@ -416,6 +428,40 @@ public class IngestionDelayTracker {
     return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
   }
 
+  // Get the consuming offset for a given partition
+  public long getPartitionIngestionConsumingOffset(int partitionId) {
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    if (ingestionInfo == null) {
+      return 0;
+    }
+    StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
+    if (currentOffset == null) {
+      return 0;
+    }
+    // TODO: Support other types of offsets
+    if (!(currentOffset instanceof LongMsgOffset)) {
+      return 0;
+    }
+    return ((LongMsgOffset) currentOffset).getOffset();
+  }
+
+  // Get the latest offset in upstream data source for a given partition
+  public long getPartitionIngestionUpstreamOffset(int partitionId) {
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    if (ingestionInfo == null) {
+      return 0;
+    }
+    StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+    if (latestOffset == null) {
+      return 0;
+    }
+    // TODO: Support other types of offsets
+    if (!(latestOffset instanceof LongMsgOffset)) {
+      return 0;
+    }
+    return ((LongMsgOffset) latestOffset).getOffset();
+  }
+
   /*
    * We use this method to clean up when a table is being removed. No updates are expected at this time as all
    * RealtimeSegmentManagers should be down now.
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 9cb527b121..1fdd12e00e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -307,11 +307,13 @@ public class IngestionDelayTrackerTest {
     IngestionDelayTracker ingestionDelayTracker = createTracker();
 
     // Test tracking offset lag for a single partition
-    StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
-    StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
+    StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(50);
+    StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(150);
     ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
         latestOffset0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition0), 150);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition0), 50);
 
     // Test tracking offset lag for another partition
     StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
@@ -319,6 +321,8 @@ public class IngestionDelayTrackerTest {
     ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
         latestOffset1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 100);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition1), 150);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition1), 50);
 
     // Update offset lag for partition0
     msgOffset0 = new LongMsgOffset(150);
@@ -326,6 +330,8 @@ public class IngestionDelayTrackerTest {
     ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
         latestOffset0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition0), 200);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition0), 150);
 
     ingestionDelayTracker.shutdown();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to