mcvsubbu commented on code in PR #9994:
URL: https://github.com/apache/pinot/pull/9994#discussion_r1063872683


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Class to track realtime ingestion delay for a given table on a given 
server.
+ * Highlights:
+ * 1-An object of this class is hosted by each RealtimeTableDataManager.
+ * 2-The object tracks ingestion delays for all partitions hosted by the 
current server for the given Realtime table.
+ * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects 
hosted in the corresponding
+ *   RealtimeTableDataManager.
+ * 4-A Metric is derived from reading the maximum tracked by this class. In 
addition, individual metrics are associated
+ *   with each partition being tracked.
+ * 5-Delays reported for partitions that do not have events to consume are 
reported as zero.
+ * 6-We track the time at which each delay sample was collected so that delay 
can be increased when partition stops
+ *   consuming for any reason other than no events being available for 
consumption.
+ * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being 
tracked so their delays do not cloud
+ *   delays of active partitions.
+ * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the 
corresponding partition.
+ *   If no consumption is noticed after the timeout, we then read ideal state 
to confirm the server still hosts the
+ *   partition. If not, we stop tracking the respective partition.
+ * 9-A timer thread is started by this object to track timeouts of partitions 
and drive the reading of their ideal
+ *  state.
+ *
+ *  The following diagram illustrates the object interactions with main 
external APIs
+ *
+ *     (CONSUMING -> ONLINE state change)
+ *             |
+ *      markPartitionForConfirmation(partitionId)
+ *            |                         
|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0}}
+ *            |                         |
+ * ___________V_________________________V_
+ * |           (Table X)                
|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1}}
+ * | IngestionDelayTracker              |           ...
+ * 
|____________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager
 (Partition n}}
