noob-se7en commented on code in PR #17598:
URL: https://github.com/apache/pinot/pull/17598#discussion_r2755815696


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java:
##########
@@ -48,32 +50,29 @@ public 
FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataMa
     super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
     _minFreshnessMs = minFreshnessMs;
     _idleTimeoutMs = idleTimeoutMs;
+    _logger.info("FreshnessBasedConsumptionStatusChecker initialized with 
min_freshness={}ms, idle_timeout={}ms",
+        _minFreshnessMs, _idleTimeoutMs);
   }
 
   private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) {
     return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs;
   }
 
-  protected long now() {
-    return System.currentTimeMillis();
-  }
-
   @Override
   protected boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager,
       RealtimeTableDataManager realtimeTableDataManager) {
-    long now = now();
-    long latestIngestionTimestamp =
-        
rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
-    long freshnessMs = now - latestIngestionTimestamp;
+    SegmentMetadata segmentMetadata = 
rtSegmentDataManager.getSegment().getSegmentMetadata();
+    long minimumIngestionLagMs = segmentMetadata.getMinimumIngestionLagMs();
 
-    // We check latestIngestionTimestamp >= 0 because the default freshness 
when unknown is Long.MIN_VALUE
-    if (latestIngestionTimestamp >= 0 && freshnessMs <= _minFreshnessMs) {
-      _logger.info("Segment {} with freshness {}ms has caught up within min 
freshness {}", segmentName, freshnessMs,
-          _minFreshnessMs);
+    // Simple freshness check - if minimum lag ever seen is within threshold, 
we're caught up.
+    // Note: default value is Long.MAX_VALUE when unknown, which will 
correctly fail this check.

Review Comment:
   Regarding the default value, Is the record timestamp always present in all 
the streams Pinot supports?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to