This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new afd5530230 allow passing freshness checker after an idle threshold 
(#11345)
afd5530230 is described below

commit afd5530230cb73592a65ab8990148fc405d77a9e
Author: Johan Adami <[email protected]>
AuthorDate: Tue Aug 22 02:41:20 2023 -0400

    allow passing freshness checker after an idle threshold (#11345)
---
 .../core/data/manager/realtime/IdleTimer.java      |  71 +++++++++++
 .../realtime/LLRealtimeSegmentDataManager.java     |  53 ++++----
 .../core/data/manager/realtime/IdleTimerTest.java  | 123 ++++++++++++++++++
 .../server/starter/helix/BaseServerStarter.java    |   8 +-
 .../FreshnessBasedConsumptionStatusChecker.java    |  26 +++-
 ...FreshnessBasedConsumptionStatusCheckerTest.java | 137 +++++++++++++++++++--
 .../apache/pinot/spi/utils/CommonConstants.java    |  12 ++
 7 files changed, 394 insertions(+), 36 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IdleTimer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IdleTimer.java
new file mode 100644
index 0000000000..bb44bdaae1
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IdleTimer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+/**
+ * The IdleTimer is responsible for keeping track of 2 different idle times:
+ *  - The stream idle time which resets every time we remake the stream 
consumer or consume an event.
+ *    This depends on the user configured "idle.timeout.millis" stream config.
+ *  - the total idle time which only resets when we consume something.
+ */
+public class IdleTimer {
+
+  private volatile long _timeWhenStreamLastCreatedOrConsumedMs = 0;
+  private volatile long _timeWhenEventLastConsumedMs = 0;
+
+  public IdleTimer() {
+  }
+
+  protected long now() {
+    return System.currentTimeMillis();
+  }
+
+  public void init() {
+    long nowMs = now();
+    // When an event is consumed, we consider the stream no longer idle.
+    // Event consumption idleness should always be greater than or equal to stream
+    // idleness since we recreate the stream after some amount of idleness,
+    // but that does not guarantee we'll consume an event.
+    _timeWhenStreamLastCreatedOrConsumedMs = nowMs;
+    _timeWhenEventLastConsumedMs = nowMs;
+  }
+
+  public void markStreamCreated() {
+    _timeWhenStreamLastCreatedOrConsumedMs = now();
+  }
+
+  public void markEventConsumed() {
+    init();
+  }
+
+  public long getTimeSinceStreamLastCreatedOrConsumedMs() {
+    if (_timeWhenStreamLastCreatedOrConsumedMs == 0) {
+      return 0;
+    }
+    return now() - _timeWhenStreamLastCreatedOrConsumedMs;
+  }
+
+  public long getTimeSinceEventLastConsumedMs() {
+    if (_timeWhenEventLastConsumedMs == 0) {
+      return 0;
+    }
+    return now() - _timeWhenEventLastConsumedMs;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index b2c9bdcfaf..4acab57731 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -242,6 +242,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private volatile int _numRowsErrored = 0;
   private volatile int _consecutiveErrorCount = 0;
   private long _startTimeMs = 0;
+  private final IdleTimer _idleTimer = new IdleTimer();
   private final String _segmentNameStr;
   private final SegmentVersion _segmentVersion;
   private final SegmentBuildTimeLeaseExtender _leaseExtender;
@@ -404,8 +405,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     _numRowsErrored = 0;
     final long idlePipeSleepTimeMillis = 100;
     final long idleTimeoutMillis = 
_partitionLevelStreamConfig.getIdleTimeoutMillis();
-    long idleStartTimeMillis = -1;
-    boolean idle = false;
+    _idleTimer.init();
 
     StreamPartitionMsgOffset lastUpdatedOffset = 
_streamPartitionMsgOffsetFactory
         .create(_currentOffset);  // so that we always update the metric when 
we enter this method.
@@ -443,7 +443,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       boolean endCriteriaReached = processStreamEvents(messageBatch, 
idlePipeSleepTimeMillis);
 
       if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
-        idle = false;
+        _idleTimer.markEventConsumed();
         // We consumed something. Update the highest stream offset as well as 
partition-consuming metric.
         // TODO Issue 5359 Need to find a way to bump metrics without getting 
actual offset value.
         if (_currentOffset instanceof LongMsgOffset) {
@@ -462,7 +462,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
         }
         // We check this flag again further down
       } else if (messageBatch.getUnfilteredMessageCount() > 0) {
-        idle = false;
+        _idleTimer.markEventConsumed();
         // we consumed something from the stream but filtered all the content 
out,
         // so we need to advance the offsets to avoid getting stuck
         StreamPartitionMsgOffset nextOffset = 
messageBatch.getOffsetOfNextBatch();
@@ -473,21 +473,16 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
         lastUpdatedOffset = 
_streamPartitionMsgOffsetFactory.create(nextOffset);
       } else {
         // We did not consume any rows.
-        if (!idle) {
-          idleStartTimeMillis = now();
-          idle = true;
-        }
-        if (idleTimeoutMillis >= 0) {
-          long totalIdleTimeMillis = now() - idleStartTimeMillis;
-          if (totalIdleTimeMillis > idleTimeoutMillis) {
-            // Update the partition-consuming metric only if we have been 
idling beyond idle timeout.
-            // Create a new stream consumer wrapper, in case we are stuck on 
something.
-            _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.LLC_PARTITION_CONSUMING, 1);
-            recreateStreamConsumer(
-                String.format("Total idle time: %d ms exceeded idle timeout: 
%d ms", totalIdleTimeMillis,
-                    idleTimeoutMillis));
-            idle = false;
-          }
+        long timeSinceStreamLastCreatedOrConsumedMs = 
_idleTimer.getTimeSinceStreamLastCreatedOrConsumedMs();
+
+        if (idleTimeoutMillis >= 0 && (timeSinceStreamLastCreatedOrConsumedMs 
> idleTimeoutMillis)) {
+          // Update the partition-consuming metric only if we have been idling 
beyond idle timeout.
+          // Create a new stream consumer wrapper, in case we are stuck on 
something.
+          _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.LLC_PARTITION_CONSUMING, 1);
+          recreateStreamConsumer(
+              String.format("Total idle time: %d ms exceeded idle timeout: %d 
ms",
+                  timeSinceStreamLastCreatedOrConsumedMs, idleTimeoutMillis));
+          _idleTimer.markStreamCreated();
         }
       }
 
@@ -1507,17 +1502,29 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     }
   }
 
