This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 53cfdf3  Offset based realtime consumption status checker (#7267)
53cfdf3 is described below

commit 53cfdf3c7bb97bd2d6624f2f9be9db195983c32b
Author: Sajjad Moradi <[email protected]>
AuthorDate: Mon Sep 20 11:09:45 2021 -0700

    Offset based realtime consumption status checker (#7267)
    
    * Add offset based realtime consumption status checker
    
    * Applied PR suggestions
    
    * One log line when consumption catches up
    
    * Return numConsumingSegmentsNotCaughtUp
    
    * Also add num of outstanding segments to timeout scenario
---
 .../apache/pinot/common/utils/ServiceStatus.java   |  44 +++-
 .../realtime/LLRealtimeSegmentDataManager.java     |  20 +-
 .../server/starter/helix/BaseServerStarter.java    |  15 +-
 .../helix/OffsetBasedConsumptionStatusChecker.java | 114 ++++++++++
 .../OffsetBasedConsumptionStatusCheckerTest.java   | 251 +++++++++++++++++++++
 .../apache/pinot/spi/stream/OffsetCriteria.java    |   3 +
 6 files changed, 427 insertions(+), 20 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index 06ee990..d50e399 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -204,28 +205,33 @@ public class ServiceStatus {
 
   /**
    * Service status callback that checks whether realtime consumption has 
caught up
-   * TODO: In this initial version, we are simply adding a configurable static 
wait time
-   * This can be made smarter:
-   * 1) Keep track of average consumption rate for table in server stats
-   * 2) Monitor consumption rate during startup, report GOOD when it 
stabilizes to average rate
-   * 3) Monitor consumption rate during startup, report GOOD if it is idle
+   * An offset based consumption status checker is being rolled out in two phases. In the first phase the new checker
+   * runs alongside the static wait time, but its result does not affect the reported status; it is only logged. Once
+   * its behavior has been analysed and validated for tables with different consumption rates, it will be used to
+   * decide the status. (An alternative was to guard the new checker with a config that is disabled by default; since
+   * this feature is not urgent, we chose this two-phase approach instead of introducing yet another config.)
    */
   public static class RealtimeConsumptionCatchupServiceStatusCallback 
implements ServiceStatusCallback {
 
     private final long _endWaitTime;
     private final Status _serviceStatus = Status.STARTING;
+    private final Supplier<Integer> 
_getNumConsumingSegmentsNotReachedTheirLatestOffset;
     String _statusDescription = STATUS_DESCRIPTION_INIT;
 
+    private boolean _consumptionNotYetCaughtUp = true;
+
     /**
      * Realtime consumption catchup service which adds a static wait time for 
consuming segments to catchup
      */
     public RealtimeConsumptionCatchupServiceStatusCallback(HelixManager 
helixManager, String clusterName,
-        String instanceName, long realtimeConsumptionCatchupWaitMs) {
+        String instanceName, long realtimeConsumptionCatchupWaitMs,
+        Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset) {
 
       // A consuming segment will actually be ready to serve queries after 
(time of creation of partition consumer) +
       // (configured max time to catchup)
       // We are approximating it to (time of server startup) + (configured max 
time to catch up)
       _endWaitTime = System.currentTimeMillis() + 
realtimeConsumptionCatchupWaitMs;
+      _getNumConsumingSegmentsNotReachedTheirLatestOffset = 
getNumConsumingSegmentsNotReachedTheirLatestOffset;
       LOGGER.info("Monitoring realtime consumption catchup. Will allow {} ms 
before marking status GOOD",
           realtimeConsumptionCatchupWaitMs);
     }
@@ -236,13 +242,27 @@ public class ServiceStatus {
         return _serviceStatus;
       }
       long now = System.currentTimeMillis();
-      if (now < _endWaitTime) {
-        _statusDescription =
-            String.format("Waiting for consuming segments to catchup, timeRemaining=%dms", _endWaitTime - now);
-        return Status.STARTING;
+      int numConsumingSegmentsNotCaughtUp = _getNumConsumingSegmentsNotReachedTheirLatestOffset.get();
+      if (now >= _endWaitTime) {
+        _statusDescription = String.format("Consuming segments status GOOD since %dms "
+            + "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, numConsumingSegmentsNotCaughtUp);
+        return Status.GOOD;
       }
-      _statusDescription = String.format("Consuming segments status GOOD since %dms", _endWaitTime);
-      return Status.GOOD;
+      if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp == 0) {
+        // TODO: Once the performance of offset based consumption checker is validated:
+        //      - remove the log line
+        //      - uncomment the status & statusDescription lines
+        //      - remove variable _consumptionNotYetCaughtUp
+        _consumptionNotYetCaughtUp = false; // only log the moment all segments first catch up, not on every poll
+        LOGGER.info("All consuming segments have reached their latest offsets! "
+            + "Finished {} msec earlier than time threshold.", _endWaitTime - now);
+//      _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset";
+//      return Status.GOOD;
+      }
+      _statusDescription =
+          String.format("Waiting for consuming segments to catchup: numConsumingSegmentsNotCaughtUp=%d, "
+              + "timeRemaining=%dms", numConsumingSegmentsNotCaughtUp, _endWaitTime - now);
+      return Status.STARTING;
     }
 
     @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 72aaf15..a36935d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -72,6 +72,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.metrics.PinotMeter;
 import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -287,6 +288,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private final boolean _nullHandlingEnabled;
   private final SegmentCommitterFactory _segmentCommitterFactory;
 
+  private volatile StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime = 
null;
+
   // TODO each time this method is called, we print reason for stop. Good to 
print only once.
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", 
_state);
@@ -763,11 +766,14 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     return _lastLogTime;
   }
 
-  @VisibleForTesting
-  protected StreamPartitionMsgOffset getCurrentOffset() {
+  public StreamPartitionMsgOffset getCurrentOffset() {
     return _currentOffset;
   }
 
+  public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
+    return _latestStreamOffsetAtStartupTime;
+  }
+
   @VisibleForTesting
   protected SegmentBuildDescriptor getSegmentBuildDescriptor() {
     return _segmentBuildDescriptor;
@@ -1364,6 +1370,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     _state = State.INITIAL_CONSUMING;
 
+    // Best effort: fetch the latest stream offset once at startup; on failure the field stays null (logged below).
+    try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
+        .createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
+      _latestStreamOffsetAtStartupTime = metadataProvider
+          .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
+    } catch (Exception e) {
+      _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
+          _partitionGroupId, e);
+    }
+
     long now = now();
     _consumeStartTime = now;
     long maxConsumeTimeMillis = _partitionLevelStreamConfig.getFlushThresholdTimeMillis();
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index e58d020..9f964d5 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -215,8 +215,8 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
 
     // collect all resources which have this instance in the ideal state
     List<String> resourcesToMonitor = new ArrayList<>();