+ *              ^                      ^
+ *              |                       \
+ *   timeoutInactivePartitions()    
stopTrackingPartitionIngestionDelay(partitionId)
+ *    _________|__________                \
+ *   | TimerTrackingTask |          (CONSUMING -> DROPPED state change)
+ *   |___________________|
+ *
+ */
+
+public class IngestionDelayTracker {
+
+  // Sleep interval for timer thread that triggers read of ideal state
+  private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 
minutes +/- precision in timeouts
+  // Once a partition is marked for verification, we wait 10 minutes to pull 
its ideal state.
+  private static final int PARTITION_TIMEOUT_MS = 600000;          // 10 
minutes timeouts
+  // Delay Timer thread for this amount of time after starting timer
+  private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
+
+  /*
+   * Class to keep an ingestion delay measure and the time when the sample was 
taken (i.e. sample time)
+   * We will use the sample time to increase ingestion delay when a partition 
stops consuming: the time
+   * difference between the sample time and current time will be added to the 
metric when read.
+   */
+  static private class DelayMeasure {
+    public DelayMeasure(long t, long d) {
+      _delayMilliseconds = d;
+      _sampleTime = t;
+    }
+    public final long _delayMilliseconds;
+    public final long _sampleTime;
+  }
+
+  // HashMap used to store delay measures for all partitions active for the 
current table.
+  private final ConcurrentHashMap<Integer, DelayMeasure> 
_partitionToDelaySampleMap = new ConcurrentHashMap<>();
+  // We mark partitions that go from CONSUMING to ONLINE in 
_partitionsMarkedForVerification: if they do not
+  // go back to CONSUMING in some period of time, we confirm whether they are 
still hosted in this server by reading
+  // ideal state. This is done with the goal of minimizing reading ideal state 
for efficiency reasons.
+  private final ConcurrentHashMap<Integer, Long> 
_partitionsMarkedForVerification = new ConcurrentHashMap<>();
+
+  final int _timerThreadTickIntervalMs;
+  // Timer task to check partitions that are inactive against ideal state.
+  private final Timer _timer;
+
+  private final ServerMetrics _serverMetrics;
+  private final String _tableNameWithType;
+  private final String _metricName;
+
+  private final boolean _enablePerPartitionMetric;
+  private final boolean _enableAggregateMetric;
+  private final Logger _logger;
+
+  private final RealtimeTableDataManager _realTimeTableDataManager;
+  private Clock _clock;
+
+  /*
+   * Returns the maximum ingestion delay amongs all partitions we are tracking.
+   */
+  private DelayMeasure getMaximumDelay() {
+    DelayMeasure newMax = null;
+    for (int partitionGroupId : _partitionToDelaySampleMap.keySet()) {
+      DelayMeasure currentMeasure = 
_partitionToDelaySampleMap.get(partitionGroupId);
+      if ((newMax == null)
+          ||
+          (currentMeasure != null) && (currentMeasure._delayMilliseconds > 
newMax._delayMilliseconds)) {
+        newMax = currentMeasure;
+      }
+    }
+    return newMax;
+  }
+
+  /*
+   * Helper function to age a delay measure. Aging means adding the time 
elapsed since the measure was
+   * taken till the measure is being reported.
+  *
+  * @param currentDelay original sample delay to which we will add the age of 
the measure.
+   */
+  private long getAgedDelay(DelayMeasure currentDelay) {
+    if (currentDelay == null) {
+      return 0; // return 0 when not initialized
+    }
+    // Add age of measure to the reported value
+    long measureAgeInMs = _clock.millis() - currentDelay._sampleTime;
+    // Correct to zero for any time shifts due to NTP or time reset.
+    measureAgeInMs = Math.max(measureAgeInMs, 0);
+    return currentDelay._delayMilliseconds + measureAgeInMs;
+  }
+
+  /*
+   * Helper function to be called when we should stop tracking a given 
partition. Removes the partition from
+   * all our maps.
+   *
+   * @param partitionGroupId partition ID which we should stop tracking.
+   */
+  private void removePartitionId(int partitionGroupId) {
+    _partitionToDelaySampleMap.remove(partitionGroupId);
+    // If we are removing a partition we should stop reading its ideal state.
+    _partitionsMarkedForVerification.remove(partitionGroupId);
+    if (_enablePerPartitionMetric) {
+      
_serverMetrics.removeTableGauge(getPerPartitionMetricName(partitionGroupId),
+          ServerGauge.TABLE_INGESTION_DELAY_MS);
+    }
+  }
+
+  /*
+   * Helper function to generate a per partition metric name.
+   *
+   * @param partitionGroupId the partition group id to be appended to the 
table name so we
+   *        can differentiate between metrics for various partitions.
+   *
+   * @return a metric name with the following structure: _metricName + 
partitionGroupId
+   */
+  private String getPerPartitionMetricName(int partitionGroupId) {
+    return _metricName + partitionGroupId;
+  }
+
+  /*
+   * Helper functions that creates a list of all the partitions that are 
marked for verification and whose
+   * timeouts are expired. This helps us optimize checks of the ideal state.
+   */
+  private ArrayList<Integer> getPartitionsToBeVerified() {
+    ArrayList<Integer> partitionsToVerify = new ArrayList<>();
+    for (int partitionGroupId : _partitionsMarkedForVerification.keySet()) {
+      long markTime = _partitionsMarkedForVerification.get(partitionGroupId);
+      long timeMarked = _clock.millis() - markTime;
+      if (timeMarked > PARTITION_TIMEOUT_MS) {
+        // Partition must be verified
+        partitionsToVerify.add(partitionGroupId);
+      }
+    }
+    return partitionsToVerify;
+  }
+
+  // Custom Constructor
+  public IngestionDelayTracker(ServerMetrics serverMetrics, String 
tableNameWithType,
+      RealtimeTableDataManager realtimeTableDataManager, int 
timerThreadTickIntervalMs, String metricNamePrefix,
+      boolean enableAggregateMetric, boolean enablePerPartitionMetric)
+      throws RuntimeException {
+    _logger = LoggerFactory.getLogger(getClass().getSimpleName());
+    _serverMetrics = serverMetrics;
+    _tableNameWithType = tableNameWithType;
+    _metricName = metricNamePrefix + tableNameWithType;
+    _realTimeTableDataManager = realtimeTableDataManager;
+    _clock = Clock.systemDefaultZone();

Review Comment:
   We should use `Clock.systemUTC()` here instead of `Clock.systemDefaultZone()`.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Class to track realtime ingestion delay for a given table on a given 
server.
+ * Highlights:
+ * 1-An object of this class is hosted by each RealtimeTableDataManager.
+ * 2-The object tracks ingestion delays for all partitions hosted by the 
current server for the given Realtime table.
+ * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects 
hosted in the corresponding
+ *   RealtimeTableDataManager.
+ * 4-A Metric is derived from reading the maximum tracked by this class. In 
addition, individual metrics are associated
+ *   with each partition being tracked.
+ * 5-Delays reported for partitions that do not have events to consume are 
reported as zero.
+ * 6-We track the time at which each delay sample was collected so that delay 
can be increased when partition stops
+ *   consuming for any reason other than no events being available for 
consumption.
+ * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being 
tracked so their delays do not cloud
+ *   delays of active partitions.
+ * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the 
corresponding partition.
+ *   If no consumption is noticed after the timeout, we then read ideal state 
to confirm the server still hosts the
+ *   partition. If not, we stop tracking the respective partition.
+ * 9-A timer thread is started by this object to track timeouts of partitions 
and drive the reading of their ideal
+ *  state.
+ *
+ *  The following diagram illustrates the object interactions with main 
external APIs
+ *
+ *     (CONSUMING -> ONLINE state change)
+ *             |
+ *      markPartitionForConfirmation(partitionId)
+ *            |                         
|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0}}
+ *            |                         |
+ * ___________V_________________________V_
+ * |           (Table X)                
|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1}}
+ * | IngestionDelayTracker              |           ...
+ * 
|____________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager
 (Partition n}}