+  public long getTimeSinceEventLastConsumedMs() {
+    return _idleTimer.getTimeSinceEventLastConsumedMs();
+  }
+
   public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
+    return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, 
maxWaitTimeMs);
+  }
+
+  public StreamPartitionMsgOffset fetchEarliestStreamOffset(long 
maxWaitTimeMs) {
+    return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, 
maxWaitTimeMs);
+  }
+
+  private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria 
offsetCriteria, long maxWaitTimeMs) {
     if (_partitionMetadataProvider == null) {
       createPartitionMetadataProvider("Fetch latest stream offset");
     }
     try {
-      return 
_partitionMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
-          maxWaitTimeMs);
+      return 
_partitionMetadataProvider.fetchStreamPartitionOffset(offsetCriteria, 
maxWaitTimeMs);
     } catch (Exception e) {
       _segmentLogger.warn(
-          "Cannot fetch latest stream offset for clientId {} and 
partitionGroupId {} with maxWaitTime {}", _clientId,
-          _partitionGroupId, maxWaitTimeMs);
+          String.format(
+              "Cannot fetch stream offset with criteria %s for clientId %s and 
partitionGroupId %d with maxWaitTime %d",
+              offsetCriteria, _clientId, _partitionGroupId, maxWaitTimeMs), e);
     }
     return null;
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IdleTimerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IdleTimerTest.java
new file mode 100644
index 0000000000..e0adf5bfb1
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IdleTimerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class IdleTimerTest {
+
+  private static class StaticIdleTimer extends IdleTimer {
+
+    private long _nowTimeMs = 0;
+
+    public StaticIdleTimer() {
+      super();
+    }
+
+    @Override
+    protected long now() {
+      return _nowTimeMs;
+    }
+
+    public void setNowTimeMs(long nowTimeMs) {
+      _nowTimeMs = nowTimeMs;
+    }
+  }
+
+  @Test
+  public void testIdleTimerResetNoIdle() {
+    StaticIdleTimer timer = new StaticIdleTimer();
+    // idle times are all 0 before init
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+    // start times are all 1000L
+    timer.setNowTimeMs(1000L);
+    timer.init();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+    // new now time should affect idle time
+    timer.setNowTimeMs(2000L);
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 
1000);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+    // everything resets to 2000
+    timer.init();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+  }
+
+  @Test
+  public void testOnlyResetStreamIdleTime() {
+    StaticIdleTimer timer = new StaticIdleTimer();
+    timer.setNowTimeMs(1000L);
+    timer.init();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+    // only stream idle time resets
+    timer.setNowTimeMs(2000L);
+    timer.markStreamCreated();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+    // everything resets to 0
+    timer.init();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+  }
+
+  @Test
+  public void testMultipleIdleResets() {
+    StaticIdleTimer timer = new StaticIdleTimer();
+    timer.setNowTimeMs(1000L);
+    timer.init();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+    // new now time should affect idle time
+    timer.setNowTimeMs(2000L);
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 
1000);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+    // only stream idle time resets
+    timer.markStreamCreated();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+    // everything resets to 0
+    timer.setNowTimeMs(3000L);
+    timer.markEventConsumed();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+    // later now times should affect idle time
+    timer.setNowTimeMs(4000L);
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 
1000);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+    // only stream idle time resets
+    timer.setNowTimeMs(5000L);
+    timer.markStreamCreated();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 2000);
+    // later now time should only increase both idle times
+    timer.setNowTimeMs(6000L);
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 
1000);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 3000);
+    // everything resets to 0
+    timer.init();
+    Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+    Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index d758ff627a..4704441372 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -313,10 +313,14 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
       // are accidentally enabled together. The freshness based checker is a 
