KKcorps commented on code in PR #13668:
URL: https://github.com/apache/pinot/pull/13668#discussion_r1690763064
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1029,6 +1039,47 @@ void
updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>>
}
}
+ /**
+ * Handles segment movement between instances.
+ * If the new consuming segment is served by a different set of servers than
the committed segment, notify the
+ * servers no longer serving the stream partition to remove the ingestion
metrics. This can prevent servers from
+ * emitting high ingestion delay alerts on stream partitions no longer
served.
+ */
+ private void handleSegmentMovement(String realtimeTableName, Map<String,
Map<String, String>> instanceStatesMap,
+ String committedSegment, String newConsumingSegment) {
+ Set<String> oldInstances =
instanceStatesMap.get(committedSegment).keySet();
+ Set<String> newInstances =
instanceStatesMap.get(newConsumingSegment).keySet();
+ if (newInstances.containsAll(oldInstances)) {
+ return;
+ }
+ Set<String> instancesNoLongerServe = new HashSet<>(oldInstances);
+ instancesNoLongerServe.removeAll(newInstances);
+ LOGGER.info("Segment movement detected for committed segment: {} (served
by: {}), "
+ + "consuming segment: {} (served by: {}) in table: {}, "
+ + "sending message to instances: {} to remove ingestion metrics",
committedSegment, oldInstances,
+ newConsumingSegment, newInstances, realtimeTableName,
instancesNoLongerServe);
+
+ ClusterMessagingService messagingService =
_helixManager.getMessagingService();
+ List<String> instancesSent = new
ArrayList<>(instancesNoLongerServe.size());
+ for (String instance : instancesNoLongerServe) {
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setInstanceName(instance);
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setResource(realtimeTableName);
+ recipientCriteria.setPartition(committedSegment);
+ recipientCriteria.setSessionSpecific(true);
+ IngestionMetricsRemoveMessage message = new
IngestionMetricsRemoveMessage();
Review Comment:
I feel that we should rename `IngestionMetricsRemoveMessage` to something
else so that it can be reused for other purposes as well in future.
e.g. `RemoveInstanceMessage`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]