-    // if even 1 resource has this instance in ideal state with state 
CONSUMING, set this to true
-    boolean foundConsuming = false;
+
+    Set<String> consumingSegments = new HashSet<>();
     boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0;
 
     for (String resourceName : 
_helixAdmin.getResourcesInCluster(_helixClusterName)) {
@@ -235,12 +235,11 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
             break;
           }
         }
-        if (checkRealtime && !foundConsuming && 
TableNameBuilder.isRealtimeTableResource(resourceName)) {
+        if (checkRealtime && 
TableNameBuilder.isRealtimeTableResource(resourceName)) {
           for (String partitionName : idealState.getPartitionSet()) {
             if (StateModel.SegmentStateModel.CONSUMING
                 
.equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
-              foundConsuming = true;
-              break;
+              consumingSegments.add(partitionName);
             }
           }
         }
@@ -255,10 +254,14 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     serviceStatusCallbackListBuilder.add(
         new 
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager,
 _helixClusterName,
             _instanceId, resourcesToMonitor, minResourcePercentForStartup));
+    boolean foundConsuming = !consumingSegments.isEmpty();
     if (checkRealtime && foundConsuming) {
+      OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
+          new 
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), 
consumingSegments);
       serviceStatusCallbackListBuilder.add(
           new 
ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, 
_helixClusterName,
-              _instanceId, realtimeConsumptionCatchupWaitMs));
+              _instanceId, realtimeConsumptionCatchupWaitMs,
+              
consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
     }
     LOGGER.info("Registering service status handler");
     ServiceStatus.setServiceStatusCallback(_instanceId,
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
new file mode 100644
index 0000000..e67eacd
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.server.starter.helix;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import 
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is used at startup time to have a more accurate estimate of the 
catchup period in which no query execution
+ * happens and consumers try to catch up to the latest messages available in 
streams.
+ * To achieve this, every time status check is called - {@link 
#getNumConsumingSegmentsNotReachedTheirLatestOffset} -
+ * for each consuming segment, we check if segment's latest ingested offset 
has reached the latest stream offset that's
+ * fetched once at startup time.
+ */
+public class OffsetBasedConsumptionStatusChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OffsetBasedConsumptionStatusChecker.class);
+
+  // constructor parameters; the segment set is copied so later changes made by the caller cannot skew the count
+  private final InstanceDataManager _instanceDataManager;
+  private final Set<String> _consumingSegments;
+
+  // segments already found caught up (or already committed); they are not re-checked on later calls
+  private final Set<String> _caughtUpSegments = new HashSet<>();
+
+  public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments) {
+    _instanceDataManager = instanceDataManager;
+    _consumingSegments = new HashSet<>(consumingSegments);
+  }
+
+  public int getNumConsumingSegmentsNotReachedTheirLatestOffset() {
+    for (String segName : _consumingSegments) {
+      if (_caughtUpSegments.contains(segName)) {
+        continue;
+      }
+      TableDataManager tableDataManager = getTableDataManager(segName);
+      if (tableDataManager == null) {
+        LOGGER.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName);
+        continue;
+      }
+      SegmentDataManager segmentDataManager = null;
+      try {
+        segmentDataManager = tableDataManager.acquireSegment(segName);
+        if (segmentDataManager == null) {
+          LOGGER
+              .info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", segName);
+          continue;
+        }
+        if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) {
+          // There's a possibility that a consuming segment has converted to a committed segment. If that's the case,
+          // segment data manager will not be of type LLRealtime.
+          LOGGER.info("Segment {} is already committed and is considered caught up.", segName);
+          _caughtUpSegments.add(segName);
+          continue;
+        }
+        LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
+        StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset();
+        StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
+        if (latestStreamOffset == null || latestIngestedOffset == null) {
+          LOGGER.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. "
+              + "Will check consumption status later", segName, latestStreamOffset, latestIngestedOffset);
+          continue;
+        }
+        if (latestIngestedOffset.compareTo(latestStreamOffset) < 0) {
+          LOGGER.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ",
+              latestIngestedOffset, segName, latestStreamOffset);
+          continue;
+        }
+        LOGGER.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", segName,
+            latestIngestedOffset, latestStreamOffset);
+        _caughtUpSegments.add(segName);
+      } finally {
+        if (segmentDataManager != null) {
+          tableDataManager.releaseSegment(segmentDataManager);
+        }
+      }
+    }
+    return _consumingSegments.size() - _caughtUpSegments.size();
+  }
+
+  private TableDataManager getTableDataManager(String segmentName) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    String tableName = llcSegmentName.getTableName();
+    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    return _instanceDataManager.getTableDataManager(tableNameWithType);
+  }
+}
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
new file mode 100644
index 0000000..be03eef
--- /dev/null
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import 
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class OffsetBasedConsumptionStatusCheckerTest {
+
+  @Test
+  public void regularCase() {
+    // All segments start behind the stream offsets fetched at startup, then all of them move past those offsets.
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    OffsetBasedConsumptionStatusChecker statusChecker =
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+    when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+    when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+    when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+
+    //           latest ingested offset    latest stream offset
+    // segA0              10                       15
+    // segA1              100                      150
+    // segB0              1000                     1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    //           latest ingested offset     latest stream offset
+    // segA0              20                       15
+    // segA1              200                      150
+    // segB0              2000                     1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+  }
+
+  @Test
+  public void dataMangersBeingSetup() {
+    // Table/segment data managers show up gradually; segments without one are counted as not caught up.
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+
+    OffsetBasedConsumptionStatusChecker statusChecker =
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+    // TableDataManager is not set up yet
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup some SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+
+    //           latest ingested offset    latest stream offset
+    // segA0               10                     15
+    // segA1               100                    150
+    // segB0           not setup yet              1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+    when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+    when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    // setup the remaining SegmentDataManager
+    LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    //           latest ingested offset     latest stream offset
+    // segA0               20                      15
+    // segA1               200                     150
+    // segB0               1000                    1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+    when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+
+    //           latest ingested offset     latest stream offset
+    // segA0               30                      15
+    // segA1               300                     150
+    // segB0               2000                    1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(30));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+  }
+
+  @Test
+  public void segmentsBeingCommitted() {
+    // A segment whose data manager is no longer an LLRealtimeSegmentDataManager is treated as committed/caught up.
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    OffsetBasedConsumptionStatusChecker statusChecker =
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    //           latest ingested offset    latest stream offset
+    // segA0              10                       15
+    // segA1              100                      150
+    // segB0              1000                     1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+    when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+    when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+    when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    // segB0 is now committed; ImmutableSegmentDataManager is returned by table data manager
+    ImmutableSegmentDataManager immSegMngrB0 = mock(ImmutableSegmentDataManager.class);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(immSegMngrB0);
+
+    //           latest ingested offset     latest stream offset
+    // segA0              20                        15
+    // segA1              200                       150
+    // segB0        committed at 1200               1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+  }
+
+  @Test
+  public void cannotGetLatestStreamOffset() {
+    // A segment whose startup stream offset is null is never counted as caught up, however far it consumes.
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    OffsetBasedConsumptionStatusChecker statusChecker =
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    //           latest ingested offset    latest stream offset
+    // segA0              10                       15
+    // segA1              100                      150
+    // segB0              1000                     null - could not get the latest offset from stream at startup
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+    when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+    when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+    when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(null);
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    //           latest ingested offset     latest stream offset
+    // segA0              20                        15
+    // segA1              200                       150
+    // segB0              2000                      null - could not get the latest offset from stream at startup
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+
+    //           latest ingested offset     latest stream offset
+    // segA0              30                        15
+    // segA1              300                       150
+    // segB0              3000                      null - could not get the latest offset from stream at startup
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(30));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(3000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+  }
+}
\ No newline at end of file
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
index e0d7166..14cb6a4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
@@ -31,6 +31,9 @@ public class OffsetCriteria {
   public static final OffsetCriteria SMALLEST_OFFSET_CRITERIA =
       new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest();
 
+  public static final OffsetCriteria LARGEST_OFFSET_CRITERIA =
+      new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest();
+
   /**
    * Enumerates the supported offset types
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to