stricter version of the offset based
       // checker. But in the end, both checkers are bounded in time by 
realtimeConsumptionCatchupWaitMs.
       if (isFreshnessStatusCheckerEnabled) {
-        LOGGER.info("Setting up freshness based status checker");
+        int idleTimeoutMs = 
_serverConf.getProperty(Server.CONFIG_OF_REALTIME_FRESHNESS_IDLE_TIMEOUT_MS,
+            Server.DEFAULT_REALTIME_FRESHNESS_IDLE_TIMEOUT_MS);
+
+        LOGGER.info("Setting up freshness based status checker with min 
freshness {} and idle timeout {}",
+            realtimeMinFreshnessMs, idleTimeoutMs);
         FreshnessBasedConsumptionStatusChecker freshnessStatusChecker =
             new 
FreshnessBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(),
 consumingSegments,
-                realtimeMinFreshnessMs);
+                realtimeMinFreshnessMs, idleTimeoutMs);
         Supplier<Integer> getNumConsumingSegmentsNotReachedMinFreshness =
             
freshnessStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
         serviceStatusCallbackListBuilder.add(
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 3cf3dd3587..abce5d5aaa 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -35,11 +35,13 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
  */
 public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsumptionStatusChecker {
   private final long _minFreshnessMs;
+  private final long _idleTimeoutMs;
 
   public FreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager, Set<String> consumingSegments,
-      long minFreshnessMs) {
+      long minFreshnessMs, long idleTimeoutMs) {
     super(instanceDataManager, consumingSegments);
     _minFreshnessMs = minFreshnessMs;
+    _idleTimeoutMs = idleTimeoutMs;
   }
 
   private boolean isOffsetCaughtUp(StreamPartitionMsgOffset currentOffset, 
StreamPartitionMsgOffset latestOffset) {
@@ -52,6 +54,10 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
     return false;
   }
 
+  private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) {
+    return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs;
+  }
+
   protected long now() {
     return System.currentTimeMillis();
   }
@@ -76,15 +82,27 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
     StreamPartitionMsgOffset currentOffset = 
rtSegmentDataManager.getCurrentOffset();
     StreamPartitionMsgOffset latestStreamOffset = 
rtSegmentDataManager.fetchLatestStreamOffset(5000);
     if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) {
-      _logger.info("Segment {} with freshness {}ms has not caught up within 
min freshness {}."
+      _logger.info("Segment {} with freshness {}ms has not caught up within 
min freshness {}. "
               + "But the current ingested offset is equal to the latest 
available offset {}.", segmentName, freshnessMs,
           _minFreshnessMs, currentOffset);
       return true;
     }
 
+    StreamPartitionMsgOffset earliestStreamOffset = 
rtSegmentDataManager.fetchEarliestStreamOffset(5000);
+
+    long idleTimeMs = rtSegmentDataManager.getTimeSinceEventLastConsumedMs();
+    if (segmentHasBeenIdleLongerThanThreshold(idleTimeMs)) {
+      _logger.warn("Segment {} with freshness {}ms has not caught up within 
min freshness {}. "
+              + "But the current ingested offset {} has been idle for {}ms. At 
offset {}. Earliest offset {}. "
+              + "Latest offset {}.", segmentName, freshnessMs, 
_minFreshnessMs, currentOffset, idleTimeMs,
+          currentOffset,
+          earliestStreamOffset, latestStreamOffset);
+      return true;
+    }
+
     _logger.info("Segment {} with freshness {}ms has not caught up within "
-            + "min freshness {}. At offset {}. Latest offset {}.",
-        segmentName, freshnessMs, _minFreshnessMs, currentOffset, 
latestStreamOffset);
+            + "min freshness {}. At offset {}. Earliest offset {}. Latest 
offset {}.", segmentName, freshnessMs,
+        _minFreshnessMs, currentOffset, earliestStreamOffset, 
latestStreamOffset);
     return false;
   }
 }
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 7f48e2f9f0..cab178bc0d 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -42,8 +42,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     private final long _now;
 
     public FakeFreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
