This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0e9c30a386 Adding gauges for upstream and consuming offsets (#14600)
0e9c30a386 is described below
commit 0e9c30a386ec8a80c38df4a74c2beed4a757a1e4
Author: soumitra-st <[email protected]>
AuthorDate: Fri Dec 20 21:26:26 2024 -0800
Adding gauges for upstream and consuming offsets (#14600)
---
.../apache/pinot/common/metrics/ServerGauge.java | 2 +
.../prometheus/ServerPrometheusMetricsTest.java | 3 +-
.../manager/realtime/IngestionDelayTracker.java | 46 ++++++++++++++++++++++
.../realtime/IngestionDelayTrackerTest.java | 10 ++++-
4 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index b999e7b8e4..7c1826582a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -77,6 +77,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false),
UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false),
REALTIME_INGESTION_OFFSET_LAG("offsetLag", false),
+ REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false),
+ REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false),
REALTIME_CONSUMER_DIR_USAGE("bytes", true);
private final String _gaugeName;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
index 7ff5a99b87..4a4a3c1440 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
@@ -56,7 +56,8 @@ public abstract class ServerPrometheusMetricsTest extends PinotPrometheusMetrics
List.of(ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT,
ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
ServerGauge.REALTIME_INGESTION_DELAY_MS,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
- ServerGauge.DEDUP_PRIMARY_KEYS_COUNT);
+ ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET,
+ ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
private static final List<ServerGauge> GAUGES_ACCEPTING_RAW_TABLE_NAME =
List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED,
ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 048f7564b1..2b52b29f2d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -210,6 +210,8 @@ public class IngestionDelayTracker {
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
+ _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
+ _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
}
return null;
});
@@ -289,6 +291,16 @@ public class IngestionDelayTracker {
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
() -> getPartitionIngestionOffsetLag(partitionId));
}
+
+ if (currentOffset != null) {
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+ ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () -> getPartitionIngestionConsumingOffset(partitionId));
+ }
+
+ if (latestOffset != null) {
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+ ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> getPartitionIngestionUpstreamOffset(partitionId));
+ }
}
return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset);
});
@@ -416,6 +428,40 @@ public class IngestionDelayTracker {
return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
}
+ // Get the consuming offset for a given partition
+ public long getPartitionIngestionConsumingOffset(int partitionId) {
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ if (ingestionInfo == null) {
+ return 0;
+ }
+ StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
+ if (currentOffset == null) {
+ return 0;
+ }
+ // TODO: Support other types of offsets
+ if (!(currentOffset instanceof LongMsgOffset)) {
+ return 0;
+ }
+ return ((LongMsgOffset) currentOffset).getOffset();
+ }
+
+ // Get the latest offset in upstream data source for a given partition
+ public long getPartitionIngestionUpstreamOffset(int partitionId) {
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ if (ingestionInfo == null) {
+ return 0;
+ }
+ StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+ if (latestOffset == null) {
+ return 0;
+ }
+ // TODO: Support other types of offsets
+ if (!(latestOffset instanceof LongMsgOffset)) {
+ return 0;
+ }
+ return ((LongMsgOffset) latestOffset).getOffset();
+ }
+
/*
* We use this method to clean up when a table is being removed. No updates are expected at this time as all
* RealtimeSegmentManagers should be down now.
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 9cb527b121..1fdd12e00e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -307,11 +307,13 @@ public class IngestionDelayTrackerTest {
IngestionDelayTracker ingestionDelayTracker = createTracker();
// Test tracking offset lag for a single partition
- StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
- StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
+ StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(50);
+ StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(150);
ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
latestOffset0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition0), 150);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition0), 50);
// Test tracking offset lag for another partition
StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
@@ -319,6 +321,8 @@ public class IngestionDelayTrackerTest {
ingestionDelayTracker.updateIngestionMetrics(segment1, partition1,
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
latestOffset1);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 100);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition1), 150);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition1), 50);
// Update offset lag for partition0
msgOffset0 = new LongMsgOffset(150);
@@ -326,6 +330,8 @@ public class IngestionDelayTrackerTest {
ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
latestOffset0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition0), 200);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition0), 150);
ingestionDelayTracker.shutdown();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]