Copilot commented on code in PR #17707:
URL: https://github.com/apache/pinot/pull/17707#discussion_r2810522934
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -717,7 +731,7 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
    _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
    // Handle segment movement if necessary
-    if (newConsumingSegmentName != null) {
+    if (newConsumingSegmentInIdealState && newConsumingSegmentName != null) {
Review Comment:
The condition `newConsumingSegmentInIdealState && newConsumingSegmentName !=
null` is redundant. At this point, `newConsumingSegmentInIdealState` can only
be true if `newConsumingSegmentName != null` (see lines 706-711 where it's set
to false and the name is nulled together). The null check is unnecessary and
can be simplified to just `if (newConsumingSegmentInIdealState)`.
```suggestion
if (newConsumingSegmentInIdealState) {
```
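If the flag and the name are meant to stay paired, the redundancy could also be removed structurally rather than by dropping one check. A plain-Java sketch (hypothetical, not the PR's code) that replaces the boolean-plus-nullable-String pair with a single `Optional`, so the two conditions cannot drift apart:

```java
import java.util.Optional;

public class ConsumingSegmentState {
  // Empty means "no new consuming segment made it into IdealState"; present means
  // both conditions hold at once, so a separate boolean flag cannot go stale.
  private Optional<String> _newConsumingSegment = Optional.empty();

  public void markInIdealState(String segmentName) {
    _newConsumingSegment = Optional.of(segmentName);
  }

  public void clear() {
    _newConsumingSegment = Optional.empty();
  }

  public boolean shouldHandleSegmentMovement() {
    // Replaces: newConsumingSegmentInIdealState && newConsumingSegmentName != null
    return _newConsumingSegment.isPresent();
  }

  public static void main(String[] args) {
    ConsumingSegmentState state = new ConsumingSegmentState();
    System.out.println(state.shouldHandleSegmentMovement()); // false
    state.markInIdealState("myTable__0__1__20240101T0000Z");
    System.out.println(state.shouldHandleSegmentMovement()); // true
  }
}
```

This is a heavier refactor than the one-line simplification suggested above, so it may only be worth it if the pairing is updated from more places in the future.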
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -806,40 +820,50 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea
    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
    String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && !isTopicPaused(idealState, committingSegmentName)) {
-      LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
-      int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
-
-      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
-      Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
-
-      if (partitionIds.contains(committingSegmentPartitionGroupId)) {
-        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-        long newSegmentCreationTimeMs = getCurrentTimeMs();
-        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-
-        StreamConfig streamConfig =
-            IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
-            numReplicas);
-        newConsumingSegmentName = newLLCSegment.getSegmentName();
-        LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
-            Status.IN_PROGRESS);
-      } else {
-        LOGGER.info(
-            "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
-                + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
-      }
+    LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+    int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
+
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+    Set<Integer> partitionIds = getPartitionIds(streamConfigs, realtimeTableName);
+
+    if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+      long newSegmentCreationTimeMs = getCurrentTimeMs();
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+
+      StreamConfig streamConfig =
+          IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
+          numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
+      LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
+          Status.IN_PROGRESS);
    } else {
      LOGGER.info(
-          "Skipping creation of new segment metadata after segment: {} during commit. Reason: table: {} is paused.",
-          committingSegmentName, realtimeTableName);
+          "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
+              + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
    }
    return newConsumingSegmentName;
Review Comment:
The pause state checks (isTablePaused and isTopicPaused) have been removed
from Step-2. This causes unnecessary work: new segment metadata is now created
even when the table/topic is paused, only to be removed later by the
best-effort cleanup (lines 705-712) after Step-3 doesn't add it to IdealState.
Consider checking pause state before creating metadata to avoid this wasteful
create-then-delete cycle, especially since IdealState needs to be fetched
anyway when partition IDs are unavailable from the stream (line 1211).
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1428,6 +1461,10 @@ IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String
        throw new HelixHelper.PermanentUpdaterException(
            "Exceeded max segment completion time for segment " + committingSegmentName);
      }
+      Map<String, String> committingSegmentInstanceStateMap = idealState.getInstanceStateMap(committingSegmentName);
+      Preconditions.checkState(committingSegmentInstanceStateMap != null && committingSegmentInstanceStateMap
+          .containsValue(SegmentStateModel.CONSUMING),
+          "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
Review Comment:
Moving the CONSUMING state validation to Step-3 (lines 1464-1467) creates a
window where Steps 1 and 2 complete before validation occurs. If the committing
segment is no longer in CONSUMING state when Step-3 runs (due to concurrent
operations), the validation will fail after the committing segment metadata has
been updated to DONE/COMMITTING (Step-1) and potentially new segment metadata
has been created (Step-2). While the cleanup removes the new segment metadata
(lines 698-702), the committing segment metadata remains in DONE/COMMITTING
state but the segment stays in CONSUMING state in IdealState, causing
inconsistency. Consider performing an early lightweight check of CONSUMING
state before Step-1, or add recovery logic to revert Step-1 changes when Step-3
validation fails.
```suggestion
      boolean hasConsumingInstance = committingSegmentInstanceStateMap != null
          && committingSegmentInstanceStateMap.containsValue(SegmentStateModel.CONSUMING);
      if (!hasConsumingInstance) {
        // Likely a race with repairs or other concurrent operations that already updated IdealState.
        // Do not fail the updater here to avoid partial cleanup that can leave metadata and IdealState inconsistent.
        LOGGER.warn("Skipping ideal state update for segment {} because no instance is in CONSUMING state",
            committingSegmentName);
        return idealState;
      }
```
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -1187,6 +1187,23 @@ public void testCommitSegmentMetadata() {
    Assert.assertEquals(segmentZKMetadata.getDownloadUrl(), "");
  }
+  @Test
+  public void testCommitSegmentMetadataSkipsIdealStateFetchWhenPartitionIdsAvailable() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
+    setUpNewTable(segmentManager, 2, 5, 4);
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    verify(segmentManager, never()).getIdealState(REALTIME_TABLE_NAME);
+  }
Review Comment:
The test should include scenarios where:
1. The table/topic is paused - verify that new segment metadata is created
but then cleaned up correctly
2. The IdealState update fails - verify that the new segment metadata is
cleaned up
3. The committing segment is no longer in CONSUMING state - verify proper
error handling
These edge cases are important to validate the new cleanup logic and the
deferred validation introduced by this PR.
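A real test for these scenarios would go through the fake manager's hooks, but the essential shape of scenario 2 (IdealState update fails, new metadata must be cleaned up) can be sketched standalone. All names below (`FAKE_STORE`, `commit`) are stand-ins, not helpers from the test file:

```java
import java.util.HashMap;
import java.util.Map;

public class CommitCleanupSketch {
  // Stand-in for the ZK property store holding segment metadata.
  static final Map<String, String> FAKE_STORE = new HashMap<>();

  static void commit(String newSegment, boolean idealStateUpdateFails) {
    FAKE_STORE.put(newSegment, "IN_PROGRESS"); // Step-2: create new segment metadata
    try {
      if (idealStateUpdateFails) { // Step-3: IdealState update
        throw new RuntimeException("ideal state update failed");
      }
    } catch (RuntimeException e) {
      FAKE_STORE.remove(newSegment); // best-effort cleanup before rethrowing
      throw e;
    }
  }

  public static void main(String[] args) {
    try {
      commit("table__0__1__t", true);
    } catch (RuntimeException expected) {
      // the commit is expected to rethrow after cleanup
    }
    // The key assertion a real test would make: no orphaned metadata remains.
    System.out.println(FAKE_STORE.containsKey("table__0__1__t")); // false
  }
}
```

Scenarios 1 and 3 would follow the same pattern, asserting on metadata state after a paused commit and on the error surfaced when no CONSUMING instance exists.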
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -1187,6 +1187,23 @@ public void testCommitSegmentMetadata() {
    Assert.assertEquals(segmentZKMetadata.getDownloadUrl(), "");
  }
+  @Test
+  public void testCommitSegmentMetadataSkipsIdealStateFetchWhenPartitionIdsAvailable() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
+    setUpNewTable(segmentManager, 2, 5, 4);
+
+    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+    verify(segmentManager, never()).getIdealState(REALTIME_TABLE_NAME);
+  }
Review Comment:
The test verifies that getIdealState is not called but doesn't validate the
key correctness guarantees. The FakePinotLLCRealtimeSegmentManager overrides
updateIdealStateOnSegmentCompletion (lines 2106-2114 in test file) but doesn't
implement the CONSUMING state validation that was moved to Step-3 (lines
1464-1467 in main file). This means the test doesn't actually verify that the
validation happens when IdealState is fetched during Step-3. Consider adding
assertions to verify the validation is executed, or use a partial mock that
delegates to the real implementation for updateIdealStateOnSegmentCompletion.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -694,8 +692,24 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
    // the idealstate update fails due to contention. We serialize the updates to the idealstate
    // to reduce this contention. We may still contend with RetentionManager, or other updates
    // to idealstate from other controllers, but then we have the retry mechanism to get around that.
-    idealState =
-        updateIdealStateForSegments(tableConfig, committingSegmentName, newConsumingSegmentName, instancePartitions);
+    try {
+      idealState =
+          updateIdealStateForSegments(tableConfig, committingSegmentName, newConsumingSegmentName, instancePartitions);
+    } catch (RuntimeException e) {
+      if (newConsumingSegmentName != null) {
+        removeSegmentZKMetadataBestEffort(realtimeTableName, newConsumingSegmentName);
+      }
+      throw e;
+    }
+
+    boolean newConsumingSegmentInIdealState = false;
+    if (newConsumingSegmentName != null) {
+      newConsumingSegmentInIdealState = idealState.getRecord().getMapFields().containsKey(newConsumingSegmentName);
+      if (!newConsumingSegmentInIdealState) {
+        removeSegmentZKMetadataBestEffort(realtimeTableName, newConsumingSegmentName);
+        newConsumingSegmentName = null;
+      }
+    }
Review Comment:
When the table/topic is paused, new segment metadata is created in Step-2
but not added to IdealState in Step-3 (line 1469 passes null when paused). The
cleanup logic then removes this orphaned metadata. However, there's a race
condition: if the RealtimeSegmentValidationManager or another component reads
segment metadata between Step-2 completion and the cleanup (lines 708-709), it
will see a segment with IN_PROGRESS status that's not in IdealState. This could
trigger unnecessary validation/repair actions or cause confusion in monitoring.
While the cleanup mitigates this, consider checking pause state before creating
metadata (possibly via the lazy IdealState supplier used in getPartitionIds) to
avoid creating metadata that will be immediately discarded.
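The lazy-supplier idea mentioned here can be sketched in plain Java as a generic memoizer (this is an illustration of the pattern, not Pinot's actual implementation): the expensive fetch runs only if some caller needs the value, and at most once even if both the pause check and `getPartitionIds` consult it.

```java
import java.util.function.Supplier;

public class LazyRef<T> implements Supplier<T> {
  private final Supplier<T> _delegate;
  private T _value;
  private boolean _fetched;

  public LazyRef(Supplier<T> delegate) {
    _delegate = delegate;
  }

  @Override
  public synchronized T get() {
    // Fetch on first use only; subsequent callers reuse the cached value.
    if (!_fetched) {
      _value = _delegate.get();
      _fetched = true;
    }
    return _value;
  }

  public static void main(String[] args) {
    int[] fetchCount = {0};
    LazyRef<String> idealState = new LazyRef<>(() -> {
      fetchCount[0]++;
      return "IDEAL_STATE"; // stand-in for the real (expensive) IdealState fetch
    });
    idealState.get();
    idealState.get(); // second consumer reuses the cached value
    System.out.println(fetchCount[0]); // 1
  }
}
```

With a shared `LazyRef<IdealState>`, the pause check before Step-2 would add no extra fetch on the common path where partition IDs already force the IdealState to be read.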
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -806,40 +820,50 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea
    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
    String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && !isTopicPaused(idealState, committingSegmentName)) {
-      LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
-      int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
-
-      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
-      Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
-
-      if (partitionIds.contains(committingSegmentPartitionGroupId)) {
-        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-        long newSegmentCreationTimeMs = getCurrentTimeMs();
-        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-
-        StreamConfig streamConfig =
-            IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
-            numReplicas);
-        newConsumingSegmentName = newLLCSegment.getSegmentName();
-        LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
-            Status.IN_PROGRESS);
-      } else {
-        LOGGER.info(
-            "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
-                + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
-      }
+    LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+    int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
+
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+    Set<Integer> partitionIds = getPartitionIds(streamConfigs, realtimeTableName);
+
+    if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+      long newSegmentCreationTimeMs = getCurrentTimeMs();
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+
+      StreamConfig streamConfig =
+          IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
+          numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
+      LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
+          Status.IN_PROGRESS);
    } else {
      LOGGER.info(
-          "Skipping creation of new segment metadata after segment: {} during commit. Reason: table: {} is paused.",
-          committingSegmentName, realtimeTableName);
+          "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
+              + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
    }
    return newConsumingSegmentName;
  }
+  private void removeSegmentZKMetadataBestEffort(String realtimeTableName, String segmentName) {
+    String segmentMetadataPath =
+        ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName);
+    try {
+      if (!_propertyStore.remove(segmentMetadataPath, AccessOption.PERSISTENT)) {
+        LOGGER.warn("Failed to remove segment ZK metadata for segment: {} of table: {}", segmentName,
+            realtimeTableName);
+      } else {
+        LOGGER.info("Removed segment ZK metadata for segment: {} of table: {}", segmentName, realtimeTableName);
Review Comment:
The removeSegmentZKMetadataBestEffort method logs at INFO level when
successful (line 859) but only WARN when it fails (lines 856, 862). Since this
is a cleanup operation that is "best effort" and failures are explicitly
tolerated, logging successful removals at INFO creates noise. Consider either
logging successes at DEBUG level, or elevating failures to ERROR since they
indicate cleanup didn't happen and may cause issues later (orphaned metadata in
ZooKeeper).
```suggestion
        LOGGER.debug("Removed segment ZK metadata for segment: {} of table: {}", segmentName, realtimeTableName);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]