Copilot commented on code in PR #17707:
URL: https://github.com/apache/pinot/pull/17707#discussion_r2810522934


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -717,7 +731,7 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
 
     // Handle segment movement if necessary
-    if (newConsumingSegmentName != null) {
+    if (newConsumingSegmentInIdealState && newConsumingSegmentName != null) {

Review Comment:
   The null check in `newConsumingSegmentInIdealState && newConsumingSegmentName != null` is redundant. At this point,
`newConsumingSegmentInIdealState` can only be true if `newConsumingSegmentName != null` (see lines 706-711, where the
flag is set to false and the name is nulled out together), so the condition can be simplified to just
`if (newConsumingSegmentInIdealState)`.
   ```suggestion
       if (newConsumingSegmentInIdealState) {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -806,40 +820,50 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && !isTopicPaused(idealState, committingSegmentName)) {
-      LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
-      int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
-
-      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
-      Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
-
-      if (partitionIds.contains(committingSegmentPartitionGroupId)) {
-        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-        long newSegmentCreationTimeMs = getCurrentTimeMs();
-        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-
-        StreamConfig streamConfig =
-            IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
-            numReplicas);
-        newConsumingSegmentName = newLLCSegment.getSegmentName();
-        LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
-            Status.IN_PROGRESS);
-      } else {
-        LOGGER.info(
-            "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
-                + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
-      }
+    LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+    int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
+
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+    Set<Integer> partitionIds = getPartitionIds(streamConfigs, realtimeTableName);
+
+    if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+      long newSegmentCreationTimeMs = getCurrentTimeMs();
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+
+      StreamConfig streamConfig =
+          IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
+          numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
+      LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
+          Status.IN_PROGRESS);
     } else {
       LOGGER.info(
-          "Skipping creation of new segment metadata after segment: {} during commit. Reason: table: {} is paused.",
-          committingSegmentName, realtimeTableName);
+          "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
+              + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
     }
     return newConsumingSegmentName;

Review Comment:
   The pause state checks (isTablePaused and isTopicPaused) have been removed 
from Step-2. This causes unnecessary work: new segment metadata is now created 
even when the table/topic is paused, only to be removed later by the 
best-effort cleanup (lines 705-712) after Step-3 doesn't add it to IdealState. 
Consider checking pause state before creating metadata to avoid this wasteful 
create-then-delete cycle, especially since IdealState needs to be fetched 
anyway when partition IDs are unavailable from the stream (line 1211).
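   A rough sketch of that shape inside `createNewSegmentMetadata` (hedged: where the IdealState comes from here is
illustrative, and re-fetching it does trade back one read on the commit path; `getIdealState`, `isTablePaused` and
`isTopicPaused` are the existing helpers):
   ```java
    Set<Integer> partitionIds = getPartitionIds(streamConfigs, realtimeTableName);
    if (partitionIds.contains(committingSegmentPartitionGroupId)) {
      // Check pause state before creating metadata, so a paused table/topic never goes through
      // the create-then-delete cycle described above.
      IdealState idealState = getIdealState(realtimeTableName);
      if (isTablePaused(idealState) || isTopicPaused(idealState, committingSegmentName)) {
        LOGGER.info("Skipping creation of new segment metadata after segment: {} during commit. "
            + "Reason: table: {} is paused.", committingSegmentName, realtimeTableName);
        return null;
      }
      // ... create the new segment ZK metadata as in the PR ...
    }
   ```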



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1428,6 +1461,10 @@ IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String
         throw new HelixHelper.PermanentUpdaterException(
             "Exceeded max segment completion time for segment " + committingSegmentName);
       }
+      Map<String, String> committingSegmentInstanceStateMap = idealState.getInstanceStateMap(committingSegmentName);
+      Preconditions.checkState(committingSegmentInstanceStateMap != null && committingSegmentInstanceStateMap
+              .containsValue(SegmentStateModel.CONSUMING),
+          "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);

Review Comment:
   Moving the CONSUMING state validation to Step-3 (lines 1464-1467) creates a 
window where Steps 1 and 2 complete before validation occurs. If the committing 
segment is no longer in CONSUMING state when Step-3 runs (due to concurrent 
operations), the validation will fail after the committing segment metadata has 
been updated to DONE/COMMITTING (Step-1) and potentially new segment metadata 
has been created (Step-2). While the cleanup removes the new segment metadata 
(lines 698-702), the committing segment metadata remains in DONE/COMMITTING 
state but the segment stays in CONSUMING state in IdealState, causing 
inconsistency. Consider performing an early lightweight check of CONSUMING 
state before Step-1, or add recovery logic to revert Step-1 changes when Step-3 
validation fails.
   ```suggestion
         boolean hasConsumingInstance = committingSegmentInstanceStateMap != null
             && committingSegmentInstanceStateMap.containsValue(SegmentStateModel.CONSUMING);
         if (!hasConsumingInstance) {
           // Likely a race with repairs or other concurrent operations that already updated IdealState.
           // Do not fail the updater here to avoid partial cleanup that can leave metadata and IdealState inconsistent.
           LOGGER.warn("Skipping ideal state update for segment {} because no instance is in CONSUMING state",
               committingSegmentName);
           return idealState;
         }
   ```
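   As a minimal sketch of the "early lightweight check" alternative, placed near the start of
`commitSegmentMetadataInternal` before Step-1 (hedged: exact placement is illustrative, it reintroduces an IdealState
read on the commit path, and it narrows but does not fully close the window since IdealState can still change before
Step-3):
   ```java
    // Fail fast before Step-1 so no ZK metadata is mutated when the segment is no longer CONSUMING.
    IdealState idealState = getIdealState(realtimeTableName);
    Map<String, String> instanceStateMap = idealState.getInstanceStateMap(committingSegmentName);
    Preconditions.checkState(
        instanceStateMap != null && instanceStateMap.containsValue(SegmentStateModel.CONSUMING),
        "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
   ```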



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -1187,6 +1187,23 @@ public void testCommitSegmentMetadata() {
     Assert.assertEquals(segmentZKMetadata.getDownloadUrl(), "");
   }
 
+  @Test
+  public void testCommitSegmentMetadataSkipsIdealStateFetchWhenPartitionIdsAvailable() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
+    setUpNewTable(segmentManager, 2, 5, 4);
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    verify(segmentManager, never()).getIdealState(REALTIME_TABLE_NAME);
+  }

Review Comment:
   The test should include scenarios where:
   1. The table/topic is paused - verify that new segment metadata is created 
but then cleaned up correctly
   2. The IdealState update fails - verify that the new segment metadata is 
cleaned up
   3. The committing segment is no longer in CONSUMING state - verify proper 
error handling
   These edge cases are important to validate the new cleanup logic and the 
deferred validation introduced by this PR.
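   For example, scenario 2 could look roughly like the sketch below (hedged: it assumes `updateIdealStateForSegments`
is visible to the test and stubbable on the spy, and it leaves the exact cleanup assertion open because how the fake
manager exposes segment ZK metadata is not shown here):
   ```java
  @Test
  public void testCommitSegmentMetadataCleansUpNewSegmentWhenIdealStateUpdateFails() {
    FakePinotLLCRealtimeSegmentManager segmentManager =
        spy(new FakePinotLLCRealtimeSegmentManager(mock(PinotHelixResourceManager.class)));
    setUpNewTable(segmentManager, 2, 5, 4);

    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());

    // Simulate Step-3 failing so the best-effort cleanup path is exercised (assumes the method is
    // accessible for stubbing; otherwise the fake manager could be extended to throw instead).
    doThrow(new RuntimeException("IdealState update failed")).when(segmentManager)
        .updateIdealStateForSegments(any(), any(), any(), any());

    Assert.assertThrows(RuntimeException.class,
        () -> segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor));

    // Then assert the sequence-1 segment's ZK metadata created in Step-2 was removed again,
    // using whatever accessor the fake manager provides for segment ZK metadata.
  }
   ```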



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -1187,6 +1187,23 @@ public void testCommitSegmentMetadata() {
     Assert.assertEquals(segmentZKMetadata.getDownloadUrl(), "");
   }
 
+  @Test
+  public void testCommitSegmentMetadataSkipsIdealStateFetchWhenPartitionIdsAvailable() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
+    setUpNewTable(segmentManager, 2, 5, 4);
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    verify(segmentManager, never()).getIdealState(REALTIME_TABLE_NAME);
+  }

Review Comment:
   The test verifies that getIdealState is not called but doesn't validate the 
key correctness guarantees. The FakePinotLLCRealtimeSegmentManager overrides 
updateIdealStateOnSegmentCompletion (lines 2106-2114 in test file) but doesn't 
implement the CONSUMING state validation that was moved to Step-3 (lines 
1464-1467 in main file). This means the test doesn't actually verify that the 
validation happens when IdealState is fetched during Step-3. Consider adding 
assertions to verify the validation is executed, or use a partial mock that 
delegates to the real implementation for updateIdealStateOnSegmentCompletion.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -694,8 +692,24 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     // the idealstate update fails due to contention. We serialize the updates to the idealstate
     // to reduce this contention. We may still contend with RetentionManager, or other updates
     // to idealstate from other controllers, but then we have the retry mechanism to get around that.
-    idealState =
-        updateIdealStateForSegments(tableConfig, committingSegmentName, newConsumingSegmentName, instancePartitions);
+    try {
+      idealState =
+          updateIdealStateForSegments(tableConfig, committingSegmentName, newConsumingSegmentName, instancePartitions);
+    } catch (RuntimeException e) {
+      if (newConsumingSegmentName != null) {
+        removeSegmentZKMetadataBestEffort(realtimeTableName, newConsumingSegmentName);
+      }
+      throw e;
+    }
+
+    boolean newConsumingSegmentInIdealState = false;
+    if (newConsumingSegmentName != null) {
+      newConsumingSegmentInIdealState = idealState.getRecord().getMapFields().containsKey(newConsumingSegmentName);
+      if (!newConsumingSegmentInIdealState) {
+        removeSegmentZKMetadataBestEffort(realtimeTableName, newConsumingSegmentName);
+        newConsumingSegmentName = null;
+      }
+    }

Review Comment:
   When the table/topic is paused, new segment metadata is created in Step-2 
but not added to IdealState in Step-3 (line 1469 passes null when paused). The 
cleanup logic then removes this orphaned metadata. However, there's a race 
condition: if the RealtimeSegmentValidationManager or another component reads 
segment metadata between Step-2 completion and the cleanup (lines 708-709), it 
will see a segment with IN_PROGRESS status that's not in IdealState. This could 
trigger unnecessary validation/repair actions or cause confusion in monitoring. 
While the cleanup mitigates this, consider checking pause state before creating 
metadata (possibly via the lazy IdealState supplier used in getPartitionIds) to 
avoid creating metadata that will be immediately discarded.
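   A sketch of the lazy-supplier variant (hedged: a `getPartitionIds` overload taking a supplier is hypothetical, the
PR's version takes the table name; this only illustrates sharing a single fetch between the partition-id fallback and
the pause check via Guava's memoizing supplier, and the pause check itself still forces one IdealState read per
commit):
   ```java
    // Memoized so IdealState is fetched at most once, no matter how many of the checks below need it.
    Supplier<IdealState> idealStateSupplier = Suppliers.memoize(() -> getIdealState(realtimeTableName));
    Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealStateSupplier);  // hypothetical overload
    if (isTablePaused(idealStateSupplier.get()) || isTopicPaused(idealStateSupplier.get(), committingSegmentName)) {
      // Skip creating metadata that Step-3 would never add to IdealState anyway.
      return null;
    }
   ```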



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -806,40 +820,50 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && !isTopicPaused(idealState, committingSegmentName)) {
-      LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
-      int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
-
-      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
-      Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
-
-      if (partitionIds.contains(committingSegmentPartitionGroupId)) {
-        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-        long newSegmentCreationTimeMs = getCurrentTimeMs();
-        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-
-        StreamConfig streamConfig =
-            IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
-            numReplicas);
-        newConsumingSegmentName = newLLCSegment.getSegmentName();
-        LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
-            Status.IN_PROGRESS);
-      } else {
-        LOGGER.info(
-            "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
-                + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
-      }
+    LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+    int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
+
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+    Set<Integer> partitionIds = getPartitionIds(streamConfigs, realtimeTableName);
+
+    if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+      long newSegmentCreationTimeMs = getCurrentTimeMs();
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+
+      StreamConfig streamConfig =
+          IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
+          numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
+      LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
+          Status.IN_PROGRESS);
     } else {
       LOGGER.info(
-          "Skipping creation of new segment metadata after segment: {} during commit. Reason: table: {} is paused.",
-          committingSegmentName, realtimeTableName);
+          "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
+              + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
     }
     return newConsumingSegmentName;
   }
 
+  private void removeSegmentZKMetadataBestEffort(String realtimeTableName, String segmentName) {
+    String segmentMetadataPath =
+        ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName);
+    try {
+      if (!_propertyStore.remove(segmentMetadataPath, AccessOption.PERSISTENT)) {
+        LOGGER.warn("Failed to remove segment ZK metadata for segment: {} of table: {}", segmentName,
+            realtimeTableName);
+      } else {
+        LOGGER.info("Removed segment ZK metadata for segment: {} of table: {}", segmentName, realtimeTableName);

Review Comment:
   The removeSegmentZKMetadataBestEffort method logs at INFO level when 
successful (line 859) but only WARN when it fails (lines 856, 862). Since this 
is a cleanup operation that is "best effort" and failures are explicitly 
tolerated, logging successful removals at INFO creates noise. Consider either 
logging successes at DEBUG level, or elevating failures to ERROR since they 
indicate cleanup didn't happen and may cause issues later (orphaned metadata in 
ZooKeeper).
   ```suggestion
           LOGGER.debug("Removed segment ZK metadata for segment: {} of table: {}", segmentName, realtimeTableName);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

