This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a022e4194d Fix partition info in segment ZK metadata for changes in stream partition count (#11476)
a022e4194d is described below

commit a022e4194d48cca27a674bb4aa24e08463a4892f
Author: Sajjad Moradi <[email protected]>
AuthorDate: Thu Aug 31 21:36:51 2023 -0700

    Fix partition info in segment ZK metadata for changes in stream partition count (#11476)
---
 .../helix/core/realtime/PinotLLCRealtimeSegmentManager.java | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d2a2860440..11d82875cf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -694,7 +694,7 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Add the partition metadata if available
     SegmentPartitionMetadata partitionMetadata =
-        getPartitionMetadataFromTableConfig(tableConfig, 
newLLCSegmentName.getPartitionGroupId());
+        getPartitionMetadataFromTableConfig(tableConfig, 
newLLCSegmentName.getPartitionGroupId(), numPartitionGroups);
     if (partitionMetadata != null) {
       newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
     }
@@ -710,7 +710,8 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @Nullable
-  private SegmentPartitionMetadata 
getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId) {
+  private SegmentPartitionMetadata 
getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId,
+      int numPartitionGroups) {
     SegmentPartitionConfig partitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
     if (partitionConfig == null) {
       return null;
@@ -719,8 +720,14 @@ public class PinotLLCRealtimeSegmentManager {
     if (columnPartitionMap.size() == 1) {
       Map.Entry<String, ColumnPartitionConfig> entry = 
columnPartitionMap.entrySet().iterator().next();
       ColumnPartitionConfig columnPartitionConfig = entry.getValue();
+      if (numPartitionGroups != columnPartitionConfig.getNumPartitions()) {
+        LOGGER.warn("Number of partition groups fetched from the stream '{}' 
is different than "
+                + "columnPartitionConfig.numPartitions '{}' in the table 
config. The stream partition count is used. "
+                + "Please update the table config accordingly.", 
numPartitionGroups,
+            columnPartitionConfig.getNumPartitions());
+      }
       ColumnPartitionMetadata columnPartitionMetadata =
-          new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), 
columnPartitionConfig.getNumPartitions(),
+          new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), 
numPartitionGroups,
               Collections.singleton(partitionId), 
columnPartitionConfig.getFunctionConfig());
       return new 
SegmentPartitionMetadata(Collections.singletonMap(entry.getKey(), 
columnPartitionMetadata));
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to