This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3ebbfca reenable highest stream offset metrics (#8372)
3ebbfca is described below
commit 3ebbfcad987683b61aa23c2ee353f44db154a5a7
Author: Rong Rong <[email protected]>
AuthorDate: Thu Mar 24 11:04:17 2022 -0700
reenable highest stream offset metrics (#8372)
---
.../main/java/org/apache/pinot/common/metrics/ServerGauge.java | 3 ---
.../data/manager/realtime/LLRealtimeSegmentDataManager.java | 10 ++++++----
2 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index d9b9c5d..bf5e973 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -29,9 +29,6 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
DOCUMENT_COUNT("documents", false),
SEGMENT_COUNT("segments", false),
LLC_PARTITION_CONSUMING("state", false),
- HIGHEST_KAFKA_OFFSET_CONSUMED("messages", false),
- // Introducing a new stream agnostic metric to replace
HIGHEST_KAFKA_OFFSET_CONSUMED.
- // We can phase out HIGHEST_KAFKA_OFFSET_CONSUMED once we have collected
sufficient metrics for the new one
HIGHEST_STREAM_OFFSET_CONSUMED("messages", false),
LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS("seconds", false),
LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS("seconds", false),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0245099..7402b53 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -68,6 +68,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -419,10 +420,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
consecutiveIdleCount = 0;
// We consumed something. Update the highest stream offset as well as
partition-consuming metric.
// TODO Issue 5359 Need to find a way to bump metrics without getting
actual offset value.
- //_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
- //_currentOffset.getOffset());
- //_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
- //_currentOffset.getOffset());
+ if (_currentOffset instanceof LongMsgOffset) {
+ // TODO: only LongMsgOffset supplies long offset value.
+ _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
+ ((LongMsgOffset) _currentOffset).getOffset());
+ }
_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
lastUpdatedOffset =
_streamPartitionMsgOffsetFactory.create(_currentOffset);
} else if (messageBatch.getUnfilteredMessageCount() > 0) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]