+ *              ^                      ^
+ *              |                       \
+ *   timeoutInactivePartitions()    
stopTrackingPartitionIngestionDelay(partitionId)
+ *    _________|__________                \
+ *   | TimerTrackingTask |          (CONSUMING -> DROPPED state change)
+ *   |___________________|
+ *
+ */
+
+public class IngestionDelayTracker {
+
+  // Sleep interval for timer thread that triggers read of ideal state
+  private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 
minutes +/- precision in timeouts
+  // Once a partition is marked for verification, we wait 10 minutes to pull 
its ideal state.
+  private static final int PARTITION_TIMEOUT_MS = 600000;          // 10 
minutes timeouts
+  // Delay Timer thread for this amount of time after starting timer
+  private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
+
+  /*
+   * Class to keep an ingestion delay measure and the time when the sample was 
taken (i.e. sample time)
+   * We will use the sample time to increase ingestion delay when a partition 
stops consuming: the time
+   * difference between the sample time and current time will be added to the 
metric when read.
+   */
+  static private class DelayMeasure {
+    public DelayMeasure(long t, long d) {
+      _delayMilliseconds = d;
+      _sampleTime = t;
+    }
+    public final long _delayMilliseconds;
+    public final long _sampleTime;
+  }
+
+  // HashMap used to store delay measures for all partitions active for the 
current table.
+  private final ConcurrentHashMap<Integer, DelayMeasure> 
_partitionToDelaySampleMap = new ConcurrentHashMap<>();
+  // We mark partitions that go from CONSUMING to ONLINE in 
_partitionsMarkedForVerification: if they do not
+  // go back to CONSUMING in some period of time, we confirm whether they are 
still hosted in this server by reading
+  // ideal state. This is done with the goal of minimizing reading ideal state 
for efficiency reasons.
+  private final ConcurrentHashMap<Integer, Long> 
_partitionsMarkedForVerification = new ConcurrentHashMap<>();
+
+  final int _timerThreadTickIntervalMs;
+  // Timer task to check partitions that are inactive against ideal state.
+  private final Timer _timer;
+
+  private final ServerMetrics _serverMetrics;
+  private final String _tableNameWithType;
+  private final String _metricName;
+
+  private final boolean _enablePerPartitionMetric;
+  private final boolean _enableAggregateMetric;
+  private final Logger _logger;
+
+  private final RealtimeTableDataManager _realTimeTableDataManager;
+  private Clock _clock;
+
+  /*
+   * Returns the maximum ingestion delay amongs all partitions we are tracking.
+   */
+  private DelayMeasure getMaximumDelay() {
+    DelayMeasure newMax = null;
+    for (int partitionGroupId : _partitionToDelaySampleMap.keySet()) {
+      DelayMeasure currentMeasure = 
_partitionToDelaySampleMap.get(partitionGroupId);
+      if ((newMax == null)
+          ||
+          (currentMeasure != null) && (currentMeasure._delayMilliseconds > 
newMax._delayMilliseconds)) {
+        newMax = currentMeasure;
+      }
+    }
+    return newMax;
+  }
+
+  /*
+   * Helper function to age a delay measure. Aging means adding the time 
elapsed since the measure was
+   * taken till the measure is being reported.
+  *
+  * @param currentDelay original sample delay to which we will add the age of 
the measure.
+   */
+  private long getAgedDelay(DelayMeasure currentDelay) {

Review Comment:
   Suggestion:
   You can remove this method in favor of a method inside the `DelayMeasure` 
class that computes the aged delay. Do the null check outside that method, in 
the one case where the measure may be null.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Class to track realtime ingestion delay for a given table on a given 
server.
+ * Highlights:
+ * 1-An object of this class is hosted by each RealtimeTableDataManager.
+ * 2-The object tracks ingestion delays for all partitions hosted by the 
current server for the given Realtime table.
+ * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects 
hosted in the corresponding
+ *   RealtimeTableDataManager.
+ * 4-A Metric is derived from reading the maximum tracked by this class. In 
addition, individual metrics are associated
+ *   with each partition being tracked.
+ * 5-Delays reported for partitions that do not have events to consume are 
reported as zero.
+ * 6-We track the time at which each delay sample was collected so that delay 
can be increased when partition stops
+ *   consuming for any reason other than no events being available for 
consumption.
+ * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being 
tracked so their delays do not cloud
+ *   delays of active partitions.
+ * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the 
corresponding partition.
+ *   If no consumption is noticed after the timeout, we then read ideal state 
to confirm the server still hosts the
+ *   partition. If not, we stop tracking the respective partition.
+ * 9-A timer thread is started by this object to track timeouts of partitions 
and drive the reading of their ideal
+ *  state.
+ *
+ *  The following diagram illustrates the object interactions with main 
external APIs
+ *
+ *     (CONSUMING -> ONLINE state change)
+ *             |
+ *      markPartitionForConfirmation(partitionId)
+ *            |                         
|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0}}
+ *            |                         |
+ * ___________V_________________________V_
+ * |           (Table X)                
|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1}}
+ * | IngestionDelayTracker              |           ...
+ * 
|____________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager
 (Partition n}}
