This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cedb6c66c22 [bugfix] Fix per-stream partition count in segment metadata for consuming segments in multi-stream tables (#18401)
cedb6c66c22 is described below

commit cedb6c66c22f42648db3240121eaaf348bca86d6
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Tue May 5 18:19:47 2026 -0700

    [bugfix] Fix per-stream partition count in segment metadata for consuming segments in multi-stream tables (#18401)
    
    * [bugfix] Fix per-stream partition count in segment metadata for consuming segments in multi-stream tables
    
    * Fixed unnecessary change
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 29 ++++++++---
 .../PinotLLCRealtimeSegmentManagerTest.java        | 58 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 8 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9894de6aa82..a81a9e16112 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1135,7 +1135,8 @@ public class PinotLLCRealtimeSegmentManager implements PinotClusterConfigChangeL
   }
 
   @Nullable
-  private SegmentPartitionMetadata 
getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId,
+  @VisibleForTesting
+  SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig 
tableConfig, int partitionId,
       int numPartitionGroups) {
     SegmentPartitionConfig partitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
     if (partitionConfig == null) {
@@ -1145,18 +1146,30 @@ public class PinotLLCRealtimeSegmentManager implements PinotClusterConfigChangeL
     if (columnPartitionMap.size() == 1) {
       Map.Entry<String, ColumnPartitionConfig> entry = 
columnPartitionMap.entrySet().iterator().next();
       ColumnPartitionConfig columnPartitionConfig = entry.getValue();
-      if (numPartitionGroups != columnPartitionConfig.getNumPartitions()) {
-        LOGGER.warn("Number of partition groups fetched from the stream '{}' 
is different than "
-                + "columnPartitionConfig.numPartitions '{}' in the table 
config. The stream partition count is used. "
-                + "Please update the table config accordingly.", 
numPartitionGroups,
-            columnPartitionConfig.getNumPartitions());
-      }
       // For multi-stream tables, convert Pinot partition ID (which includes 
padding offset) to stream partition ID.
       // This ensures the partition metadata stored in ZK matches what the 
broker's partition function computes
       // during query pruning. For example, stream 1 partition 5 has Pinot 
partition ID 10005, but should store 5.
       int streamPartitionId = 
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(tableConfig, 
partitionId);
+      // If there are multiple streams, we assume the partition groups are 
evenly distributed across the streams, and
+      // compute the per-stream partition group count accordingly.
+      // This is needed for the partition function to correctly compute the 
stream partition id for pruning.
+      int numStreams = 
IngestionConfigUtils.getStreamConfigs(tableConfig).size();
+      if (numStreams > 1 && numPartitionGroups % numStreams != 0) {
+        LOGGER.warn("Number of partition groups '{}' is not divisible by 
number of streams '{}'. This might lead to "
+                + "incorrect pruning if the partition function is based on 
stream partition id. Please update the "
+                + "table config to ensure the partition groups are evenly 
distributed across the streams.",
+            numPartitionGroups, numStreams);
+        return null;
+      }
+      int perStreamNumPartitions = numStreams > 1 ? numPartitionGroups / 
numStreams : numPartitionGroups;
+      if (perStreamNumPartitions != columnPartitionConfig.getNumPartitions()) {
+        LOGGER.warn("Number of partitions per stream '{}' is different than "
+                + "columnPartitionConfig.numPartitions '{}' in the table 
config. "
+                + "The stream partition count is used. Please update the table 
config accordingly.",
+            perStreamNumPartitions, columnPartitionConfig.getNumPartitions());
+      }
       ColumnPartitionMetadata columnPartitionMetadata =
-          new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), 
numPartitionGroups,
+          new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), 
perStreamNumPartitions,
               Collections.singleton(streamPartitionId), 
columnPartitionConfig.getFunctionConfig());
       return new 
SegmentPartitionMetadata(Collections.singletonMap(entry.getKey(), 
columnPartitionMetadata));
     } else {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 58ec7039c67..e84a6d357cb 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -58,6 +58,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.restlet.resources.BatchConfig;
@@ -74,12 +75,17 @@ import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 import org.apache.pinot.spi.config.table.PauseState;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -2319,6 +2325,58 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertEquals(segmentManager.getMaxSegmentCompletionTimeMillis(), 600_000L);
   }
 
+  @Test
+  public void testGetPartitionMetadataFromTableConfig() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
+    Map<String, String> singleStreamConfigMap =
+        
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+    SegmentPartitionConfig partitionConfig = new SegmentPartitionConfig(
+        Collections.singletonMap("col", new ColumnPartitionConfig("Modulo", 4, 
null)));
+
+    // No SegmentPartitionConfig → null
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setStreamConfigs(singleStreamConfigMap).build();
+    assertNull(segmentManager.getPartitionMetadataFromTableConfig(tableConfig, 
2, 4));
+
+    // Single-stream: perStreamNumPartitions = numPartitionGroups
+    tableConfig.getIndexingConfig().setSegmentPartitionConfig(partitionConfig);
+    SegmentPartitionMetadata metadata = 
segmentManager.getPartitionMetadataFromTableConfig(tableConfig, 2, 4);
+    assertNotNull(metadata);
+    ColumnPartitionMetadata colMetadata = 
metadata.getColumnPartitionMap().get("col");
+    assertEquals(colMetadata.getNumPartitions(), 4);
+    assertEquals(colMetadata.getPartitions(), Collections.singleton(2));
+
+    // Multi-stream, even distribution: perStreamNumPartitions = 
numPartitionGroups / numStreams
+    // 2 streams × 4 partitions each = 8 total partition groups
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(
+        new StreamIngestionConfig(Arrays.asList(singleStreamConfigMap, 
singleStreamConfigMap)));
+    TableConfig multiStreamTableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig)
+            .build();
+    
multiStreamTableConfig.getIndexingConfig().setSegmentPartitionConfig(partitionConfig);
+
+    // Stream 0, partition 2: Pinot partition ID = 0 * 10000 + 2 = 2
+    metadata = 
segmentManager.getPartitionMetadataFromTableConfig(multiStreamTableConfig, 2, 
8);
+    assertNotNull(metadata);
+    colMetadata = metadata.getColumnPartitionMap().get("col");
+    assertEquals(colMetadata.getNumPartitions(), 4,
+        "Multi-stream partition count must be per-stream (numPartitionGroups / 
numStreams), not total");
+    assertEquals(colMetadata.getPartitions(), Collections.singleton(2));
+
+    // Stream 1, partition 3: Pinot partition ID = 1 * 10000 + 3 = 10003
+    int stream1Partition3 = IngestionConfigUtils.PARTITION_PADDING_OFFSET + 3;
+    metadata = 
segmentManager.getPartitionMetadataFromTableConfig(multiStreamTableConfig, 
stream1Partition3, 8);
+    assertNotNull(metadata);
+    colMetadata = metadata.getColumnPartitionMap().get("col");
+    assertEquals(colMetadata.getNumPartitions(), 4);
+    assertEquals(colMetadata.getPartitions(), Collections.singleton(3));
+
+    // Multi-stream, uneven distribution: numPartitionGroups not divisible by 
numStreams → null
+    
assertNull(segmentManager.getPartitionMetadataFromTableConfig(multiStreamTableConfig,
 0, 7),
+        "Uneven partition distribution across streams must return null");
+  }
+
   
//////////////////////////////////////////////////////////////////////////////////
   // Fake classes
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to