This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 33bb182c4d9 [controller] Avoid eager IdealState fetch in LLC segment commit (#17707)
33bb182c4d9 is described below

commit 33bb182c4d9d126b61004250e8201b628cb89bd0
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Feb 19 16:26:20 2026 -0800

    [controller] Avoid eager IdealState fetch in LLC segment commit (#17707)
    
    [controller] Add LLC commit-path edge-case coverage
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 155 ++++++++++++++------
 .../PinotLLCRealtimeSegmentManagerTest.java        | 158 ++++++++++++++++++++-
 2 files changed, 267 insertions(+), 46 deletions(-)
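
The core of the change shows up in createNewSegmentMetadata below: instead of fetching the IdealState eagerly on every commit, the partition-id lookup now receives a Supplier<IdealState> and only invokes it on the fallback path (or for the pause-state check). A minimal, self-contained sketch of that pattern, with illustrative names (LazyFetchSketch, resolvePartitionIds) that are not part of the Pinot code:

import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

final class LazyFetchSketch {
  // Only consult the fallback supplier when the stream metadata fetch was incomplete.
  static Set<Integer> resolvePartitionIds(boolean allFetchedFromStream, Set<Integer> fromStream,
      Supplier<Set<Integer>> idealStateFallback) {
    if (allFetchedFromStream) {
      return fromStream;                      // common case: no IdealState read at all
    }
    Set<Integer> merged = new HashSet<>(fromStream);
    merged.addAll(idealStateFallback.get());  // expensive lookup happens only here
    return merged;
  }

  public static void main(String[] args) {
    Set<Integer> fromStream = new HashSet<>(Set.of(0, 1, 2));
    // The supplier is never invoked when the stream already provided every partition id.
    Set<Integer> ids = resolvePartitionIds(true, fromStream, () -> {
      throw new IllegalStateException("fallback should not be used");
    });
    System.out.println(ids);
  }
}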

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 55252182dd6..3e4f2754e5e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -51,6 +51,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -655,10 +656,6 @@ public class PinotLLCRealtimeSegmentManager {
     String committingSegmentName = committingSegmentDescriptor.getSegmentName();
     TableConfig tableConfig = getTableConfig(realtimeTableName);
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
-    IdealState idealState = getIdealState(realtimeTableName);
-    Preconditions.checkState(
-        idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
-        "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
 
     /*
      * Update zookeeper in 3 steps.
@@ -680,7 +677,7 @@ public class PinotLLCRealtimeSegmentManager {
     // Step-2: Create new segment metadata if needed
     long startTimeNs2 = System.nanoTime();
     String newConsumingSegmentName =
-        createNewSegmentMetadata(tableConfig, idealState, committingSegmentDescriptor, committingSegmentZKMetadata,
+        createNewSegmentMetadata(tableConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
             instancePartitions);
 
     preProcessCommitIdealStateUpdate();
@@ -689,13 +686,34 @@ public class PinotLLCRealtimeSegmentManager {
     LOGGER.info("Updating Idealstate for previous: {} and new segment: {}", 
committingSegmentName,
         newConsumingSegmentName);
     long startTimeNs3 = System.nanoTime();
+    Map<String, Map<String, String>> instanceStatesMapAfterStep3 = 
Collections.emptyMap();
+    boolean newConsumingSegmentInIdealState = false;
 
     // When multiple segments of the same table complete around the same time 
it is possible that
     // the idealstate update fails due to contention. We serialize the updates 
to the idealstate
     // to reduce this contention. We may still contend with RetentionManager, 
or other updates
     // to idealstate from other controllers, but then we have the retry 
mechanism to get around that.
-    idealState =
-        updateIdealStateForSegments(tableConfig, committingSegmentName, 
newConsumingSegmentName, instancePartitions);
+    try {
+      IdealState idealState =
+          updateIdealStateForSegments(tableConfig, committingSegmentName, 
newConsumingSegmentName, instancePartitions);
+      instanceStatesMapAfterStep3 = idealState.getRecord().getMapFields();
+      if (newConsumingSegmentName != null) {
+        newConsumingSegmentInIdealState = 
instanceStatesMapAfterStep3.containsKey(newConsumingSegmentName);
+        if (!newConsumingSegmentInIdealState) {
+          LOGGER.info(
+              "Cleaning up segment ZK metadata for new consuming segment {} of 
table {} because it was not added to "
+                  + "IdealState. This can happen when table/topic consumption 
is paused.",
+              newConsumingSegmentName, realtimeTableName);
+          removeSegmentZKMetadataBestEffort(realtimeTableName, 
newConsumingSegmentName);
+          newConsumingSegmentName = null;
+        }
+      }
+    } catch (RuntimeException e) {
+      if (newConsumingSegmentName != null) {
+        removeSegmentZKMetadataBestEffort(realtimeTableName, 
newConsumingSegmentName);
+      }
+      throw e;
+    }
 
     long endTimeNs = System.nanoTime();
     LOGGER.info(
@@ -717,8 +735,8 @@ public class PinotLLCRealtimeSegmentManager {
     _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
 
     // Handle segment movement if necessary
-    if (newConsumingSegmentName != null) {
-      handleSegmentMovement(realtimeTableName, idealState.getRecord().getMapFields(), committingSegmentName,
+    if (newConsumingSegmentInIdealState) {
+      handleSegmentMovement(realtimeTableName, instanceStatesMapAfterStep3, committingSegmentName,
           newConsumingSegmentName);
     }
   }
@@ -797,7 +815,7 @@ public class PinotLLCRealtimeSegmentManager {
 
   // Step 2: Create new segment metadata
   @Nullable
-  private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idealState,
+  private String createNewSegmentMetadata(TableConfig tableConfig,
       CommittingSegmentDescriptor committingSegmentDescriptor, SegmentZKMetadata committingSegmentZKMetadata,
       InstancePartitions instancePartitions) {
     String committingSegmentName = committingSegmentDescriptor.getSegmentName();
@@ -806,40 +824,62 @@ public class PinotLLCRealtimeSegmentManager {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && !isTopicPaused(idealState, committingSegmentName)) {
-      LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
-      int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
-
-      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
-      Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
-
-      if (partitionIds.contains(committingSegmentPartitionGroupId)) {
-        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-        long newSegmentCreationTimeMs = getCurrentTimeMs();
-        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-
-        StreamConfig streamConfig =
-            IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
-            numReplicas);
-        newConsumingSegmentName = newLLCSegment.getSegmentName();
-        LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
-            Status.IN_PROGRESS);
-      } else {
-        LOGGER.info(
-            "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
-                + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
+    LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+    int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
+
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+    PartitionIdsWithIdealState partitionIdsWithIdealState = getPartitionIdsWithIdealState(streamConfigs,
+        () -> getIdealState(realtimeTableName));
+    Set<Integer> partitionIds = partitionIdsWithIdealState._partitionIds;
+
+    if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+      IdealState idealState = partitionIdsWithIdealState._idealState;
+      if (idealState == null) {
+        idealState = getIdealState(realtimeTableName);
       }
+      if (idealState != null
+          && (isTablePaused(idealState) || isTopicPaused(idealState, committingSegmentName))) {
+        LOGGER.info("Skipping creation of new segment metadata after segment: {} during commit. Reason: table/topic is "
+            + "paused.", committingSegmentName);
+        return null;
+      }
+      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+      long newSegmentCreationTimeMs = getCurrentTimeMs();
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+
+      StreamConfig streamConfig =
+          IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
+          numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
+      LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
+          Status.IN_PROGRESS);
     } else {
       LOGGER.info(
-          "Skipping creation of new segment metadata after segment: {} during commit. Reason: table: {} is paused.",
-          committingSegmentName, realtimeTableName);
+          "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
+              + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
     }
     return newConsumingSegmentName;
   }
 
+  private void removeSegmentZKMetadataBestEffort(String realtimeTableName, String segmentName) {
+    String segmentMetadataPath =
+        ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName);
+    try {
+      if (!_propertyStore.remove(segmentMetadataPath, AccessOption.PERSISTENT)) {
+        LOGGER.warn("Failed to remove segment ZK metadata for segment: {} of table: {}", segmentName,
+            realtimeTableName);
+      } else {
+        LOGGER.debug("Removed segment ZK metadata for segment: {} of table: {}", segmentName, realtimeTableName);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while removing segment ZK metadata for segment: {} of table: {}", segmentName,
+          realtimeTableName, e);
+    }
+  }
+
   // Step 3: Update IdealState
   private IdealState updateIdealStateForSegments(TableConfig tableConfig, String committingSegmentName,
       String newConsumingSegmentName, InstancePartitions instancePartitions) {
@@ -1137,6 +1177,22 @@ public class PinotLLCRealtimeSegmentManager {
 
   @VisibleForTesting
   Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
+    return getPartitionIdsWithIdealState(streamConfigs, () -> idealState)._partitionIds;
+  }
+
+  private static class PartitionIdsWithIdealState {
+    private final Set<Integer> _partitionIds;
+    @Nullable
+    private final IdealState _idealState;
+
+    private PartitionIdsWithIdealState(Set<Integer> partitionIds, @Nullable IdealState idealState) {
+      _partitionIds = partitionIds;
+      _idealState = idealState;
+    }
+  }
+
+  private PartitionIdsWithIdealState getPartitionIdsWithIdealState(List<StreamConfig> streamConfigs,
+      Supplier<IdealState> idealStateSupplier) {
     Set<Integer> partitionIds = new HashSet<>();
     boolean allPartitionIdsFetched = true;
     int numStreams = streamConfigs.size();
@@ -1176,6 +1232,7 @@ public class PinotLLCRealtimeSegmentManager {
     // If it is failing to fetch partition ids from stream (usually transient due to stream metadata service outage),
     // we need to use the existing partition information from ideal state to keep same ingestion behavior.
     if (!allPartitionIdsFetched) {
+      IdealState idealState = idealStateSupplier.get();
       LOGGER.info(
           "Fetch partition ids from Stream incomplete, merge fetched 
partitionIds with partition group metadata "
               + "for: {}", idealState.getId());
@@ -1188,8 +1245,9 @@ public class PinotLLCRealtimeSegmentManager {
       partitionIds.addAll(newPartitionGroupMetadataList.stream()
           .map(PartitionGroupMetadata::getPartitionGroupId)
           .collect(Collectors.toSet()));
+      return new PartitionIdsWithIdealState(partitionIds, idealState);
     }
-    return partitionIds;
+    return new PartitionIdsWithIdealState(partitionIds, null);
   }
 
   /**
@@ -1428,9 +1486,9 @@ public class PinotLLCRealtimeSegmentManager {
         throw new HelixHelper.PermanentUpdaterException(
             "Exceeded max segment completion time for segment " + 
committingSegmentName);
       }
-      
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
 committingSegmentName,
-          isTablePaused(idealState) || isTopicPaused(idealState, 
committingSegmentName) ? null : newSegmentName,
-          segmentAssignment, instancePartitionsMap);
+    
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
 committingSegmentName,
+        isTablePaused(idealState) || isTopicPaused(idealState, 
committingSegmentName), newSegmentName,
+        segmentAssignment, instancePartitionsMap);
       return idealState;
     };
     if (_controllerConf.getSegmentCompletionGroupCommitEnabled()) {
@@ -1486,9 +1544,20 @@ public class PinotLLCRealtimeSegmentManager {
   void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
       @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, committingSegmentName, false, newSegmentName,
+        segmentAssignment, instancePartitionsMap);
+  }
+
+  private void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
+      @Nullable String committingSegmentName, boolean isTableOrTopicPaused, @Nullable String newSegmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
-      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+      Map<String, String> committingSegmentInstanceStateMap = instanceStatesMap.get(committingSegmentName);
+      Preconditions.checkState(committingSegmentInstanceStateMap != null && committingSegmentInstanceStateMap
+              .containsValue(SegmentStateModel.CONSUMING),
+          "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+      Set<String> instances = committingSegmentInstanceStateMap.keySet();
       instanceStatesMap.put(committingSegmentName,
           SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
       LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
@@ -1501,7 +1570,7 @@ public class PinotLLCRealtimeSegmentManager {
     // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263
     // The following check prevents the table from going into such a state (but does not prevent the root cause
     // of attempting such a zk update).
-    if (newSegmentName != null) {
+    if (newSegmentName != null && !isTableOrTopicPaused) {
       LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
       int partitionId = newLLCSegmentName.getPartitionGroupId();
       int seqNum = newLLCSegmentName.getSequenceNumber();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 94f3abba16e..6f05cd437e4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1187,6 +1187,155 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Assert.assertEquals(segmentZKMetadata.getDownloadUrl(), "");
   }
 
+  @Test
+  public void testCommitSegmentMetadataSkipsIdealStateFetchWhenPartitionIdsAvailable() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
+    setUpNewTable(segmentManager, 2, 5, 4);
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    verify(segmentManager, atLeastOnce()).getIdealState(REALTIME_TABLE_NAME);
+  }
+
+  @Test
+  public void testCommitSegmentMetadataFetchesIdealStateWhenPartitionIdsFallbackNeeded() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
+    setUpNewTable(segmentManager, 2, 5, 4);
+    segmentManager._partitionGroupMetadataList = IntStream.range(0, 4)
+        .mapToObj(partition -> new PartitionGroupMetadata(partition, PARTITION_OFFSET))
+        .collect(Collectors.toList());
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    verify(segmentManager, atLeastOnce()).getIdealState(REALTIME_TABLE_NAME);
+  }
+
+  @Test
+  public void testCommitSegmentMetadataSkipsCreatingNewMetadataWhenTopicPausedIfPartitionIdsFallbackNeeded() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = spy(new FakePinotLLCRealtimeSegmentManager());
+    setUpNewTable(segmentManager, 2, 5, 4);
+    segmentManager._partitionGroupMetadataList = IntStream.range(0, 4)
+        .mapToObj(partition -> new PartitionGroupMetadata(partition, PARTITION_OFFSET))
+        .collect(Collectors.toList());
+
+    PauseState pauseState =
+        new PauseState(false, PauseState.ReasonCode.ADMINISTRATIVE, "pause-topic-for-test",
+            Long.toString(CURRENT_TIME_MS), Collections.singletonList(0));
+    segmentManager._idealState.getRecord().setSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE,
+        pauseState.toJsonString());
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+    String expectedNewConsumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    assertFalse(segmentManager._segmentZKMetadataMap.containsKey(expectedNewConsumingSegment));
+    assertFalse(segmentManager._idealState.getRecord().getMapFields().containsKey(expectedNewConsumingSegment));
+    ZkHelixPropertyStore<ZNRecord> propertyStore =
+        (ZkHelixPropertyStore<ZNRecord>) segmentManager._mockResourceManager.getPropertyStore();
+    verify(propertyStore, never()).remove(anyString(), eq(AccessOption.PERSISTENT));
+  }
+
+  @Test
+  public void testCommitSegmentMetadataCleansUpMetadataWhenTablePaused() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+    setUpNewTable(segmentManager, 2, 5, 4);
+    ZkHelixPropertyStore<ZNRecord> propertyStore =
+        (ZkHelixPropertyStore<ZNRecord>) segmentManager._mockResourceManager.getPropertyStore();
+    when(propertyStore.remove(anyString(), eq(AccessOption.PERSISTENT))).thenReturn(true);
+
+    PauseState pauseState = new PauseState(true, PauseState.ReasonCode.ADMINISTRATIVE, "pause-for-test",
+        Long.toString(CURRENT_TIME_MS), Collections.emptyList());
+    segmentManager._idealState.getRecord().setSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE,
+        pauseState.toJsonString());
+    segmentManager._idealState.getRecord()
+        .setSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED, Boolean.TRUE.toString());
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    String newConsumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+    assertFalse(segmentManager._idealState.getRecord().getMapFields().containsKey(newConsumingSegment));
+    verify(propertyStore, never()).remove(
+        ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, newConsumingSegment),
+        AccessOption.PERSISTENT);
+  }
+
+  @Test
+  public void testCommitSegmentMetadataCleansUpMetadataWhenTopicPaused() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+    setUpNewTable(segmentManager, 2, 5, 4);
+    ZkHelixPropertyStore<ZNRecord> propertyStore =
+        (ZkHelixPropertyStore<ZNRecord>) segmentManager._mockResourceManager.getPropertyStore();
+    when(propertyStore.remove(anyString(), eq(AccessOption.PERSISTENT))).thenReturn(true);
+
+    PauseState pauseState =
+        new PauseState(false, PauseState.ReasonCode.ADMINISTRATIVE, "pause-topic-for-test",
+            Long.toString(CURRENT_TIME_MS), Collections.singletonList(0));
+    segmentManager._idealState.getRecord().setSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE,
+        pauseState.toJsonString());
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    String newConsumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+    assertFalse(segmentManager._idealState.getRecord().getMapFields().containsKey(newConsumingSegment));
+    verify(propertyStore, never()).remove(
+        ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, newConsumingSegment),
+        AccessOption.PERSISTENT);
+  }
+
+  @Test
+  public void testCommitSegmentMetadataCleansUpMetadataWhenCommittingSegmentNotConsuming() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+    setUpNewTable(segmentManager, 2, 5, 4);
+    ZkHelixPropertyStore<ZNRecord> propertyStore =
+        (ZkHelixPropertyStore<ZNRecord>) segmentManager._mockResourceManager.getPropertyStore();
+    when(propertyStore.remove(anyString(), eq(AccessOption.PERSISTENT))).thenReturn(true);
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    segmentManager._idealState.getRecord().getMapFields().get(committingSegment)
+        .replaceAll((instance, state) -> SegmentStateModel.ONLINE);
+
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+    try {
+      segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+      fail("Expected commitSegmentMetadata to fail when committing segment has no CONSUMING instance");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("CONSUMING"));
+    }
+
+    String newConsumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+    verify(propertyStore).remove(
+        ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, newConsumingSegment),
+        AccessOption.PERSISTENT);
+  }
+
   /**
    * Test cases for fixing LLC segment by uploading to segment store if missing
    */
@@ -2089,9 +2238,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
     IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
         String newSegmentName, SegmentAssignment segmentAssignment,
         Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null,
-          segmentAssignment, instancePartitionsMap);
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName,
+      Map<String, String> committingSegmentInstanceStateMap = _idealState.getInstanceStateMap(committingSegmentName);
+      Preconditions.checkState(committingSegmentInstanceStateMap != null && committingSegmentInstanceStateMap
+              .containsValue(SegmentStateModel.CONSUMING),
+          "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName,
+          isTablePaused(_idealState) || isTopicPaused(_idealState, committingSegmentName) ? null : newSegmentName,
           segmentAssignment, instancePartitionsMap);
       return _idealState;
     }
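
The commit also wraps the Step-3 IdealState update in a try/catch so that the segment ZK metadata created in Step-2 is removed on a best-effort basis when the update fails, or when the new consuming segment never makes it into the IdealState. A hedged sketch of that contract, using stand-in types (BestEffortCleanupSketch and an in-memory map instead of the ZK property store):

import java.util.HashMap;
import java.util.Map;

final class BestEffortCleanupSketch {
  private final Map<String, String> store = new HashMap<>();  // stand-in for the ZK property store

  void commit(String newSegment, Runnable idealStateUpdate) {
    store.put(newSegment, "IN_PROGRESS");   // step 2: create new segment metadata
    try {
      idealStateUpdate.run();               // step 3: may fail, e.g. due to contention
    } catch (RuntimeException e) {
      removeBestEffort(newSegment);         // don't leave orphaned metadata behind
      throw e;                              // surface the original failure
    }
  }

  private void removeBestEffort(String segment) {
    try {
      store.remove(segment);
    } catch (Exception e) {
      // best effort: the real code logs and continues so the original error is never masked
    }
  }

  public static void main(String[] args) {
    BestEffortCleanupSketch sketch = new BestEffortCleanupSketch();
    try {
      sketch.commit("seg__0__1__t", () -> { throw new RuntimeException("update failed"); });
    } catch (RuntimeException expected) {
      System.out.println("metadata cleaned up: " + !sketch.store.containsKey("seg__0__1__t"));
    }
  }
}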

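Finally, the CONSUMING-state precondition moves from the start of the commit into updateInstanceStatesForNewConsumingSegment, where it now tolerates a missing IdealState entry instead of throwing a NullPointerException. A plain-Java sketch of that guard (ConsumingGuardSketch is illustrative; the real code uses Guava's Preconditions.checkState):

import java.util.Map;

final class ConsumingGuardSketch {
  // Fail with a clear message when the committing segment has no CONSUMING replica, including
  // the case where the segment is missing from the instance-state map entirely.
  static void checkCommittingSegment(Map<String, String> instanceStateMap, String segmentName) {
    if (instanceStateMap == null || !instanceStateMap.containsValue("CONSUMING")) {
      throw new IllegalStateException(
          "Failed to find instance in CONSUMING state in IdealState for segment: " + segmentName);
    }
  }

  public static void main(String[] args) {
    checkCommittingSegment(Map.of("Server_1", "CONSUMING"), "seg__0__0__t");  // passes
    try {
      checkCommittingSegment(null, "seg__0__1__t");                           // fails cleanly
    } catch (IllegalStateException expected) {
      System.out.println(expected.getMessage());
    }
  }
}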

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
