Copilot commented on code in PR #17706:
URL: https://github.com/apache/pinot/pull/17706#discussion_r2815081613


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java:
##########
@@ -242,6 +242,11 @@ public void testGenerateTasksNoMinionMetadata() {
     when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
         Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
 
+    RealtimeToOfflineSegmentsTaskGenerator debugGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
+    debugGenerator.init(mockClusterInfoProvide);
+    System.out.println("Segments before generation (debug): "
+        + debugGenerator.getNonConsumingSegmentsZKMetadataForRealtimeTable(REALTIME_TABLE_NAME));
+

Review Comment:
   Debug code should not be committed, even in tests. This block creates a
throwaway debug generator and prints to `System.out`, which violates the
project's logging standards. Please remove it; if the output is still needed
for debugging, log it through SLF4J instead.
   ```suggestion
   
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -725,6 +729,54 @@ public static List<SegmentZKMetadata> getSegmentsZKMetadata(ZkHelixPropertyStore
     }
   }
 
+  /**
+   * Applies the given consumer on segment ZK metadata for the given table in batches.
+   *
+   *
+   * @param batchSize  batch size for ZK get calls

Review Comment:
   The Javadoc comment is incomplete. It has a one-line summary and documents 
`batchSize` and `consumer`, but it has no `@param` tags for `propertyStore` and 
`tableNameWithType`. Consider expanding the summary, for example "Iterates over 
all segment ZK metadata for the given table and applies the provided consumer 
function to each segment metadata in batches," and adding the missing `@param` 
tags.
   ```suggestion
      * Iterates over all segment ZK metadata for the given table and applies the provided consumer function
      * to each segment metadata in batches.
      *
      * @param propertyStore Helix property store used to fetch segment ZK metadata
      * @param tableNameWithType table name with type (e.g. "myTable_OFFLINE") whose segment metadata is processed
      * @param batchSize batch size for ZK get calls
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -273,6 +273,9 @@ public static class ControllerPeriodicTasksConf {
     public static final String AGED_SEGMENTS_DELETION_BATCH_SIZE =
         "controller.retentionManager.agedSegmentsDeletionBatchSize";
     public static final int DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE = 1000;
+    public static final String SEGMENTS_ZK_METADATA_BATCH_SIZE =
+        "controller.retentionManager.segmentsZkMetadataBatchSize";

Review Comment:
   Inconsistent naming convention: the configuration property uses 
`segmentsZkMetadataBatchSize` (lowercase 'k'), but the codebase consistently 
uses 'ZK' (uppercase) elsewhere, for example in `SegmentZKMetadata` and 
`forEachSegmentZKMetadata`. For consistency with the rest of the codebase, 
consider using `segmentsZKMetadataBatchSize` instead.
   ```suggestion
           "controller.retentionManager.segmentsZKMetadataBatchSize";
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -725,6 +729,54 @@ public static List<SegmentZKMetadata> getSegmentsZKMetadata(ZkHelixPropertyStore
     }
   }
 
+  /**
+   * Applies the given consumer on segment ZK metadata for the given table in batches.
+   *
+   * @param batchSize  batch size for ZK get calls
+   * @param consumer function invoked for each non-null segment metadata
+   */
+  public static void forEachSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
+      int batchSize, Consumer<SegmentZKMetadata> consumer) {
+    Preconditions.checkArgument(batchSize > 0, "Segment metadata batchSize must be greater than 0: %s", batchSize);
+
+    String segmentsPath = constructPropertyStorePathForResource(tableNameWithType);
+    List<String> segmentNames = getSegments(propertyStore, tableNameWithType);
+    if (segmentNames == null || segmentNames.isEmpty()) {
+      LOGGER.debug("No segments found under path: {}", segmentsPath);
+      return;
+    }
+
+    for (int startIndex = 0; startIndex < segmentNames.size(); startIndex += batchSize) {
+      int endIndex = Math.min(startIndex + batchSize, segmentNames.size());
+      List<String> segmentNameBatch = segmentNames.subList(startIndex, endIndex);
+
+      List<String> segmentPathBatch = new ArrayList<>(segmentNameBatch.size());
+      for (String segmentName : segmentNameBatch) {
+        segmentPathBatch.add(constructPropertyStorePathForSegment(tableNameWithType, segmentName));
+      }
+
+      List<ZNRecord> znRecords = propertyStore.get(segmentPathBatch, null, AccessOption.PERSISTENT);
+      int numNullRecords = 0;
+      if (znRecords != null) {
+        for (int i = 0; i < segmentNameBatch.size(); i++) {
+          ZNRecord znRecord = i < znRecords.size() ? znRecords.get(i) : null;
+          if (znRecord == null) {
+            numNullRecords++;
+          } else {
+            consumer.accept(new SegmentZKMetadata(znRecord));
+          }
+        }
+      } else {
+        numNullRecords = segmentNameBatch.size();
+      }
+
+      if (numNullRecords > 0) {
+        LOGGER.warn("Failed to read {}/{} segment ZK metadata under path: {} for table: {}",
+            numNullRecords, segmentNameBatch.size(), segmentsPath, tableNameWithType);
+      }
+    }
+  }

Review Comment:
   The new `forEachSegmentZKMetadata` method lacks dedicated unit test 
coverage. Since this is a core API method that will be widely used across the 
codebase, it should have comprehensive tests covering edge cases like: empty 
segment lists, null ZNRecords, batch boundaries, and proper consumer 
invocation. Consider adding unit tests in a ZKMetadataProviderTest class.
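
   As a rough illustration of the cases listed above, here is a minimal, 
   self-contained sketch. It does not use the Pinot or Helix classes: 
   `BatchedForEachSketch`, `forEachInBatches`, and the `fetch` function are 
   hypothetical stand-ins for `ZKMetadataProvider`, `forEachSegmentZKMetadata`, 
   and `propertyStore.get(...)`. The sketch also returns the number of null 
   records so it can be asserted on, whereas the method in the diff only logs 
   that count. A real test would presumably mock `ZkHelixPropertyStore` instead.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.Consumer;
   import java.util.function.Function;

   public class BatchedForEachSketch {
     // Hypothetical stand-in for the batching loop in the diff. 'fetch' plays the
     // role of the batched store read and may return null or contain null entries.
     static int forEachInBatches(List<String> names, int batchSize,
         Function<List<String>, List<String>> fetch, Consumer<String> consumer) {
       if (batchSize <= 0) {
         throw new IllegalArgumentException("batchSize must be greater than 0: " + batchSize);
       }
       int numNullRecords = 0;
       if (names == null || names.isEmpty()) {
         return numNullRecords;
       }
       for (int start = 0; start < names.size(); start += batchSize) {
         int end = Math.min(start + batchSize, names.size());
         List<String> batch = names.subList(start, end);
         List<String> records = fetch.apply(batch);
         for (int i = 0; i < batch.size(); i++) {
           // A null batch result or a short result list counts as missing records.
           String record = (records != null && i < records.size()) ? records.get(i) : null;
           if (record == null) {
             numNullRecords++;
           } else {
             consumer.accept(record);
           }
         }
       }
       return numNullRecords;
     }

     public static void main(String[] args) {
       List<String> names = Arrays.asList("s0", "s1", "s2", "s3", "s4");
       List<Integer> batchSizes = new ArrayList<>();
       List<String> seen = new ArrayList<>();
       // Fake store: "s3" is missing (null); every other name is echoed back.
       int numNull = forEachInBatches(names, 2, batch -> {
         batchSizes.add(batch.size());
         List<String> out = new ArrayList<>();
         for (String name : batch) {
           out.add("s3".equals(name) ? null : name);
         }
         return out;
       }, seen::add);
       // Batch boundaries, consumer invocations, and the null-record count.
       System.out.println(batchSizes + " " + seen + " " + numNull);
     }
   }
   ```

   Each printed value corresponds to one of the edge cases: the batch sizes show 
   the final partial batch, the consumed list shows that null records are 
   skipped, and the count shows how many reads failed.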



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -161,32 +162,54 @@ public int getMaxAttemptsPerTask(String minionTag) {
    * @return the list of segment zk metadata for available segments in the table.
    */
   public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String tableNameWithType) {
+    return getSegmentsZKMetadataInIdealState(tableNameWithType, null);
+  }
+
+  public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
     IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
+    return getSegmentsZKMetadataInIdealState(tableNameWithType, segmentZKMetadata -> {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      return segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
+          && !idealState.getInstanceStateMap(segmentName).containsValue(SegmentStateModel.CONSUMING);

Review Comment:
   Potential NullPointerException: `idealState` could be null when retrieved on 
line 169, but it's dereferenced on line 173 without null checking. If 
`getIdealState` returns null, this will throw an NPE when accessing 
`idealState.getInstanceStateMap(segmentName)`. Consider adding a null check or 
restructuring the code to handle this case.
   ```suggestion
       if (idealState == null) {
         return new ArrayList<>();
       }
       return getSegmentsZKMetadataInIdealState(tableNameWithType, segmentZKMetadata -> {
         String segmentName = segmentZKMetadata.getSegmentName();
         Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
         return segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
             && instanceStateMap != null
             && !instanceStateMap.containsValue(SegmentStateModel.CONSUMING);
   ```
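
   To make the two null cases concrete, here is a small self-contained sketch 
   of the same filter. It does not use Helix: a nested `Map` is a hypothetical 
   stand-in for `IdealState`, the `completed` flag stands in for 
   `segmentZKMetadata.getStatus().isCompleted()`, and `NonConsumingFilterSketch` 
   and `isNonConsuming` are illustrative names only. Returning `false` for a 
   null ideal state filters out every segment, which matches the empty list 
   returned by the suggestion above.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   public class NonConsumingFilterSketch {
     static final String CONSUMING = "CONSUMING";

     // idealState maps segment name -> (instance -> state); hypothetical stand-in
     // for Helix IdealState#getInstanceStateMap(segmentName).
     static boolean isNonConsuming(Map<String, Map<String, String>> idealState, String segmentName,
         boolean completed) {
       if (idealState == null) {
         return false; // no ideal state: treat every segment as not eligible
       }
       Map<String, String> instanceStateMap = idealState.get(segmentName);
       return completed
           && instanceStateMap != null // segment absent from the ideal state
           && !instanceStateMap.containsValue(CONSUMING);
     }

     public static void main(String[] args) {
       Map<String, Map<String, String>> idealState = new HashMap<>();
       idealState.put("seg_online", Collections.singletonMap("server1", "ONLINE"));
       idealState.put("seg_consuming", Collections.singletonMap("server1", CONSUMING));

       System.out.println(isNonConsuming(idealState, "seg_online", true));
       System.out.println(isNonConsuming(idealState, "seg_consuming", true));
       System.out.println(isNonConsuming(idealState, "seg_missing", true));
       System.out.println(isNonConsuming(null, "seg_online", true));
     }
   }
   ```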



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

