kmozaid commented on code in PR #8255:
URL: https://github.com/apache/pinot/pull/8255#discussion_r852823004


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -351,30 +350,39 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
                     mergeConfigs, taskConfigs));
           }
         } else {
-          // For partitioned table, schedule separate tasks for each partition
+          // For partitioned table, schedule separate tasks for each 
partitionId (partitionId is constructed from
+          // partitions of all partition columns. There should be exact match 
between partition columns of segment and
+          // partition columns of table configuration, and there is only 
partition per column in segment metadata).
+          // Other segments which do not meet these conditions are considered 
as outlier segments, and additional tasks
+          // are generated for them.
           Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
-          Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot 
partition on multiple columns for table: %s",
-              tableConfig.getTableName());
-          Map.Entry<String, ColumnPartitionConfig> partitionEntry = 
columnPartitionMap.entrySet().iterator().next();
-          String partitionColumn = partitionEntry.getKey();
-
+          Set<String> partitionColumns = columnPartitionMap.keySet();
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : 
selectedSegmentsForAllBuckets) {
-            Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new 
HashMap<>();
-            // Handle segments that have multiple partitions or no partition 
info
+            Map<String, List<SegmentZKMetadata>> partitionToSegments = new 
HashMap<>();
             List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
             for (SegmentZKMetadata selectedSegment : 
selectedSegmentsPerBucket) {
               SegmentPartitionMetadata segmentPartitionMetadata = 
selectedSegment.getPartitionMetadata();
-              if (segmentPartitionMetadata == null
-                  || 
segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
+              List<Integer> partitionsBuffer = new ArrayList<>();
+              if (segmentPartitionMetadata != null && partitionColumns.equals(
+                  segmentPartitionMetadata.getColumnPartitionMap().keySet())) {
+                for (String partitionColumn : partitionColumns) {

Review Comment:
   done.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -351,30 +350,39 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
                     mergeConfigs, taskConfigs));
           }
         } else {
-          // For partitioned table, schedule separate tasks for each partition
+          // For partitioned table, schedule separate tasks for each 
partitionId (partitionId is constructed from
+          // partitions of all partition columns. There should be exact match 
between partition columns of segment and
+          // partition columns of table configuration, and there is only 
partition per column in segment metadata).
+          // Other segments which do not meet these conditions are considered 
as outlier segments, and additional tasks
+          // are generated for them.
           Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
-          Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot 
partition on multiple columns for table: %s",
-              tableConfig.getTableName());
-          Map.Entry<String, ColumnPartitionConfig> partitionEntry = 
columnPartitionMap.entrySet().iterator().next();
-          String partitionColumn = partitionEntry.getKey();
-
+          Set<String> partitionColumns = columnPartitionMap.keySet();
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : 
selectedSegmentsForAllBuckets) {
-            Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new 
HashMap<>();
-            // Handle segments that have multiple partitions or no partition 
info
+            Map<String, List<SegmentZKMetadata>> partitionToSegments = new 
HashMap<>();
             List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
             for (SegmentZKMetadata selectedSegment : 
selectedSegmentsPerBucket) {
               SegmentPartitionMetadata segmentPartitionMetadata = 
selectedSegment.getPartitionMetadata();
-              if (segmentPartitionMetadata == null
-                  || 
segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
+              List<Integer> partitionsBuffer = new ArrayList<>();
+              if (segmentPartitionMetadata != null && partitionColumns.equals(
+                  segmentPartitionMetadata.getColumnPartitionMap().keySet())) {
+                for (String partitionColumn : partitionColumns) {
+                  if 
(segmentPartitionMetadata.getPartitions(partitionColumn).size() == 1) {
+                    
partitionsBuffer.add(segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next());
+                  } else {
+                    partitionsBuffer.clear();
+                    break;
+                  }
+                }
+              }
+              if (partitionsBuffer.isEmpty()) {
                 outlierSegments.add(selectedSegment);
               } else {
-                int partition = 
segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next();
-                partitionToSegments.computeIfAbsent(partition, k -> new 
ArrayList<>()).add(selectedSegment);
+                String partitionId = StringUtils.join(partitionsBuffer, "_");
+                partitionToSegments.computeIfAbsent(partitionId, k -> new 
ArrayList<>()).add(selectedSegment);
               }
             }
 
-            for (Map.Entry<Integer, List<SegmentZKMetadata>> 
partitionToSegmentsEntry
-                : partitionToSegments.entrySet()) {
+            for (Map.Entry<String, List<SegmentZKMetadata>> 
partitionToSegmentsEntry : partitionToSegments.entrySet()) {

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to