Jackie-Jiang commented on code in PR #12883:
URL: https://github.com/apache/pinot/pull/12883#discussion_r1573017665


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java:
##########
@@ -359,6 +359,21 @@ private void registerServiceStatusHandler() {
         new 
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
   }
 
+  private Set<String> getConsumingSegments(String realtimeTableName) {

Review Comment:
   (minor) Annotate the return type as `@Nullable`



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -19,80 +19,107 @@
 
 package org.apache.pinot.server.starter.helix;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public abstract class IngestionBasedConsumptionStatusChecker {
   protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
-  // constructor parameters
-  protected final InstanceDataManager _instanceDataManager;
-  protected final Set<String> _consumingSegments;
-
-  // helper variable
-  private final Set<String> _caughtUpSegments = new HashSet<>();
+  private final InstanceDataManager _instanceDataManager;
+  private final Map<String, Set<String>> _consumingSegmentsByTable;
+  private final Map<String, Set<String>> _caughtUpSegmentsByTable = new 
HashMap<>();
+  private final Function<String, Set<String>> _consumingSegmentsSupplier;
 
+  /**
+   * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided 
as it can be costly to get
+   * consumingSegmentsByTable via the supplier, so only use it when any 
missing segment is detected.
+   */
   public IngestionBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
-      Set<String> consumingSegments) {
+      Map<String, Set<String>> consumingSegmentsByTable, Function<String, 
Set<String>> consumingSegmentsSupplier) {
     _instanceDataManager = instanceDataManager;
-    _consumingSegments = consumingSegments;
+    _consumingSegmentsByTable = consumingSegmentsByTable;
+    _consumingSegmentsSupplier = consumingSegmentsSupplier;
   }
 
-  public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
-    for (String segName : _consumingSegments) {
-      if (_caughtUpSegments.contains(segName)) {
-        continue;
-      }
-      TableDataManager tableDataManager = getTableDataManager(segName);
+  // This might be called by multiple threads, thus synchronized to be correct.
+  public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() 
{
+    Set<String> tablesWithMissingSegment = new HashSet<>();
+    for (Map.Entry<String, Set<String>> tableSegments : 
_consumingSegmentsByTable.entrySet()) {
+      String tableNameWithType = tableSegments.getKey();
+      TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(tableNameWithType);
       if (tableDataManager == null) {
-        _logger.info("TableDataManager is not yet setup for segment {}. Will 
check consumption status later", segName);
+        _logger.info("No tableDataManager for table: {}. Will check 
consumption status later", tableNameWithType);
+        tablesWithMissingSegment.add(tableNameWithType);
         continue;
       }
-      SegmentDataManager segmentDataManager = null;
-      try {
-        segmentDataManager = tableDataManager.acquireSegment(segName);
-        if (segmentDataManager == null) {
-          _logger.info("SegmentDataManager is not yet setup for segment {}. 
Will check consumption status later",
-              segName);
+      Set<String> consumingSegments = tableSegments.getValue();
+      Set<String> caughtUpSegments = 
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new 
HashSet<>());
+      for (String segName : consumingSegments) {
+        if (caughtUpSegments.contains(segName)) {
           continue;
         }
-        if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
-          // There's a possibility that a consuming segment has converted to a 
committed segment. If that's the case,
-          // segment data manager will not be of type 
RealtimeSegmentDataManager.
-          _logger.info("Segment {} is already committed and is considered 
caught up.", segName);
-          _caughtUpSegments.add(segName);
-          continue;
-        }
-
-        RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
-        if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
-          _caughtUpSegments.add(segName);
+        SegmentDataManager segmentDataManager = null;
+        try {
+          segmentDataManager = tableDataManager.acquireSegment(segName);
+          if (segmentDataManager == null) {
+            _logger.info("No SegmentDataManager for segment: {}. Will check 
consumption status later", segName);
+            tablesWithMissingSegment.add(tableNameWithType);
+            continue;
+          }
+          if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+            // There's a possibility that a consuming segment has converted to 
a committed segment. If that's the case,
+            // segment data manager will not be of type 
RealtimeSegmentDataManager.
+            _logger.info("Segment: {} is already committed and is considered 
caught up.", segName);
+            caughtUpSegments.add(segName);
+            continue;
+          }
+          RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
+          if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+            caughtUpSegments.add(segName);
+          }
+        } finally {
+          if (segmentDataManager != null) {
+            tableDataManager.releaseSegment(segmentDataManager);
+          }

Review Comment:
   (minor) Move the assignment out of the `try` block and remove the null check in `finally`
   ``` suggestion
           SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segName);
           if (segmentDataManager == null) {
             _logger.info("No SegmentDataManager for segment: {}. Will check 
consumption status later", segName);
             tablesWithMissingSegment.add(tableNameWithType);
             continue;
           }
           try {
             if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
               // There's a possibility that a consuming segment has converted 
to a committed segment. If that's the case,
               // segment data manager will not be of type 
RealtimeSegmentDataManager.
               _logger.info("Segment: {} is already committed and is considered 
caught up.", segName);
               caughtUpSegments.add(segName);
               continue;
             }
             RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
             if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
               caughtUpSegments.add(segName);
             }
           } finally {
             tableDataManager.releaseSegment(segmentDataManager);
   ```



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -19,80 +19,107 @@
 
 package org.apache.pinot.server.starter.helix;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public abstract class IngestionBasedConsumptionStatusChecker {
   protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
-  // constructor parameters
-  protected final InstanceDataManager _instanceDataManager;
-  protected final Set<String> _consumingSegments;
-
-  // helper variable
-  private final Set<String> _caughtUpSegments = new HashSet<>();
+  private final InstanceDataManager _instanceDataManager;
+  private final Map<String, Set<String>> _consumingSegmentsByTable;
+  private final Map<String, Set<String>> _caughtUpSegmentsByTable = new 
HashMap<>();
+  private final Function<String, Set<String>> _consumingSegmentsSupplier;
 
+  /**
+   * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided 
as it can be costly to get
+   * consumingSegmentsByTable via the supplier, so only use it when any 
missing segment is detected.
+   */
   public IngestionBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
-      Set<String> consumingSegments) {
+      Map<String, Set<String>> consumingSegmentsByTable, Function<String, 
Set<String>> consumingSegmentsSupplier) {
     _instanceDataManager = instanceDataManager;
-    _consumingSegments = consumingSegments;
+    _consumingSegmentsByTable = consumingSegmentsByTable;
+    _consumingSegmentsSupplier = consumingSegmentsSupplier;
   }
 
-  public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
-    for (String segName : _consumingSegments) {
-      if (_caughtUpSegments.contains(segName)) {
-        continue;
-      }
-      TableDataManager tableDataManager = getTableDataManager(segName);
+  // This might be called by multiple threads, thus synchronized to be correct.
+  public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() 
{
+    Set<String> tablesWithMissingSegment = new HashSet<>();
+    for (Map.Entry<String, Set<String>> tableSegments : 
_consumingSegmentsByTable.entrySet()) {
+      String tableNameWithType = tableSegments.getKey();
+      TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(tableNameWithType);
       if (tableDataManager == null) {
-        _logger.info("TableDataManager is not yet setup for segment {}. Will 
check consumption status later", segName);
+        _logger.info("No tableDataManager for table: {}. Will check 
consumption status later", tableNameWithType);
+        tablesWithMissingSegment.add(tableNameWithType);
         continue;
       }
-      SegmentDataManager segmentDataManager = null;
-      try {
-        segmentDataManager = tableDataManager.acquireSegment(segName);
-        if (segmentDataManager == null) {
-          _logger.info("SegmentDataManager is not yet setup for segment {}. 
Will check consumption status later",
-              segName);
+      Set<String> consumingSegments = tableSegments.getValue();
+      Set<String> caughtUpSegments = 
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new 
HashSet<>());
+      for (String segName : consumingSegments) {
+        if (caughtUpSegments.contains(segName)) {
           continue;
         }
-        if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
-          // There's a possibility that a consuming segment has converted to a 
committed segment. If that's the case,
-          // segment data manager will not be of type 
RealtimeSegmentDataManager.
-          _logger.info("Segment {} is already committed and is considered 
caught up.", segName);
-          _caughtUpSegments.add(segName);
-          continue;
-        }
-
-        RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
-        if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
-          _caughtUpSegments.add(segName);
+        SegmentDataManager segmentDataManager = null;
+        try {
+          segmentDataManager = tableDataManager.acquireSegment(segName);
+          if (segmentDataManager == null) {
+            _logger.info("No SegmentDataManager for segment: {}. Will check 
consumption status later", segName);
+            tablesWithMissingSegment.add(tableNameWithType);
+            continue;
+          }
+          if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+            // There's a possibility that a consuming segment has converted to 
a committed segment. If that's the case,
+            // segment data manager will not be of type 
RealtimeSegmentDataManager.
+            _logger.info("Segment: {} is already committed and is considered 
caught up.", segName);
+            caughtUpSegments.add(segName);
+            continue;
+          }
+          RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
+          if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+            caughtUpSegments.add(segName);
+          }
+        } finally {
+          if (segmentDataManager != null) {
+            tableDataManager.releaseSegment(segmentDataManager);
+          }
         }
-      } finally {
-        if (segmentDataManager != null) {
-          tableDataManager.releaseSegment(segmentDataManager);
+      }

Review Comment:
   After the for loop, we can remove the table's entry from the map once all of its segments
have caught up.



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -19,80 +19,107 @@
 
 package org.apache.pinot.server.starter.helix;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public abstract class IngestionBasedConsumptionStatusChecker {
   protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
-  // constructor parameters
-  protected final InstanceDataManager _instanceDataManager;
-  protected final Set<String> _consumingSegments;
-
-  // helper variable
-  private final Set<String> _caughtUpSegments = new HashSet<>();
+  private final InstanceDataManager _instanceDataManager;
+  private final Map<String, Set<String>> _consumingSegmentsByTable;
+  private final Map<String, Set<String>> _caughtUpSegmentsByTable = new 
HashMap<>();
+  private final Function<String, Set<String>> _consumingSegmentsSupplier;
 
+  /**
+   * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided 
as it can be costly to get
+   * consumingSegmentsByTable via the supplier, so only use it when any 
missing segment is detected.
+   */
   public IngestionBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
-      Set<String> consumingSegments) {
+      Map<String, Set<String>> consumingSegmentsByTable, Function<String, 
Set<String>> consumingSegmentsSupplier) {
     _instanceDataManager = instanceDataManager;
-    _consumingSegments = consumingSegments;
+    _consumingSegmentsByTable = consumingSegmentsByTable;
+    _consumingSegmentsSupplier = consumingSegmentsSupplier;
   }
 
-  public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
-    for (String segName : _consumingSegments) {
-      if (_caughtUpSegments.contains(segName)) {
-        continue;
-      }
-      TableDataManager tableDataManager = getTableDataManager(segName);
+  // This might be called by multiple threads, thus synchronized to be correct.
+  public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() 
{
+    Set<String> tablesWithMissingSegment = new HashSet<>();
+    for (Map.Entry<String, Set<String>> tableSegments : 
_consumingSegmentsByTable.entrySet()) {
+      String tableNameWithType = tableSegments.getKey();
+      TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(tableNameWithType);
       if (tableDataManager == null) {
-        _logger.info("TableDataManager is not yet setup for segment {}. Will 
check consumption status later", segName);
+        _logger.info("No tableDataManager for table: {}. Will check 
consumption status later", tableNameWithType);
+        tablesWithMissingSegment.add(tableNameWithType);
         continue;
       }
-      SegmentDataManager segmentDataManager = null;
-      try {
-        segmentDataManager = tableDataManager.acquireSegment(segName);
-        if (segmentDataManager == null) {
-          _logger.info("SegmentDataManager is not yet setup for segment {}. 
Will check consumption status later",
-              segName);
+      Set<String> consumingSegments = tableSegments.getValue();
+      Set<String> caughtUpSegments = 
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new 
HashSet<>());
+      for (String segName : consumingSegments) {
+        if (caughtUpSegments.contains(segName)) {
           continue;
         }
-        if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
-          // There's a possibility that a consuming segment has converted to a 
committed segment. If that's the case,
-          // segment data manager will not be of type 
RealtimeSegmentDataManager.
-          _logger.info("Segment {} is already committed and is considered 
caught up.", segName);
-          _caughtUpSegments.add(segName);
-          continue;
-        }
-
-        RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
-        if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
-          _caughtUpSegments.add(segName);
+        SegmentDataManager segmentDataManager = null;
+        try {
+          segmentDataManager = tableDataManager.acquireSegment(segName);
+          if (segmentDataManager == null) {
+            _logger.info("No SegmentDataManager for segment: {}. Will check 
consumption status later", segName);
+            tablesWithMissingSegment.add(tableNameWithType);
+            continue;
+          }
+          if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+            // There's a possibility that a consuming segment has converted to 
a committed segment. If that's the case,
+            // segment data manager will not be of type 
RealtimeSegmentDataManager.
+            _logger.info("Segment: {} is already committed and is considered 
caught up.", segName);

Review Comment:
   In this scenario, a new consuming segment should already have been created.
Consider re-fetching the consuming segments for the table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to