This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 493ed2c  MergeRollupTaskGenerator enhancement: enable parallel buckets scheduling (#7481)
493ed2c is described below

commit 493ed2c0433fdc40f605f0b06ce5f6f98ce62725
Author: Jiapeng Tao <[email protected]>
AuthorDate: Mon Oct 11 19:27:53 2021 -0700

    MergeRollupTaskGenerator enhancement: enable parallel buckets scheduling (#7481)
---
 .../apache/pinot/core/common/MinionConstants.java  |   1 +
 .../mergerollup/MergeRollupTaskGenerator.java      | 283 ++++++++++++---------
 .../tasks/mergerollup/MergeRollupTaskUtils.java    |   3 +-
 .../mergerollup/MergeRollupTaskGeneratorTest.java  | 147 +++++++++--
 .../mergerollup/MergeRollupTaskUtilsTest.java      |   4 +-
 5 files changed, 301 insertions(+), 137 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 06ff9d5..0ce9980 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -88,6 +88,7 @@ public class MinionConstants {
     // Segment config
     public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
+    public static final String MAX_NUM_PARALLEL_BUCKETS = "maxNumParallelBuckets";
     public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix";
   }
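
The new key is consumed per merge level via level-prefixed table task configs. A minimal
sketch of enabling it (illustrative values mirroring the MergeRollupTaskGeneratorTest
setup further down in this diff, not defaults):

    // Illustrative only: per-level MergeRollupTask configs for an offline table
    Map<String, String> tableTaskConfigs = new HashMap<>();
    tableTaskConfigs.put("daily.mergeType", "concat");
    tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
    tableTaskConfigs.put("daily.bufferTimePeriod", "2d");
    // New in this commit: allow up to 3 daily buckets to be scheduled per run
    tableTaskConfigs.put("daily.maxNumParallelBuckets", "3");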
 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index c1ab565..8e9265c 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.ZNRecord;
@@ -43,6 +44,7 @@ import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerato
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -65,36 +67,43 @@ import org.slf4j.LoggerFactory;
  *
  *  - Pre-select segments:
  *    - Fetch all segments, select segments based on segment lineage (removing segmentsFrom for COMPLETED lineage
- *    entry and
- *      segmentsTo for IN_PROGRESS lineage entry)
+ *      entry and segmentsTo for IN_PROGRESS lineage entry)
  *    - Remove empty segments
  *    - Sort segments based on startTime and endTime in ascending order
  *
  *  For each mergeLevel (from lowest to highest, e.g. Hourly -> Daily -> Monthly -> Yearly):
  *    - Skip scheduling if there's an incomplete task for the mergeLevel
- *    - Calculate merge/rollup window:
- *      - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
- *        found at MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>
- *        In case of cold-start, no ZNode will exist.
- *        A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
- *        closest bucket start time.
- *      - The execution window for the task is calculated as,
- *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
- *      - Skip scheduling if the window is invalid:
- *        - If the execution window is not older than bufferTimeMs, no task will be generated
- *        - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
- *      - Bump up target window and watermark if needed.
- *        - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *        current window,
- *          keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *    - Select all segments for the target window
- *    - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *    - Schedule tasks for at most k time buckets, where k is up to maxNumParallelBuckets (1 by default), best effort
+ *    - Repeat until k time buckets get created or we loop through all the candidate segments:
+ *      - Calculate the merge/roll-up bucket:
+ *        - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode found at
+ *          {@code MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>}
+ *          In case of cold-start, no ZNode will exist.
+ *          A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
+ *          closest bucket start time.
+ *        - The execution window for the task is calculated as:
+ *          bucketStartMs = watermarkMs
+ *          bucketEndMs = bucketStartMs + bucketTimeMs
+ *          - bucketEndMs must not be later than now - bufferTimeMs
+ *          - bucketEndMs of a higher merge level must be less than or equal to the watermarkMs of the lower level
+ *        - Bump up the target window and watermark if needed:
+ *          - If there are no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level})
+ *            for the current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *          - Else skip the scheduling.
+ *      - Select segments for the bucket:
+ *        - Skip buckets whose segments are all merged
+ *        - If there are no spilled-over segments (segments spanning multiple time buckets), schedule buckets in parallel
+ *        - Else, schedule buckets up to the first one that has spilled-over data (inclusive), so the spilled-over data
+ *          will be merged in the next round
+ *      - Create the tasks for the current bucket (per partition for partitioned tables) based on
+ *        maxNumRecordsPerTask
  */
 @TaskGenerator
 public class MergeRollupTaskGenerator implements PinotTaskGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
 
   private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
+  private static final int DEFAULT_NUM_PARALLEL_BUCKETS = 1;
   private static final String REFRESH = "REFRESH";
 
   // This is the metric that keeps track of the task delay in the number of time buckets. For example, if we see this
@@ -156,8 +165,7 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
         // Reset the watermark time if no segment found. This covers the case where the table is newly created or
         // all segments for the existing table got deleted.
         resetDelayMetrics(offlineTableName);
-        LOGGER
-            .info("Skip generating task: {} for table: {}, no segment is found.", taskType, offlineTableName);
+        LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, offlineTableName);
         continue;
       }
 
@@ -184,8 +192,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
 
       // Get incomplete merge levels
       Set<String> inCompleteMergeLevels = new HashSet<>();
-      for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils
-          .getIncompleteTasks(taskType, offlineTableName, _clusterInfoAccessor).entrySet()) {
+      for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils.getIncompleteTasks(taskType, offlineTableName,
+          _clusterInfoAccessor).entrySet()) {
         for (PinotTaskConfig taskConfig : _clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
           inCompleteMergeLevels.add(taskConfig.getConfigs().get(MergeRollupTask.MERGE_LEVEL_KEY));
         }
@@ -212,106 +220,133 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
           continue;
         }
 
-        // Get the bucket size and buffer
-        long bucketMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+        // Get the bucket size, buffer size and maximum number of parallel buckets (by default 1)
+        String bucketPeriod = mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY);
+        long bucketMs = TimeUtils.convertPeriodToMillis(bucketPeriod);
         if (bucketMs <= 0) {
-          LOGGER.error("Bucket time period (table : {}, mergeLevel : {}) must be larger than 0", offlineTableName,
-              mergeLevel);
+          LOGGER.error("Bucket time period: {} (table : {}, mergeLevel : {}) must be larger than 0", bucketPeriod,
+              offlineTableName, mergeLevel);
+          continue;
+        }
+        String bufferPeriod = mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY);
+        long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+        if (bufferMs < 0) {
+          LOGGER.error("Buffer time period: {} (table : {}, mergeLevel : {}) must be larger or equal to 0",
+              bufferPeriod, offlineTableName, mergeLevel);
+          continue;
+        }
+        String maxNumParallelBucketsStr = mergeConfigs.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS);
+        int maxNumParallelBuckets = maxNumParallelBucketsStr != null ? Integer.parseInt(maxNumParallelBucketsStr)
+            : DEFAULT_NUM_PARALLEL_BUCKETS;
+        if (maxNumParallelBuckets <= 0) {
+          LOGGER.error("Maximum number of parallel buckets: {} (table : {}, mergeLevel : {}) must be larger than 0",
+              maxNumParallelBuckets, offlineTableName, mergeLevel);
           continue;
         }
-        long bufferMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
 
         // Get watermark from MergeRollupTaskMetadata ZNode
-        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
-        long waterMarkMs =
+        // bucketStartMs = watermarkMs
+        // bucketEndMs = bucketStartMs + bucketMs
+        long bucketStartMs =
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, mergeRollupTaskMetadata);
-        long windowStartMs = waterMarkMs;
-        long windowEndMs = windowStartMs + bucketMs;
-
-        if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
-          LOGGER.info("Window with start: {} and end: {} of mergeLevel: {} is not a valid merge window, Skipping task "
-              + "generation: {}", windowStartMs, windowEndMs, mergeLevel, taskType);
+        long bucketEndMs = bucketStartMs + bucketMs;
+        if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
+          LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet",
+              bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
           continue;
         }
 
-        // Find all segments overlapping with the merge window, if all overlapping segments are merged, bump up the
-        // target window
-        List<SegmentZKMetadata> selectedSegments = new ArrayList<>();
+        // Find overlapping segments for each bucket, skip the buckets that have all segments merged
+        List<List<SegmentZKMetadata>> selectedSegmentsForAllBuckets = new ArrayList<>(maxNumParallelBuckets);
+        List<SegmentZKMetadata> selectedSegmentsForBucket = new ArrayList<>();
         boolean hasUnmergedSegments = false;
-        boolean isValidMergeWindow = true;
+        boolean hasSpilledOverData = false;
 
         // The for loop terminates in following cases:
-        // 1. Found unmerged segments in target merge window, need to bump up watermark if windowStartMs > watermarkMs,
-        //    will schedule tasks
-        // 2. All segments are merged in the merge window and we have loop through all segments, skip scheduling
-        // 3. Merge window is invalid (windowEndMs > System.currentTimeMillis() - bufferMs || windowEndMs > waterMark of
-        //    the lower mergeLevel), skip scheduling
+        // 1. Found buckets with unmerged segments:
+        //    For each bucket find all segments overlapping with the target bucket, skip the bucket if all overlapping
+        //    segments are merged. Schedule at most k (maxNumParallelBuckets) buckets, and stop at the first bucket that
+        //    contains spilled over data.
+        // 2. There's no bucket with unmerged segments, skip scheduling
         for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
           long startTimeMs = preSelectedSegment.getStartTimeMs();
-          if (startTimeMs < windowEndMs) {
+          if (startTimeMs < bucketEndMs) {
             long endTimeMs = preSelectedSegment.getEndTimeMs();
-            if (endTimeMs >= windowStartMs) {
-              // For segments overlapping with current window, add to the result list
-              selectedSegments.add(preSelectedSegment);
+            if (endTimeMs >= bucketStartMs) {
+              // For segments overlapping with current bucket, add to the result list
               if (!isMergedSegment(preSelectedSegment, mergeLevel)) {
                 hasUnmergedSegments = true;
               }
+              if (hasSpilledOverData(preSelectedSegment, bucketMs)) {
+                hasSpilledOverData = true;
+              }
+              selectedSegmentsForBucket.add(preSelectedSegment);
             }
-            // endTimeMs < windowStartMs
+            // endTimeMs < bucketStartMs
             // Haven't found the first overlapping segment, continue to the next segment
           } else {
-            // Has gone through all overlapping segments for current window
+            // Has gone through all overlapping segments for current bucket
             if (hasUnmergedSegments) {
-              // Found unmerged segments, schedule merge task for current window
+              // Add the bucket if there are unmerged segments
+              selectedSegmentsForAllBuckets.add(selectedSegmentsForBucket);
+            }
+
+            if (selectedSegmentsForAllBuckets.size() == maxNumParallelBuckets || hasSpilledOverData) {
+              // If there are enough buckets or found spilled over data, schedule merge tasks
               break;
             } else {
-              // No unmerged segments found, clean up selected segments and bump up the merge window
+              // Start with a new bucket
               // TODO: If there are many small merged segments, we should merge them again
-              selectedSegments.clear();
-              selectedSegments.add(preSelectedSegment);
+              selectedSegmentsForBucket = new ArrayList<>();
+              hasUnmergedSegments = false;
+              bucketStartMs = (startTimeMs / bucketMs) * bucketMs;
+              bucketEndMs = bucketStartMs + bucketMs;
+              if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
+                break;
+              }
               if (!isMergedSegment(preSelectedSegment, mergeLevel)) {
                 hasUnmergedSegments = true;
               }
-              windowStartMs = startTimeMs / bucketMs * bucketMs;
-              windowEndMs = windowStartMs + bucketMs;
-              if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
-                isValidMergeWindow = false;
-                break;
+              if (hasSpilledOverData(preSelectedSegment, bucketMs)) {
+                hasSpilledOverData = true;
               }
+              selectedSegmentsForBucket.add(preSelectedSegment);
             }
           }
         }
 
-        if (!isValidMergeWindow) {
-          LOGGER.info("Window with start: {} and end: {} of mergeLevel: {} is not a valid merge window, Skipping task "
-              + "generation: {}", windowStartMs, windowEndMs, mergeLevel, taskType);
-          continue;
+        // Add the last bucket if it contains unmerged segments and was not added before
+        if (hasUnmergedSegments && (selectedSegmentsForAllBuckets.isEmpty() || (
+            selectedSegmentsForAllBuckets.get(selectedSegmentsForAllBuckets.size() - 1) != selectedSegmentsForBucket))) {
+          selectedSegmentsForAllBuckets.add(selectedSegmentsForBucket);
         }
 
-        if (!hasUnmergedSegments) {
-          LOGGER.info("No unmerged segments found for mergeLevel:{} for table: {}, Skipping task generation: {}",
-              mergeLevel, offlineTableName, taskType);
+        if (selectedSegmentsForAllBuckets.isEmpty()) {
+          LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", offlineTableName, mergeLevel);
           continue;
         }
 
-        Long prevWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs);
-        LOGGER.info("Update watermark of mergeLevel: {} for table: {} from: {} to: {}", mergeLevel, offlineTableName,
-            prevWatermarkMs, windowStartMs);
+        // Bump up watermark to the earliest start time of selected segments truncated to the closest bucket boundary
+        long newWatermarkMs = selectedSegmentsForAllBuckets.get(0).get(0).getStartTimeMs() / bucketMs * bucketMs;
+        Long prevWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, newWatermarkMs);
+        LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", offlineTableName, mergeLevel,
+            prevWatermarkMs, newWatermarkMs);
 
         // Update the delay metrics
-        updateDelayMetrics(offlineTableName, mergeLevel, windowStartMs, bufferMs, bucketMs);
+        updateDelayMetrics(offlineTableName, mergeLevel, bucketStartMs, bufferMs, bucketMs);
 
         // Create task configs
-        int maxNumRecordsPerTask = mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null ? Integer
-            .parseInt(mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY))
-            : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+        int maxNumRecordsPerTask =
+            mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null ? Integer.parseInt(
+                mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY)) : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
         SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
         if (segmentPartitionConfig == null) {
-          pinotTaskConfigsForTable.addAll(
-              createPinotTaskConfigs(selectedSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel, mergeConfigs,
-                  taskConfigs));
+          for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
+            pinotTaskConfigsForTable.addAll(
+                createPinotTaskConfigs(selectedSegmentsPerBucket, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+                    mergeConfigs, taskConfigs));
+          }
         } else {
           // For partitioned table, schedule separate tasks for each partition
           Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
@@ -320,30 +355,33 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
           Map.Entry<String, ColumnPartitionConfig> partitionEntry = columnPartitionMap.entrySet().iterator().next();
           String partitionColumn = partitionEntry.getKey();
 
-          Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
-          // Handle segments that have multiple partitions or no partition info
-          List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
-          for (SegmentZKMetadata selectedSegment : selectedSegments) {
-            SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata();
-            if (segmentPartitionMetadata == null
-                || segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
-              outlierSegments.add(selectedSegment);
-            } else {
-              int partition = segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next();
-              partitionToSegments.computeIfAbsent(partition, k -> new ArrayList<>()).add(selectedSegment);
+          for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
+            Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
+            // Handle segments that have multiple partitions or no partition info
+            List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
+            for (SegmentZKMetadata selectedSegment : selectedSegmentsPerBucket) {
+              SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata();
+              if (segmentPartitionMetadata == null
+                  || segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
+                outlierSegments.add(selectedSegment);
+              } else {
+                int partition = segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next();
+                partitionToSegments.computeIfAbsent(partition, k -> new ArrayList<>()).add(selectedSegment);
+              }
             }
-          }
 
-          for (Map.Entry<Integer, List<SegmentZKMetadata>> partitionToSegmentsEntry : partitionToSegments.entrySet()) {
-            pinotTaskConfigsForTable.addAll(
-                createPinotTaskConfigs(partitionToSegmentsEntry.getValue(), offlineTableName, maxNumRecordsPerTask,
-                    mergeLevel, mergeConfigs, taskConfigs));
-          }
+            for (Map.Entry<Integer, List<SegmentZKMetadata>> partitionToSegmentsEntry
+                : partitionToSegments.entrySet()) {
+              pinotTaskConfigsForTable.addAll(
+                  createPinotTaskConfigs(partitionToSegmentsEntry.getValue(), offlineTableName, maxNumRecordsPerTask,
+                      mergeLevel, mergeConfigs, taskConfigs));
+            }
 
-          if (!outlierSegments.isEmpty()) {
-            pinotTaskConfigsForTable.addAll(
-                createPinotTaskConfigs(outlierSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel,
-                    mergeConfigs, taskConfigs));
+            if (!outlierSegments.isEmpty()) {
+              pinotTaskConfigsForTable.addAll(
+                  createPinotTaskConfigs(outlierSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+                      mergeConfigs, taskConfigs));
+            }
           }
         }
       }
@@ -358,9 +396,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
         continue;
       }
       pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
-      LOGGER
-          .info("Finished generating task configs for table: {} for task: {}, numTasks: {}", offlineTableName, taskType,
-              pinotTaskConfigsForTable.size());
+      LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", offlineTableName,
+          taskType, pinotTaskConfigsForTable.size());
     }
 
     // Clean up metrics
@@ -389,26 +426,36 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
   }
 
   /**
+   * Check if the segment spans multiple buckets
+   */
+  private boolean hasSpilledOverData(SegmentZKMetadata segmentZKMetadata, long bucketMs) {
+    return segmentZKMetadata.getStartTimeMs() / bucketMs < segmentZKMetadata.getEndTimeMs() / bucketMs;
+  }
+
+  /**
    * Check if the segment is merged for the given merge level
    */
   private boolean isMergedSegment(SegmentZKMetadata segmentZKMetadata, String mergeLevel) {
     Map<String, String> customMap = segmentZKMetadata.getCustomMap();
-    return customMap != null && mergeLevel
-        .equalsIgnoreCase(customMap.get(MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY));
+    return customMap != null && mergeLevel.equalsIgnoreCase(
+        customMap.get(MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY));
   }
 
   /**
-   * Check if the merge window end time is valid
+   * Check if the bucket end time is valid
    */
-  private boolean isValidMergeWindowEndTime(long windowEndMs, long bufferMs, String lowerMergeLevel,
+  private boolean isValidBucketEndTime(long bucketEndMs, long bufferMs, @Nullable String lowerMergeLevel,
       MergeRollupTaskMetadata mergeRollupTaskMetadata) {
-    // Check that execution window endTimeMs <= now - bufferTime
-    if (windowEndMs > System.currentTimeMillis() - bufferMs) {
+    // Check that bucketEndMs <= now - bufferMs
+    if (bucketEndMs > System.currentTimeMillis() - bufferMs) {
       return false;
     }
-    // Check that execution window endTimeMs <= waterMark of the lower mergeLevel
-    return lowerMergeLevel == null || mergeRollupTaskMetadata.getWatermarkMap().get(lowerMergeLevel) == null
-        || windowEndMs <= mergeRollupTaskMetadata.getWatermarkMap().get(lowerMergeLevel);
+    // Check that bucketEndMs <= waterMark of the lower mergeLevel
+    if (lowerMergeLevel != null) {
+      Long lowerMergeLevelWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().get(lowerMergeLevel);
+      return lowerMergeLevelWatermarkMs != null && bucketEndMs <= lowerMergeLevelWatermarkMs;
+    }
+    return true;
   }
 
   /**
@@ -473,14 +520,12 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
         }
       }
 
-      configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MinionConstants.MergeTask.MERGE_TYPE_KEY));
+      configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY));
       configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
-      configs.put(MinionConstants.MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY,
-          mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
-      configs.put(MinionConstants.MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY,
-          mergeConfigs.get(MinionConstants.MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY));
-      configs.put(MinionConstants.MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
-          mergeConfigs.get(MinionConstants.MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
+      configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
+      configs.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY));
+      configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+          mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
 
       configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
           MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + System.currentTimeMillis() + "_" + i + "_"
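
To make the bucket arithmetic above concrete, here is a small self-contained sketch
(not part of the commit) of the two checks the generator builds on: truncating a
timestamp to its bucket start, and the spill-over test used by hasSpilledOverData():

    // Standalone illustration of the bucket math, assuming a 1-day bucket
    public class BucketMathSketch {
      static final long BUCKET_MS = 86_400_000L; // 1 day

      // Truncate a timestamp to the start of its bucket, as done for the
      // watermark bump: newWatermarkMs = startTimeMs / bucketMs * bucketMs
      static long bucketStart(long timeMs) {
        return timeMs / BUCKET_MS * BUCKET_MS;
      }

      // A segment spills over when its start and end fall in different buckets
      static boolean spillsOver(long startMs, long endMs) {
        return startMs / BUCKET_MS < endMs / BUCKET_MS;
      }

      public static void main(String[] args) {
        System.out.println(bucketStart(172_900_000L));              // 172800000 (start of day 2)
        System.out.println(spillsOver(172_800_000L, 260_000_000L)); // true, spans days 2 and 3
      }
    }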
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
index 357cd73..463b2e5 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
@@ -34,7 +34,8 @@ public class MergeRollupTaskUtils {
       MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY,
       MergeTask.MERGE_TYPE_KEY,
       MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
-      MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY
+      MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
+      MergeTask.MAX_NUM_PARALLEL_BUCKETS
   };
   //@formatter:on
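
For context, getLevelToConfigMap splits keys on the level prefix, so the new key flows
through per merge level like the existing ones; e.g., based on the MergeRollupTaskUtilsTest
change below, a "monthly.maxNumParallelBuckets" entry ends up as:

    // Sketch, mirroring the assertion added in MergeRollupTaskUtilsTest below
    Map<String, Map<String, String>> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
    assertEquals(levelToConfigMap.get("monthly").get(MergeTask.MAX_NUM_PARALLEL_BUCKETS), "5");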
 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 78ba547..898a07b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -257,7 +257,7 @@ public class MergeRollupTaskGeneratorTest {
     generator.init(mockClusterInfoProvide);
    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
     assertEquals(pinotTaskConfigs.get(0).getConfigs().get(MinionConstants.DOWNLOAD_URL_KEY), "download1,download2");
 
@@ -270,9 +270,124 @@ public class MergeRollupTaskGeneratorTest {
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3));
    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+  }
+
+  /**
+   * Test num parallel buckets
+   */
+  @Test
+  public void testNumParallelBuckets() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put("daily.mergeType", "concat");
+    tableTaskConfigs.put("daily.bufferTimePeriod", "2d");
+    tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
+    tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
+    tableTaskConfigs.put("daily.maxNumParallelBuckets", "3");
+    taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
+    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+
+    String segmentName1 = "testTable__1";
+    String segmentName2 = "testTable__2";
+    String segmentName3 = "testTable__3";
+    String segmentName4 = "testTable__4";
+    String segmentName5 = "testTable__5";
+    SegmentZKMetadata metadata1 =
+        getSegmentZKMetadata(segmentName1, 86_400_000L, 90_000_000L, TimeUnit.MILLISECONDS, "download1");
+    SegmentZKMetadata metadata2 =
+        getSegmentZKMetadata(segmentName2, 86_400_000L, 100_000_000L, TimeUnit.MILLISECONDS, "download2");
+    SegmentZKMetadata metadata3 =
+        getSegmentZKMetadata(segmentName3, 172_800_000L, 173_000_000L, TimeUnit.MILLISECONDS, "download3");
+    SegmentZKMetadata metadata4 =
+        getSegmentZKMetadata(segmentName4, 259_200_000L, 260_000_000L, TimeUnit.MILLISECONDS, "download4");
+    SegmentZKMetadata metadata5 =
+        getSegmentZKMetadata(segmentName5, 345_600_000L, 346_000_000L, TimeUnit.MILLISECONDS, "download5");
+
+    // No spilled over data
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5));
+    MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
+    generator.init(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has spilled over data
+    String segmentName6 = "testTable__6";
+    SegmentZKMetadata metadata6 =
+        getSegmentZKMetadata(segmentName6, 172_800_000L, 260_000_000L, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 2);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName6, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has time bucket without overlapping segments
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata4, metadata5));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName4, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName5, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has un-merged buckets
+    metadata6 = getSegmentZKMetadata(segmentName6, 432_000_000L, 432_100_000L, TimeUnit.MILLISECONDS, null);
+    metadata1.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    metadata2.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    metadata4.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName3, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName5, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName6, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Test number of scheduled buckets < maxNumParallelBuckets
+    tableTaskConfigs.put("monthly.mergeType", "concat");
+    tableTaskConfigs.put("monthly.bufferTimePeriod", "30d");
+    tableTaskConfigs.put("monthly.bucketTimePeriod", "30d");
+    tableTaskConfigs.put("monthly.maxNumRecordsPerSegment", "1000000");
+    tableTaskConfigs.put("monthly.maxNumParallelBuckets", "3");
+    TreeMap<String, Long> waterMarkMap = new TreeMap<>();
+    // Watermark for daily is at 30 days since epoch
+    waterMarkMap.put(DAILY, 2_592_000_000L);
+    when(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).thenReturn(
+        new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord());
+
+    String segmentName7 = "testTable__7";
+    String segmentName8 = "testTable__8";
+    SegmentZKMetadata metadata7 =
+        getSegmentZKMetadata(segmentName7, 86_400_000L, 90_000_000L, TimeUnit.MILLISECONDS, "download7");
+    SegmentZKMetadata metadata8 =
+        getSegmentZKMetadata(segmentName8, 2_592_000_000L, 2_600_000_000L, TimeUnit.MILLISECONDS, "download8");
+    metadata7.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    metadata8.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata7, metadata8));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName7, MONTHLY, "concat", "30d",
+        null, "1000000");
   }
 
   /**
@@ -321,9 +436,9 @@ public class MergeRollupTaskGeneratorTest {
     generator.init(mockClusterInfoProvide);
    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName4, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName4, DAILY, "concat",
         "1d", null, "1000000");
 
     // With numMaxRecordsPerTask constraints
@@ -335,10 +450,10 @@ public class MergeRollupTaskGeneratorTest {
 
    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 3);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, "daily", "concat", "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, DAILY, "concat", "1d", null, "1000000");
   }
 
   /**
@@ -375,7 +490,7 @@ public class MergeRollupTaskGeneratorTest {
         .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 86_400_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, DAILY, "concat", "1d", null, "1000000");
 
     // Bump up watermark to the merge task execution window start time
     TreeMap<String, Long> waterMarkMap = new TreeMap<>();
@@ -388,7 +503,7 @@ public class MergeRollupTaskGeneratorTest {
         .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 345_600_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, DAILY, "concat", "1d", null, "1000000");
 
     // Not updating watermark if there's no unmerged segments
     waterMarkMap.put(DAILY, 345_600_000L);
@@ -459,7 +574,7 @@ public class MergeRollupTaskGeneratorTest {
     taskStatesMap.put(oldTaskName, TaskState.IN_PROGRESS);
    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, DAILY, "concat", "1d", null, "1000000");
 
     // If same task and table, but COMPLETED, generate
     mergedMetadata1
@@ -474,7 +589,7 @@ public class MergeRollupTaskGeneratorTest {
     taskStatesMap.put(taskName, TaskState.COMPLETED);
    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, DAILY, "concat", "1d", null, "1000000");
   }
 
   /**
@@ -531,7 +646,7 @@ public class MergeRollupTaskGeneratorTest {
         .get(DAILY).longValue(), 86_400_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
    Map<String, String> taskConfigsDaily1 = pinotTaskConfigs.get(0).getConfigs();
-    checkPinotTaskConfig(taskConfigsDaily1, segmentName1 + "," + segmentName2 + "," + segmentName3, "daily", "concat",
+    checkPinotTaskConfig(taskConfigsDaily1, segmentName1 + "," + segmentName2 + "," + segmentName3, DAILY, "concat",
         "1d", null, "1000000");
 
    // Monthly task is not scheduled until there are 30 days daily merged segments available (monthly merge window
@@ -564,7 +679,7 @@ public class MergeRollupTaskGeneratorTest {
         .get(DAILY).longValue(), 2_505_600_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
    Map<String, String> taskConfigsDaily2 = pinotTaskConfigs.get(0).getConfigs();
-    checkPinotTaskConfig(taskConfigsDaily2, segmentName4, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(taskConfigsDaily2, segmentName4, DAILY, "concat", "1d", null, "1000000");
 
     // Schedule multiple tasks for both merge levels simultaneously
     String segmentNameMergedDaily2 = "merged_testTable__4_1";
@@ -601,9 +716,9 @@ public class MergeRollupTaskGeneratorTest {
     assertEquals(pinotTaskConfigs.size(), 2);
    Map<String, String> taskConfigsDaily3 = pinotTaskConfigs.get(0).getConfigs();
    Map<String, String> taskConfigsMonthly1 = pinotTaskConfigs.get(1).getConfigs();
-    checkPinotTaskConfig(taskConfigsDaily3, segmentNameMergedDaily3 + "," + segmentName5, "daily", "concat", "1d", null,
+    checkPinotTaskConfig(taskConfigsDaily3, segmentNameMergedDaily3 + "," + segmentName5, DAILY, "concat", "1d", null,
         "1000000");
-    checkPinotTaskConfig(taskConfigsMonthly1, segmentNameMergedDaily1 + "," + segmentNameMergedDaily2, "monthly",
+    checkPinotTaskConfig(taskConfigsMonthly1, segmentNameMergedDaily1 + "," + segmentNameMergedDaily2, MONTHLY,
         "rollup", "30d", "30d", "2000000");
 
     // Not scheduling for daily tasks if there are no unmerged segments
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
index d8824de..611598c 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
@@ -40,6 +40,7 @@ public class MergeRollupTaskUtilsTest {
     taskConfig.put("monthly.roundBucketTimePeriod", "7d");
     taskConfig.put("monthly.mergeType", "rollup");
     taskConfig.put("monthly.maxNumRecordsPerTask", "5000000");
+    taskConfig.put("monthly.maxNumParallelBuckets", "5");
 
     Map<String, Map<String, String>> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
     assertEquals(levelToConfigMap.size(), 2);
@@ -53,11 +54,12 @@ public class MergeRollupTaskUtilsTest {
 
     Map<String, String> monthlyConfig = levelToConfigMap.get("monthly");
     assertNotNull(monthlyConfig);
-    assertEquals(monthlyConfig.size(), 5);
+    assertEquals(monthlyConfig.size(), 6);
     assertEquals(monthlyConfig.get(MergeTask.BUCKET_TIME_PERIOD_KEY), "30d");
     assertEquals(monthlyConfig.get(MergeTask.BUFFER_TIME_PERIOD_KEY), "10d");
     assertEquals(monthlyConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY), "7d");
     assertEquals(monthlyConfig.get(MergeTask.MERGE_TYPE_KEY), "rollup");
     assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY), "5000000");
+    assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS), "5");
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
