This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 562854b6883 Optimise fetchPartitionGroupIdToSmallestOffset in
RealtimeSegmentManager (#17712)
562854b6883 is described below
commit 562854b6883d39705fe2efa1483d93bb01eebd25
Author: NOOB <[email protected]>
AuthorDate: Sat Mar 7 02:32:49 2026 +0530
Optimise fetchPartitionGroupIdToSmallestOffset in RealtimeSegmentManager
(#17712)
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 56 +++++++++++++++--
.../PinotLLCRealtimeSegmentManagerTest.java | 71 ++++++++++++++++++++++
2 files changed, 123 insertions(+), 4 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9ed39b67c46..120ab8841bd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1849,7 +1849,8 @@ public class PinotLLCRealtimeSegmentManager {
// Smallest offset is fetched from stream once and cached in
partitionIdToSmallestOffset.
if (partitionIdToSmallestOffset == null) {
- partitionIdToSmallestOffset =
fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState);
+ partitionIdToSmallestOffset =
+ fetchPartitionGroupIdToSmallestOffset(streamConfigs,
idealState, latestSegmentZKMetadataMap);
}
// Do not create new CONSUMING segment when the stream partition has
reached end of life.
@@ -1950,11 +1951,13 @@ public class PinotLLCRealtimeSegmentManager {
}
private Map<Integer, StreamPartitionMsgOffset>
fetchPartitionGroupIdToSmallestOffset(List<StreamConfig> streamConfigs,
- IdealState idealState) {
+ IdealState idealState, Map<Integer, SegmentZKMetadata>
latestSegmentZKMetadataMap) {
+ // Build consumption status from pre-computed ZK metadata map instead of
rescanning IdealState (O(#partitions) vs O(#segments))
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+
buildPartitionGroupConsumptionStatusFromZKMetadata(latestSegmentZKMetadataMap,
streamConfigs);
+
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset =
new HashMap<>();
for (StreamConfig streamConfig : streamConfigs) {
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
- getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
@@ -1976,6 +1979,51 @@ public class PinotLLCRealtimeSegmentManager {
return partitionGroupIdToSmallestOffset;
}
+ /**
+ * Builds {@link PartitionGroupConsumptionStatus} list from the pre-computed
latest segment ZK metadata map,
+ * avoiding an O(N) scan of all IdealState segments that {@link
#getPartitionGroupConsumptionStatusList} performs.
+ */
+ @VisibleForTesting
+ List<PartitionGroupConsumptionStatus>
buildPartitionGroupConsumptionStatusFromZKMetadata(
+ Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap,
List<StreamConfig> streamConfigs) {
+ List<PartitionGroupConsumptionStatus> result = new
ArrayList<>(latestSegmentZKMetadataMap.size());
+ int numStreams = streamConfigs.size();
+ if (numStreams == 1) {
+ StreamPartitionMsgOffsetFactory offsetFactory =
+
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
+ for (Map.Entry<Integer, SegmentZKMetadata> entry :
latestSegmentZKMetadataMap.entrySet()) {
+ int partitionGroupId = entry.getKey();
+ SegmentZKMetadata zkMetadata = entry.getValue();
+ LLCSegmentName llcSegmentName = new
LLCSegmentName(zkMetadata.getSegmentName());
+ result.add(new PartitionGroupConsumptionStatus(partitionGroupId,
llcSegmentName.getSequenceNumber(),
+ offsetFactory.create(zkMetadata.getStartOffset()),
+ zkMetadata.getEndOffset() != null ?
offsetFactory.create(zkMetadata.getEndOffset()) : null,
+ zkMetadata.getStatus().toString()));
+ }
+ } else {
+ StreamPartitionMsgOffsetFactory[] offsetFactories = new
StreamPartitionMsgOffsetFactory[numStreams];
+ for (Map.Entry<Integer, SegmentZKMetadata> entry :
latestSegmentZKMetadataMap.entrySet()) {
+ int partitionGroupId = entry.getKey();
+ int index =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId);
+ int streamPartitionId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId);
+ SegmentZKMetadata zkMetadata = entry.getValue();
+ LLCSegmentName llcSegmentName = new
LLCSegmentName(zkMetadata.getSegmentName());
+ StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index];
+ if (offsetFactory == null) {
+ offsetFactory =
+
StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory();
+ offsetFactories[index] = offsetFactory;
+ }
+ result.add(new PartitionGroupConsumptionStatus(partitionGroupId,
streamPartitionId,
+ llcSegmentName.getSequenceNumber(),
+ offsetFactory.create(zkMetadata.getStartOffset()),
+ zkMetadata.getEndOffset() != null ?
offsetFactory.create(zkMetadata.getEndOffset()) : null,
+ zkMetadata.getStatus().toString()));
+ }
+ }
+ return result;
+ }
+
private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria
offsetCriteria, int partitionGroupId,
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset,
Map<Integer, StreamPartitionMsgOffset>
partitionGroupIdToSmallestStreamOffset, String tableName,
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 13a15c045e4..e43fba113e2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -28,6 +28,7 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -1863,6 +1864,76 @@ public class PinotLLCRealtimeSegmentManagerTest {
Assert.assertEquals(partitionIds.size(), 2);
}
+ /**
+ * Verifies that {@code buildPartitionGroupConsumptionStatusFromZKMetadata}
produces the same results as
+ * {@code getPartitionGroupConsumptionStatusList} for the common case where
IdealState and ZK metadata are in sync.
+ * This validates that the optimization in {@code
fetchPartitionGroupIdToSmallestOffset} (reusing the pre-computed
+ * latestSegmentZKMetadataMap instead of rescanning the entire IdealState)
does not change behavior.
+ */
+ @Test
+ public void
testBuildPartitionGroupConsumptionStatusFromZKMetadataMatchesOriginal() {
+ // Set up a table with 2 replicas, 5 instances, 4 partitions
+ FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
+ setUpNewTable(segmentManager, 2, 5, 4);
+
+ // Commit segments for partitions 0 and 1 to get a mix of ONLINE (DONE)
and CONSUMING (IN_PROGRESS) segments
+ for (int partitionGroupId = 0; partitionGroupId < 2; partitionGroupId++) {
+ String segmentName = new LLCSegmentName(RAW_TABLE_NAME,
partitionGroupId, 0, CURRENT_TIME_MS).getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(segmentName,
+ new LongMsgOffset(PARTITION_OFFSET.getOffset() +
NUM_DOCS).toString(), 0L);
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
+ }
+
+ // Build latestSegmentZKMetadataMap from the fake ZK metadata (same logic
as getLatestSegmentZKMetadataMap)
+ Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = new
HashMap<>();
+ for (Map.Entry<String, SegmentZKMetadata> entry :
segmentManager._segmentZKMetadataMap.entrySet()) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(entry.getKey());
+ int partitionId = llcSegmentName.getPartitionGroupId();
+ latestSegmentZKMetadataMap.merge(partitionId, entry.getValue(),
+ (existing, candidate) -> {
+ int existingSeq = new
LLCSegmentName(existing.getSegmentName()).getSequenceNumber();
+ int candidateSeq = new
LLCSegmentName(candidate.getSegmentName()).getSequenceNumber();
+ return candidateSeq > existingSeq ? candidate : existing;
+ });
+ }
+
+ // Get results from both methods
+ List<PartitionGroupConsumptionStatus> fromIdealState =
+
segmentManager.getPartitionGroupConsumptionStatusList(segmentManager._idealState,
+ segmentManager._streamConfigs);
+ List<PartitionGroupConsumptionStatus> fromZKMetadata =
+
segmentManager.buildPartitionGroupConsumptionStatusFromZKMetadata(latestSegmentZKMetadataMap,
+ segmentManager._streamConfigs);
+
+ // Sort both by partition group id for comparison
+
fromIdealState.sort(Comparator.comparingInt(PartitionGroupConsumptionStatus::getPartitionGroupId));
+
fromZKMetadata.sort(Comparator.comparingInt(PartitionGroupConsumptionStatus::getPartitionGroupId));
+
+ // Verify same number of partitions
+ assertEquals(fromIdealState.size(), fromZKMetadata.size(),
+ "Both methods should return the same number of partitions");
+
+ // Verify each partition has identical consumption status
+ for (int i = 0; i < fromIdealState.size(); i++) {
+ PartitionGroupConsumptionStatus isStatus = fromIdealState.get(i);
+ PartitionGroupConsumptionStatus zkStatus = fromZKMetadata.get(i);
+
+ assertEquals(zkStatus.getPartitionGroupId(),
isStatus.getPartitionGroupId(),
+ "Partition group id mismatch at index " + i);
+ assertEquals(zkStatus.getSequenceNumber(), isStatus.getSequenceNumber(),
+ "Sequence number mismatch for partition " +
isStatus.getPartitionGroupId());
+ assertEquals(zkStatus.getStartOffset().toString(),
isStatus.getStartOffset().toString(),
+ "Start offset mismatch for partition " +
isStatus.getPartitionGroupId());
+ String zkEnd = zkStatus.getEndOffset() != null ?
zkStatus.getEndOffset().toString() : null;
+ String isEnd = isStatus.getEndOffset() != null ?
isStatus.getEndOffset().toString() : null;
+ assertEquals(zkEnd, isEnd,
+ "End offset mismatch for partition " +
isStatus.getPartitionGroupId());
+ assertEquals(zkStatus.getStatus(), isStatus.getStatus(),
+ "Status mismatch for partition " + isStatus.getPartitionGroupId());
+ }
+ }
+
@Test
public void testReduceSegmentSizeAndReset() {
// Set up a new table with 2 replicas, 5 instances, 4 partitions
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]