This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c7fd6f333f Continue Freshness check if latest stream offset fetch fails (#17563)
4c7fd6f333f is described below

commit 4c7fd6f333f0159680bf9d97abb75a47ebd2e9ef
Author: NOOB <[email protected]>
AuthorDate: Fri Jan 23 20:12:28 2026 +0530

    Continue Freshness check if latest stream offset fetch fails (#17563)
    
    * Continue Freshness check if latest stream offset fetch fails
    
    * Adds nullable annotation
    
    * fixes test
    
    * fixes lint
---
 .../FreshnessBasedConsumptionStatusChecker.java    | 15 +++--
 .../IngestionBasedConsumptionStatusChecker.java    |  5 +-
 ...FreshnessBasedConsumptionStatusCheckerTest.java | 68 ++++++++++++++++++++++
 3 files changed, 82 insertions(+), 6 deletions(-)

diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 13dd8075841..5a02a91f2dd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -78,11 +78,18 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
     // the stream consumer to check partition count if we're already caught up.
     StreamPartitionMsgOffset currentOffset = 
rtSegmentDataManager.getCurrentOffset();
 
-    StreamMetadataProvider streamMetadataProvider =
-        
realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
-    StreamPartitionMsgOffset latestStreamOffset =
-        
RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(rtSegmentDataManager, 
streamMetadataProvider);
+    StreamPartitionMsgOffset latestStreamOffset = null;
+    try {
+      StreamMetadataProvider streamMetadataProvider =
+          
realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
+      latestStreamOffset =
+          
RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(rtSegmentDataManager, 
streamMetadataProvider);
+    } catch (Exception e) {
+      _logger.warn("Failed to fetch latest stream offset for segment: {}. Will 
continue with other checks.",
+          segmentName, e);
+    }
 
+    // Check if we're caught up (isOffsetCaughtUp handles null 
latestStreamOffset by returning false)
     if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
       _logger.info("Segment {} with freshness {}ms has not caught up within 
min freshness {}. "
               + "But the current ingested offset is equal to the latest 
available offset {}.", segmentName, freshnessMs,
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index 56ed05920e3..ef389f645ef 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
@@ -139,8 +140,8 @@ public abstract class 
IngestionBasedConsumptionStatusChecker {
   protected abstract boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager,
       RealtimeTableDataManager realtimeTableDataManager);
 
-  protected boolean isOffsetCaughtUp(String segmentName,
-      StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset 
latestOffset) {
+  protected boolean isOffsetCaughtUp(String segmentName, @Nullable 
StreamPartitionMsgOffset currentOffset,
+      @Nullable StreamPartitionMsgOffset latestOffset) {
     if (currentOffset != null && latestOffset != null) {
       // Kafka's "latest" offset is actually the next available offset. 
Therefore it will be 1 ahead of the
       // current offset in the case we are caught up.
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 40b3a40e2e9..0953045bc80 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -582,4 +583,71 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
+
+  @Test
+  public void testTimeoutExceptionWhenFetchingLatestStreamOffset() {
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    long idleTimeoutMs = 10L;
+    FreshnessBasedConsumptionStatusChecker statusChecker =
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
idleTimeoutMs, 100L);
+
+    // TableDataManager is not set up yet
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 2);
+
+    // setup TableDataManager
+    RealtimeTableDataManager tableDataManagerA = 
mock(RealtimeTableDataManager.class);
+    
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+
+    // setup SegmentDataManagers
+    RealtimeSegmentDataManager segMngrA0 = 
mock(RealtimeSegmentDataManager.class);
+    RealtimeSegmentDataManager segMngrA1 = 
mock(RealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+
+    when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+    when(segMngrA0.getSegmentName()).thenReturn(segA0);
+    when(segMngrA1.getSegmentName()).thenReturn(segA1);
+
+    // segA0 provider throws RuntimeException - this should be caught and 
handled gracefully
+    // In practice, RealtimeSegmentMetadataUtils wraps TimeoutException in 
RuntimeException
+    when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenThrow(
+        new RuntimeException("Failed to fetch latest stream offset for 
segment: " + segA0,
+            new TimeoutException("Timeout fetching latest stream offset")));
+    // segA1 provider works normally
+    when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
+
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    // ensure negative values are ignored
+    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+    setupLatestIngestionTimestamp(segMngrA1, Long.MIN_VALUE);
+
+    // segA0 has idle time below threshold, segA1 has idle time above threshold
+    when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs 
- 1);
+    when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs 
+ 1);
+
+    // segA0: timeout exception when fetching latest offset, but idle time is 
below threshold
+    //         - should not be caught up (can't determine from offset, and 
idle time not exceeded)
+    // segA1: can fetch latest offset (10 < 20), but idle time exceeds 
threshold
+    //         - should be caught up due to idle timeout
+    // Expected: 1 segment not caught up (segA0)
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 1);
+
+    // Now make segA0 also exceed idle timeout - it should be caught up 
despite timeout exception
+    when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs 
+ 1);
+    // Expected: 0 segments not caught up (both exceed idle timeout)
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to