-        Set<String> consumingSegments, long minFreshnessMs, long now) {
-      super(instanceDataManager, consumingSegments, minFreshnessMs);
+        Set<String> consumingSegments, long minFreshnessMs, long 
idleTimeoutMs, long now) {
+      super(instanceDataManager, consumingSegments, minFreshnessMs, 
idleTimeoutMs);
       _now = now;
     }
 
@@ -61,7 +61,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10000L);
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10000L, 0L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -136,8 +136,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(
-            instanceDataManager, consumingSegments, 10L, 100L);
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 0L, 100L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -191,6 +190,130 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
 
+  @Test
+  public void regularCaseWithIdleTimeout() {
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    long idleTimeoutMs = 10L;
+    FreshnessBasedConsumptionStatusChecker statusChecker =
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, idleTimeoutMs,
+            100L);
+
+    // TableDataManager is not set up yet
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = 
mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = 
mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrB0 = 
mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    // ensure negative values are ignored
+    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+    setupLatestIngestionTimestamp(segMngrA1, -1L);
+    setupLatestIngestionTimestamp(segMngrB0, 0L);
+
+    when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+    when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+    when(segMngrB0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+
+    //              total idle time
+    // segA0              0
+    // segA1              0
+    // segB0              0
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
+
+    when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs 
- 1);
+    
when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs);
+    when(segMngrB0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs 
+ 1);
+    //              total idle time
+    // segA0              9
+    // segA1              10
+    // segB0              11
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 2);
+
+    
when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs);
+    when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs 
+ 1);
+    //              total idle time
+    // segA0              10
+    // segA1              11
+    // segB0              11
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 1);
+
+    when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs 
+ 1);
+    //              total idle time
+    // segA0              11
+    // segA1              11
+    // segB0              11
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
+  }
+
+  @Test
+  public void 
testSegmentsNeverHealthyWhenIdleTimeoutZeroAndNoOtherCriteriaMet() {
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    FreshnessBasedConsumptionStatusChecker statusChecker =
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 0L, 100L);
+
+    // TableDataManager is not set up yet
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = 
mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = 
mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrB0 = 
mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    // ensure negative values are ignored
+    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+    setupLatestIngestionTimestamp(segMngrA1, -1L);
+    setupLatestIngestionTimestamp(segMngrB0, 0L);
+
+    when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+    when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+    when(segMngrB0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
+
+    when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(10L);
+    when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(100L);
+    when(segMngrB0.getTimeSinceEventLastConsumedMs()).thenReturn(1000L);
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
+  }
+
   @Test
   public void segmentBeingCommmitted() {
     String segA0 = "tableA__0__0__123Z";
@@ -199,7 +322,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 100L);
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 0L, 100L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -257,7 +380,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 100L);
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 0L, 100L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index d77a8a9e93..775ff1fc58 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -549,6 +549,18 @@ public class CommonConstants {
     public static final String 
CONFIG_OF_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER =
         
"pinot.server.starter.enableRealtimeFreshnessBasedConsumptionStatusChecker";
     public static final boolean 
DEFAULT_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER = false;
+    // This configuration is in place to avoid servers getting stuck checking 
for freshness in
+    // cases where they will never be able to reach the freshness threshold or 
the latest offset.
+    // The only current case where we have seen this is low volume streams 
using read_committed
+    // because of transactional publishes where the last message in the stream 
is an
+    // un-consumable kafka control message, and it is impossible to tell if 
the consumer is stuck
+    // or some offsets will never be consumable.
+    //
+    // When in doubt, do not enable this configuration as it can cause a 
lagged server to start
+    // serving queries.
+    public static final String CONFIG_OF_REALTIME_FRESHNESS_IDLE_TIMEOUT_MS =
+        "pinot.server.starter.realtimeFreshnessIdleTimeoutMs";
+    public static final int DEFAULT_REALTIME_FRESHNESS_IDLE_TIMEOUT_MS = 0;
     public static final String CONFIG_OF_STARTUP_REALTIME_MIN_FRESHNESS_MS =
         "pinot.server.starter.realtimeMinFreshnessMs";
     // Use 10 seconds by default so high volume stream are able to catch up.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to