+ *              ^                      ^
+ *              |                       \
+ *   timeoutInactivePartitions()    
stopTrackingPartitionIngestionDelay(partitionId)
+ *    _________|__________                \
+ *   | TimerTrackingTask |          (CONSUMING -> DROPPED state change)
+ *   |___________________|
+ *
+ */
+
+public class IngestionDelayTracker {
+
+  // Sleep interval for timer thread that triggers read of ideal state
+  private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 
minutes +/- precision in timeouts
+  // Once a partition is marked for verification, we wait 10 minutes to pull 
its ideal state.
+  private static final int PARTITION_TIMEOUT_MS = 600000;          // 10 
minutes timeouts
+  // Delay Timer thread for this amount of time after starting timer
+  private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
+
+  /*
+   * Class to keep an ingestion delay measure and the time when the sample was 
taken (i.e. sample time)
+   * We will use the sample time to increase ingestion delay when a partition 
stops consuming: the time
+   * difference between the sample time and current time will be added to the 
metric when read.
+   */
+  static private class DelayMeasure {
+    public DelayMeasure(long t, long d) {
+      _delayMilliseconds = d;
+      _sampleTime = t;
+    }
+    public final long _delayMilliseconds;
+    public final long _sampleTime;
+  }
+
+  // HashMap used to store delay measures for all partitions active for the 
current table.
+  private final ConcurrentHashMap<Integer, DelayMeasure> 
_partitionToDelaySampleMap = new ConcurrentHashMap<>();
+  // We mark partitions that go from CONSUMING to ONLINE in 
_partitionsMarkedForVerification: if they do not
+  // go back to CONSUMING in some period of time, we confirm whether they are 
still hosted in this server by reading
+  // ideal state. This is done with the goal of minimizing reading ideal state 
for efficiency reasons.
+  private final ConcurrentHashMap<Integer, Long> 
_partitionsMarkedForVerification = new ConcurrentHashMap<>();
+
+  final int _timerThreadTickIntervalMs;
+  // Timer task to check partitions that are inactive against ideal state.
+  private final Timer _timer;
+
+  private final ServerMetrics _serverMetrics;
+  private final String _tableNameWithType;
+  private final String _metricName;
+
+  private final boolean _enablePerPartitionMetric;
+  private final boolean _enableAggregateMetric;
+  private final Logger _logger;

Review Comment:
   This can be a `static final` field rather than an instance member variable.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -115,15 +117,24 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   private TableDedupMetadataManager _tableDedupMetadataManager;
   private TableUpsertMetadataManager _tableUpsertMetadataManager;
+  // Object to track ingestion delay for all partitions
+  private IngestionDelayTracker _ingestionDelayTracker;
+  private Supplier<Boolean> _isReadyToServeQueries;
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
+    this(segmentBuildSemaphore, () -> true);
+  }
+
+  public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, 
Supplier<Boolean> isReadyToServeQueries) {
     _segmentBuildSemaphore = segmentBuildSemaphore;
+    _isReadyToServeQueries = isReadyToServeQueries;

Review Comment:
   I suggest that `isReadyToServeQueries` be handled either in the table 
data manager (by blocking all calls to the ingestion delay tracker) or in the 
ingestion delay tracker itself. Let us not involve the segment data manager 
with another state that it does not need to know about.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java:
##########
@@ -39,25 +39,25 @@ private TableStateUtils() {
   }
 
   /**
-   * Checks if all segments for the given @param tableNameWithType are 
succesfully loaded
-   * This function will get all segments in IDEALSTATE and CURRENTSTATE for 
the given table,
-   * and then check if all ONLINE segments in IDEALSTATE match with 
CURRENTSTATE.
-   * @param helixManager helix manager for the server instance
-   * @param tableNameWithType table name for which segment state is to be 
checked
-   * @return true if all segments for the given table are succesfully loaded. 
False otherwise
+   * Returns all online segments for a given table.
+   *
+   * @param helixManager instance of Helix manager
+   * @param tableNameWithType table for which we are obtaining ONLINE segments
+   *
+   * @return List of ONLINE segment names.
    */
-  public static boolean isAllSegmentsLoaded(HelixManager helixManager, String 
tableNameWithType) {
+  public static List<String> getOnlineSegmentsForThisInstance(HelixManager 
helixManager, String tableNameWithType) {

Review Comment:
   Why not return this as a set?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -611,6 +613,8 @@ private boolean processStreamEvents(MessageBatch 
messagesAndOffsets, long idlePi
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", 
idlePipeSleepTimeMillis);
       }
+      // Record Pinot ingestion delay as zero since we are up-to-date and no 
new events
+      _realtimeTableDataManager.updateIngestionDelay(0, 
System.currentTimeMillis(), _partitionGroupId);

Review Comment:
   Looks like this call is made even if the server is not ready to serve queries.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -212,6 +223,58 @@ protected void doShutdown() {
     if (_leaseExtender != null) {
       _leaseExtender.shutDown();
     }
+    // Make sure we do metric cleanup when we shut down the table.
+    _ingestionDelayTracker.shutdown();
+  }
+
+  /*
+   * Method used by LLRealtimeSegmentManagers to update their partition delays
+   *
+   * @param ingestionDelayMs Ingestion delay being reported.
+   * @param currentTimeMs Timestamp of the measure being provided, i.e. when 
this delay was computed.
+   * @param partitionGroupId Partition ID for which delay is being updated.
+   */
+  public void updateIngestionDelay(long ingestionDelayMs, long currenTimeMs, 
int partitionGroupId) {
+    _ingestionDelayTracker.updateIngestionDelay(ingestionDelayMs, 
currenTimeMs, partitionGroupId);
+  }
+
+  /*
+   * Method to handle CONSUMING -> DROPPED segment state transitions:
+   * We stop tracking partitions whose segments are dropped.
+   *
+   * @param segmentNameStr name of segment which is transitioning state.
+   */
+  @Override
+  public void onConsumingToDropped(String segmentNameStr) {
+    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+    
_ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName.getPartitionGroupId());
+  }
+
+  /*
+   * Method to handle CONSUMING -> ONLINE segment state transitions:
+   * We mark partitions for verification against ideal state when we do not 
see a consuming segment for some time
+   * for that partition. The idea is to remove the related metrics when the 
partition moves from the current server.
+   *
+   * @param segmentNameStr name of segment which is transitioning state.
+   */
+  @Override
+  public void onConsumingToOnline(String segmentNameStr) {
+    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+    
_ingestionDelayTracker.markPartitionForVerification(segmentName.getPartitionGroupId());
+  }
+
+  /**
+   * Returns all partitionGroupIds for the partitions hosted by this server 
for current table.
+   * @Note: this involves Zookeeper read and should not be used frequently due 
to efficiency concerns.
+   */
+  public List<Integer> getHostedPartitionsGroupIds() {
+    ArrayList<Integer> partitionsHostedByThisServer = new ArrayList<>();
+    List<String> segments = 
TableStateUtils.getOnlineSegmentsForThisInstance(_helixManager, 
_tableNameWithType);

Review Comment:
   Major: You want to get the consuming segments for this instance, not the online 
segments from the ideal state.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Class to track realtime ingestion delay for a given table on a given 
server.
+ * Highlights:
+ * 1-An object of this class is hosted by each RealtimeTableDataManager.
+ * 2-The object tracks ingestion delays for all partitions hosted by the 
current server for the given Realtime table.
+ * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects 
hosted in the corresponding
+ *   RealtimeTableDataManager.
+ * 4-A Metric is derived from reading the maximum tracked by this class. In 
addition, individual metrics are associated
+ *   with each partition being tracked.
+ * 5-Delays reported for partitions that do not have events to consume are 
reported as zero.
+ * 6-We track the time at which each delay sample was collected so that delay 
can be increased when partition stops
+ *   consuming for any reason other than no events being available for 
consumption.
+ * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being 
tracked so their delays do not cloud
+ *   delays of active partitions.
+ * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the 
corresponding partition.
+ *   If no consumption is noticed after the timeout, we then read ideal state 
to confirm the server still hosts the
+ *   partition. If not, we stop tracking the respective partition.
+ * 9-A timer thread is started by this object to track timeouts of partitions 
and drive the reading of their ideal
+ *  state.
+ *
+ *  The following diagram illustrates the object interactions with main 
external APIs
+ *
+ *     (CONSUMING -> ONLINE state change)
+ *             |
+ *      markPartitionForConfirmation(partitionId)
+ *            |                         
|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0}}
+ *            |                         |
+ * ___________V_________________________V_
+ * |           (Table X)                
|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1}}
+ * | IngestionDelayTracker              |           ...
+ * 
|____________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager
 (Partition n}}
