This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 44a7b8f82c4 Removes ingestion metrics for consuming -> offline
transition (#16657)
44a7b8f82c4 is described below
commit 44a7b8f82c4a8a0bc59c22c4551b6cf9e4f67737
Author: NOOB <[email protected]>
AuthorDate: Thu Aug 28 16:03:15 2025 +0530
Removes ingestion metrics for consuming -> offline transition (#16657)
---
.../data/manager/realtime/RealtimeTableDataManager.java | 13 +++++++++++++
.../pinot/segment/local/data/manager/TableDataManager.java | 8 ++++++++
.../helix/SegmentOnlineOfflineStateModelFactory.java | 12 ++++++++++++
3 files changed, 33 insertions(+)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index ed80b37abe6..406712e9d68 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -335,6 +335,19 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_ingestionDelayTracker.stopTrackingPartitionIngestionDelay(new
LLCSegmentName(segmentName).getPartitionGroupId());
}
+ /**
+ * Method to handle CONSUMING -> OFFLINE segment state transitions:
+ * We stop tracking partitions whose segments are going OFFLINE. The reason
is that offline segments are not queried.
+ * So ingestion delay for the offline replicas are not relevant. If there
are more replicas with offline state,
+ * replica up metric will determine the severity of the issue.
+ *
+ * @param segmentName name of segment for which the state change is being
handled
+ */
+ @Override
+ public void onConsumingToOffline(String segmentName) {
+ _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+ }
+
@Override
public List<SegmentContext> getSegmentContexts(List<IndexSegment>
selectedSegments,
Map<String, String> queryOptions) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 34b73b9ffb6..c886a1f3085 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -336,6 +336,14 @@ public interface TableDataManager {
default void onConsumingToDropped(String segmentNameStr) {
}
+ /**
+ * Interface to handle segment state transitions from CONSUMING to OFFLINE
+ *
+ * @param segmentNameStr name of segment for which the state change is being
handled
+ */
+ default void onConsumingToOffline(String segmentNameStr) {
+ }
+
/**
* Return list of segment names that are stale along with reason.
*
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index ea9926812ab..3f59bf9fcce 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -115,6 +115,7 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
String segmentName = message.getPartitionName();
_instanceDataManager.offloadSegment(realtimeTableName, segmentName);
_recentlyOffloadedConsumingSegments.put(Pair.of(realtimeTableName,
segmentName), true);
+ onConsumingToOffline(realtimeTableName, segmentName);
} catch (Exception e) {
_logger.error(
"Caught exception while processing
SegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() for "
@@ -124,6 +125,17 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
}
}
+ private void onConsumingToOffline(String realtimeTableName, String
segmentName) {
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(realtimeTableName);
+ if (tableDataManager == null) {
+ _logger.warn(
+ "Failed to find data manager for table: {}, skip invoking
consuming to offline callback for segment: {}",
+ realtimeTableName, segmentName);
+ return;
+ }
+ tableDataManager.onConsumingToOffline(segmentName);
+ }
+
@Transition(from = "CONSUMING", to = "DROPPED")
public void onBecomeDroppedFromConsuming(Message message,
NotificationContext context)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]