+ *              ^                      ^
+ *              |                       \
+ *   timeoutInactivePartitions()    
stopTrackingPartitionIngestionDelay(partitionId)
+ *    _________|__________                \
+ *   | TimerTrackingTask |          (CONSUMING -> DROPPED state change)
+ *   |___________________|
+ *
+ */
+
+public class IngestionDelayTracker {
+
+  // Sleep interval for timer thread that triggers read of ideal state
+  private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 
minutes +/- precision in timeouts
+  // Once a partition is marked for verification, we wait 10 minutes to pull 
its ideal state.
+  private static final int PARTITION_TIMEOUT_MS = 600000;          // 10 
minutes timeouts
+  // Delay Timer thread for this amount of time after starting timer
+  private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
+
+  /*
+   * Class to keep an ingestion delay measure and the time when the sample was 
taken (i.e. sample time)
+   * We will use the sample time to increase ingestion delay when a partition 
stops consuming: the time
+   * difference between the sample time and current time will be added to the 
metric when read.
+   */
+  static private class DelayMeasure {
+    public DelayMeasure(long t, long d) {
+      _delayMilliseconds = d;
+      _sampleTime = t;
+    }
+    public final long _delayMilliseconds;
+    public final long _sampleTime;
+  }
+
+  // HashMap used to store delay measures for all partitions active for the 
current table.
+  private final ConcurrentHashMap<Integer, DelayMeasure> 
_partitionToDelaySampleMap = new ConcurrentHashMap<>();
+  // We mark partitions that go from CONSUMING to ONLINE in 
_partitionsMarkedForVerification: if they do not
+  // go back to CONSUMING in some period of time, we confirm whether they are 
still hosted in this server by reading
+  // ideal state. This is done with the goal of minimizing reading ideal state 
for efficiency reasons.
+  private final ConcurrentHashMap<Integer, Long> 
_partitionsMarkedForVerification = new ConcurrentHashMap<>();
+
+  final int _timerThreadTickIntervalMs;
+  // Timer task to check partitions that are inactive against ideal state.
+  private final Timer _timer;
+
+  private final ServerMetrics _serverMetrics;
+  private final String _tableNameWithType;
+  private final String _metricName;
+
+  private final boolean _enablePerPartitionMetric;
+  private final boolean _enableAggregateMetric;
+  private final Logger _logger;
+
+  private final RealtimeTableDataManager _realTimeTableDataManager;
+  private Clock _clock;
+
+  /*
+   * Returns the maximum ingestion delay amongs all partitions we are tracking.
+   */
+  private DelayMeasure getMaximumDelay() {
+    DelayMeasure newMax = null;
+    for (int partitionGroupId : _partitionToDelaySampleMap.keySet()) {
+      DelayMeasure currentMeasure = 
_partitionToDelaySampleMap.get(partitionGroupId);
+      if ((newMax == null)
+          ||
+          (currentMeasure != null) && (currentMeasure._delayMilliseconds > 
newMax._delayMilliseconds)) {

Review Comment:
   I thought there was an earlier comment asking to use the `Ms` suffix everywhere instead of 
"millis", "milliseconds", etc.?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to