This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 493ed2c  MergeRollupTaskGenerator enhancement: enable parallel buckets scheduling (#7481)
493ed2c is described below
commit 493ed2c0433fdc40f605f0b06ce5f6f98ce62725
Author: Jiapeng Tao <[email protected]>
AuthorDate: Mon Oct 11 19:27:53 2021 -0700
    MergeRollupTaskGenerator enhancement: enable parallel buckets scheduling (#7481)
---
.../apache/pinot/core/common/MinionConstants.java | 1 +
.../mergerollup/MergeRollupTaskGenerator.java | 283 ++++++++++++---------
.../tasks/mergerollup/MergeRollupTaskUtils.java | 3 +-
.../mergerollup/MergeRollupTaskGeneratorTest.java | 147 +++++++++--
.../mergerollup/MergeRollupTaskUtilsTest.java | 4 +-
5 files changed, 301 insertions(+), 137 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 06ff9d5..0ce9980 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -88,6 +88,7 @@ public class MinionConstants {
// Segment config
   public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";
   public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
+  public static final String MAX_NUM_PARALLEL_BUCKETS = "maxNumParallelBuckets";
public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix";
}
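The new key is read from the per-mergeLevel task configs, so a table opts in to parallel bucket scheduling with a level-prefixed property. A minimal sketch of such a config map, assuming only the level-prefixed key convention exercised in MergeRollupTaskGeneratorTest further below (class name and values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: builds the level-prefixed MergeRollupTask configs for one table.
    public class MergeRollupConfigSketch {
      public static Map<String, String> dailyTaskConfigs() {
        Map<String, String> tableTaskConfigs = new HashMap<>();
        tableTaskConfigs.put("daily.mergeType", "concat");
        tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
        tableTaskConfigs.put("daily.bufferTimePeriod", "2d");
        tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        // New in this commit: schedule up to 3 daily buckets per generator run.
        tableTaskConfigs.put("daily.maxNumParallelBuckets", "3");
        return tableTaskConfigs;
      }
    }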
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index c1ab565..8e9265c 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import org.I0Itec.zkclient.exception.ZkException;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.ZNRecord;
@@ -43,6 +44,7 @@ import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerato
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -65,36 +67,43 @@ import org.slf4j.LoggerFactory;
  *
  * - Pre-select segments:
  *   - Fetch all segments, select segments based on segment lineage (removing segmentsFrom for COMPLETED lineage
- *     entry and
- *     segmentsTo for IN_PROGRESS lineage entry)
+ *     entry and segmentsTo for IN_PROGRESS lineage entry)
  *   - Remove empty segments
  *   - Sort segments based on startTime and endTime in ascending order
  *
  * For each mergeLevel (from lowest to highest, e.g. Hourly -> Daily -> Monthly -> Yearly):
  *   - Skip scheduling if there's an incomplete task for the mergeLevel
- *   - Calculate merge/rollup window:
- *     - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
- *       found at MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>
- *       In case of cold-start, no ZNode will exist.
- *       A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
- *       closest bucket start time.
- *     - The execution window for the task is calculated as,
- *       windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
- *     - Skip scheduling if the window is invalid:
- *       - If the execution window is not older than bufferTimeMs, no task will be generated
- *       - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
- *   - Bump up target window and watermark if needed.
- *     - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *       current window,
- *       keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *   - Select all segments for the target window
- *   - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *   - Schedule tasks for at most k time buckets, where k is up to maxNumParallelBuckets (1 by default), at best effort
+ *   - Repeat until k time buckets are created or we loop through all the candidate segments:
+ *     - Calculate the merge/roll-up bucket:
+ *       - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode found at
+ *         {@code MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>}
+ *         In case of cold-start, no ZNode will exist.
+ *         A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
+ *         closest bucket start time.
+ *       - The execution window for the task is calculated as:
+ *         bucketStartMs = watermarkMs
+ *         bucketEndMs = bucketStartMs + bucketTimeMs
+ *       - bucketEndMs must be equal to or older than (now - bufferTimeMs)
+ *       - bucketEndMs of a higher merge level should be less than or equal to the waterMarkMs of the lower level
+ *     - Bump up the target window and watermark if needed:
+ *       - If there are no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
+ *         the current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *       - Else skip the scheduling.
+ *     - Select segments for the bucket:
+ *       - Skip buckets in which all segments are merged
+ *       - If there are no spilled-over segments (segments spanning multiple time buckets), schedule buckets in parallel
+ *       - Else, schedule buckets up to and including the first one that has spilled-over data, so the spilled-over
+ *         data will be merged in the next round
+ *     - Create the tasks for the current bucket (per partition for partitioned tables) based on maxNumRecordsPerTask
  */
@TaskGenerator
public class MergeRollupTaskGenerator implements PinotTaskGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
   private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
+  private static final int DEFAULT_NUM_PARALLEL_BUCKETS = 1;
   private static final String REFRESH = "REFRESH";
   // This is the metric that keeps track of the task delay in the number of time buckets. For example, if we see this
@@ -156,8 +165,7 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
       // Reset the watermark time if no segment found. This covers the case where the table is newly created or
       // all segments for the existing table got deleted.
       resetDelayMetrics(offlineTableName);
-      LOGGER
-          .info("Skip generating task: {} for table: {}, no segment is found.", taskType, offlineTableName);
+      LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, offlineTableName);
continue;
}
@@ -184,8 +192,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
     // Get incomplete merge levels
     Set<String> inCompleteMergeLevels = new HashSet<>();
-    for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils
-        .getIncompleteTasks(taskType, offlineTableName, _clusterInfoAccessor).entrySet()) {
+    for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils.getIncompleteTasks(taskType, offlineTableName,
+        _clusterInfoAccessor).entrySet()) {
       for (PinotTaskConfig taskConfig : _clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
inCompleteMergeLevels.add(taskConfig.getConfigs().get(MergeRollupTask.MERGE_LEVEL_KEY));
}
@@ -212,106 +220,133 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
         continue;
       }
-      // Get the bucket size and buffer
-      long bucketMs =
-          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+      // Get the bucket size, buffer size and maximum number of parallel buckets (by default 1)
+      String bucketPeriod = mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY);
+      long bucketMs = TimeUtils.convertPeriodToMillis(bucketPeriod);
       if (bucketMs <= 0) {
-        LOGGER.error("Bucket time period (table : {}, mergeLevel : {}) must be larger than 0", offlineTableName,
-            mergeLevel);
+        LOGGER.error("Bucket time period: {} (table : {}, mergeLevel : {}) must be larger than 0", bucketPeriod,
+            offlineTableName, mergeLevel);
+        continue;
+      }
+      String bufferPeriod = mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY);
+      long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+      if (bufferMs < 0) {
+        LOGGER.error("Buffer time period: {} (table : {}, mergeLevel : {}) must be larger or equal to 0",
+            bufferPeriod, offlineTableName, mergeLevel);
+        continue;
+      }
+      String maxNumParallelBucketsStr = mergeConfigs.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS);
+      int maxNumParallelBuckets = maxNumParallelBucketsStr != null ? Integer.parseInt(maxNumParallelBucketsStr)
+          : DEFAULT_NUM_PARALLEL_BUCKETS;
+      if (maxNumParallelBuckets <= 0) {
+        LOGGER.error("Maximum number of parallel buckets: {} (table : {}, mergeLevel : {}) must be larger than 0",
+            maxNumParallelBuckets, offlineTableName, mergeLevel);
         continue;
       }
-      long bufferMs =
-          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
       // Get watermark from MergeRollupTaskMetadata ZNode
-      // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
-      long waterMarkMs =
+      // bucketStartMs = watermarkMs
+      // bucketEndMs = bucketStartMs + bucketMs
+      long bucketStartMs =
           getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, mergeRollupTaskMetadata);
-      long windowStartMs = waterMarkMs;
-      long windowEndMs = windowStartMs + bucketMs;
-
-      if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
-        LOGGER.info("Window with start: {} and end: {} of mergeLevel: {} is not a valid merge window, Skipping task "
-            + "generation: {}", windowStartMs, windowEndMs, mergeLevel, taskType);
+      long bucketEndMs = bucketStartMs + bucketMs;
+      if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
+        LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet",
+            bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
         continue;
       }
-      // Find all segments overlapping with the merge window, if all overlapping segments are merged, bump up the
-      // target window
-      List<SegmentZKMetadata> selectedSegments = new ArrayList<>();
+      // Find overlapping segments for each bucket, skip the buckets that have all segments merged
+      List<List<SegmentZKMetadata>> selectedSegmentsForAllBuckets = new ArrayList<>(maxNumParallelBuckets);
+      List<SegmentZKMetadata> selectedSegmentsForBucket = new ArrayList<>();
       boolean hasUnmergedSegments = false;
-      boolean isValidMergeWindow = true;
+      boolean hasSpilledOverData = false;
       // The for loop terminates in the following cases:
-      // 1. Found unmerged segments in target merge window, need to bump up watermark if windowStartMs > watermarkMs,
-      //    will schedule tasks
-      // 2. All segments are merged in the merge window and we have loop through all segments, skip scheduling
-      // 3. Merge window is invalid (windowEndMs > System.currentTimeMillis() - bufferMs || windowEndMs > waterMark of
-      //    the lower mergeLevel), skip scheduling
+      // 1. Found buckets with unmerged segments:
+      //    For each bucket, find all segments overlapping with the target bucket; skip the bucket if all overlapping
+      //    segments are merged. Schedule at most k (maxNumParallelBuckets) buckets, and stop at the first bucket that
+      //    contains spilled-over data.
+      // 2. There's no bucket with unmerged segments, skip scheduling
       for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
         long startTimeMs = preSelectedSegment.getStartTimeMs();
-        if (startTimeMs < windowEndMs) {
+        if (startTimeMs < bucketEndMs) {
           long endTimeMs = preSelectedSegment.getEndTimeMs();
-          if (endTimeMs >= windowStartMs) {
-            // For segments overlapping with current window, add to the result list
-            selectedSegments.add(preSelectedSegment);
+          if (endTimeMs >= bucketStartMs) {
+            // For segments overlapping with current bucket, add to the result list
             if (!isMergedSegment(preSelectedSegment, mergeLevel)) {
               hasUnmergedSegments = true;
             }
+            if (hasSpilledOverData(preSelectedSegment, bucketMs)) {
+              hasSpilledOverData = true;
+            }
+            selectedSegmentsForBucket.add(preSelectedSegment);
           }
-          // endTimeMs < windowStartMs
+          // endTimeMs < bucketStartMs
           // Haven't found the first overlapping segment, continue to the next segment
         } else {
-          // Has gone through all overlapping segments for current window
+          // Has gone through all overlapping segments for current bucket
           if (hasUnmergedSegments) {
-            // Found unmerged segments, schedule merge task for current window
+            // Add the bucket if there are unmerged segments
+            selectedSegmentsForAllBuckets.add(selectedSegmentsForBucket);
+          }
+
+          if (selectedSegmentsForAllBuckets.size() == maxNumParallelBuckets || hasSpilledOverData) {
+            // If there are enough buckets, or spilled-over data is found, schedule the merge tasks
            break;
          } else {
-            // No unmerged segments found, clean up selected segments and bump up the merge window
+            // Start with a new bucket
            // TODO: If there are many small merged segments, we should merge them again
-            selectedSegments.clear();
-            selectedSegments.add(preSelectedSegment);
+            selectedSegmentsForBucket = new ArrayList<>();
+            hasUnmergedSegments = false;
+            bucketStartMs = (startTimeMs / bucketMs) * bucketMs;
+            bucketEndMs = bucketStartMs + bucketMs;
+            if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
+              break;
+            }
            if (!isMergedSegment(preSelectedSegment, mergeLevel)) {
              hasUnmergedSegments = true;
            }
-            windowStartMs = startTimeMs / bucketMs * bucketMs;
-            windowEndMs = windowStartMs + bucketMs;
-            if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
-              isValidMergeWindow = false;
-              break;
+            if (hasSpilledOverData(preSelectedSegment, bucketMs)) {
+              hasSpilledOverData = true;
            }
+            selectedSegmentsForBucket.add(preSelectedSegment);
          }
        }
      }
-      if (!isValidMergeWindow) {
-        LOGGER.info("Window with start: {} and end: {} of mergeLevel: {} is not a valid merge window, Skipping task "
-            + "generation: {}", windowStartMs, windowEndMs, mergeLevel, taskType);
-        continue;
+      // Add the last bucket if it contains unmerged segments and has not been added yet
+      if (hasUnmergedSegments && (selectedSegmentsForAllBuckets.isEmpty() || (
+          selectedSegmentsForAllBuckets.get(selectedSegmentsForAllBuckets.size() - 1) != selectedSegmentsForBucket))) {
+        selectedSegmentsForAllBuckets.add(selectedSegmentsForBucket);
       }
-      if (!hasUnmergedSegments) {
-        LOGGER.info("No unmerged segments found for mergeLevel:{} for table: {}, Skipping task generation: {}",
-            mergeLevel, offlineTableName, taskType);
+      if (selectedSegmentsForAllBuckets.isEmpty()) {
+        LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", offlineTableName, mergeLevel);
         continue;
       }
-      Long prevWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs);
-      LOGGER.info("Update watermark of mergeLevel: {} for table: {} from: {} to: {}", mergeLevel, offlineTableName,
-          prevWatermarkMs, windowStartMs);
+      // Bump up watermark to the earliest start time of selected segments truncated to the closest bucket boundary
+      long newWatermarkMs = selectedSegmentsForAllBuckets.get(0).get(0).getStartTimeMs() / bucketMs * bucketMs;
+      Long prevWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, newWatermarkMs);
+      LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", offlineTableName, mergeLevel,
+          prevWatermarkMs, bucketStartMs);
       // Update the delay metrics
-      updateDelayMetrics(offlineTableName, mergeLevel, windowStartMs, bufferMs, bucketMs);
+      updateDelayMetrics(offlineTableName, mergeLevel, bucketStartMs, bufferMs, bucketMs);
// Create task configs
-      int maxNumRecordsPerTask = mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null ? Integer
-          .parseInt(mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY))
-          : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+      int maxNumRecordsPerTask =
+          mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null ? Integer.parseInt(
+              mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY)) : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
       SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
       if (segmentPartitionConfig == null) {
-        pinotTaskConfigsForTable.addAll(
-            createPinotTaskConfigs(selectedSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel, mergeConfigs,
-                taskConfigs));
+        for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
+          pinotTaskConfigsForTable.addAll(
+              createPinotTaskConfigs(selectedSegmentsPerBucket, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+                  mergeConfigs, taskConfigs));
+        }
       } else {
         // For partitioned table, schedule separate tasks for each partition
         Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
@@ -320,30 +355,33 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
         Map.Entry<String, ColumnPartitionConfig> partitionEntry = columnPartitionMap.entrySet().iterator().next();
         String partitionColumn = partitionEntry.getKey();
-        Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
-        // Handle segments that have multiple partitions or no partition info
-        List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
-        for (SegmentZKMetadata selectedSegment : selectedSegments) {
-          SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata();
-          if (segmentPartitionMetadata == null
-              || segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
-            outlierSegments.add(selectedSegment);
-          } else {
-            int partition = segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next();
-            partitionToSegments.computeIfAbsent(partition, k -> new ArrayList<>()).add(selectedSegment);
+        for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
+          Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
+          // Handle segments that have multiple partitions or no partition info
+          List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
+          for (SegmentZKMetadata selectedSegment : selectedSegmentsPerBucket) {
+            SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata();
+            if (segmentPartitionMetadata == null
+                || segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
+              outlierSegments.add(selectedSegment);
+            } else {
+              int partition = segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next();
+              partitionToSegments.computeIfAbsent(partition, k -> new ArrayList<>()).add(selectedSegment);
+            }
           }
-        }
-        for (Map.Entry<Integer, List<SegmentZKMetadata>> partitionToSegmentsEntry : partitionToSegments.entrySet()) {
-          pinotTaskConfigsForTable.addAll(
-              createPinotTaskConfigs(partitionToSegmentsEntry.getValue(), offlineTableName, maxNumRecordsPerTask,
-                  mergeLevel, mergeConfigs, taskConfigs));
-        }
+          for (Map.Entry<Integer, List<SegmentZKMetadata>> partitionToSegmentsEntry
+              : partitionToSegments.entrySet()) {
+            pinotTaskConfigsForTable.addAll(
+                createPinotTaskConfigs(partitionToSegmentsEntry.getValue(), offlineTableName, maxNumRecordsPerTask,
+                    mergeLevel, mergeConfigs, taskConfigs));
+          }
-        if (!outlierSegments.isEmpty()) {
-          pinotTaskConfigsForTable.addAll(
-              createPinotTaskConfigs(outlierSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel,
-                  mergeConfigs, taskConfigs));
+          if (!outlierSegments.isEmpty()) {
+            pinotTaskConfigsForTable.addAll(
+                createPinotTaskConfigs(outlierSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+                    mergeConfigs, taskConfigs));
+          }
         }
}
}
@@ -358,9 +396,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
       continue;
     }
     pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
-    LOGGER
-        .info("Finished generating task configs for table: {} for task: {}, numTasks: {}", offlineTableName, taskType,
-            pinotTaskConfigsForTable.size());
+    LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", offlineTableName,
+        taskType, pinotTaskConfigsForTable.size());
}
// Clean up metrics
@@ -389,26 +426,36 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
}
/**
+   * Check if the segment spans multiple buckets
+   */
+  private boolean hasSpilledOverData(SegmentZKMetadata segmentZKMetadata, long bucketMs) {
+    return segmentZKMetadata.getStartTimeMs() / bucketMs < segmentZKMetadata.getEndTimeMs() / bucketMs;
+ }
+
+ /**
   * Check if the segment is merged for the given merge level
   */
  private boolean isMergedSegment(SegmentZKMetadata segmentZKMetadata, String mergeLevel) {
    Map<String, String> customMap = segmentZKMetadata.getCustomMap();
-    return customMap != null && mergeLevel
-        .equalsIgnoreCase(customMap.get(MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY));
+    return customMap != null && mergeLevel.equalsIgnoreCase(
+        customMap.get(MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY));
  }
/**
-   * Check if the merge window end time is valid
+   * Check if the bucket end time is valid
    */
-  private boolean isValidMergeWindowEndTime(long windowEndMs, long bufferMs, String lowerMergeLevel,
+  private boolean isValidBucketEndTime(long bucketEndMs, long bufferMs, @Nullable String lowerMergeLevel,
       MergeRollupTaskMetadata mergeRollupTaskMetadata) {
-    // Check that execution window endTimeMs <= now - bufferTime
-    if (windowEndMs > System.currentTimeMillis() - bufferMs) {
+    // Check that bucketEndMs <= now - bufferMs
+    if (bucketEndMs > System.currentTimeMillis() - bufferMs) {
      return false;
    }
-    // Check that execution window endTimeMs <= waterMark of the lower mergeLevel
-    return lowerMergeLevel == null || mergeRollupTaskMetadata.getWatermarkMap().get(lowerMergeLevel) == null
-        || windowEndMs <= mergeRollupTaskMetadata.getWatermarkMap().get(lowerMergeLevel);
+    // Check that bucketEndMs <= waterMark of the lower mergeLevel
+    if (lowerMergeLevel != null) {
+      Long lowerMergeLevelWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().get(lowerMergeLevel);
+      return lowerMergeLevelWatermarkMs != null && bucketEndMs <= lowerMergeLevelWatermarkMs;
+    }
+    return true;
  }
/**
@@ -473,14 +520,12 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
}
}
-    configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MinionConstants.MergeTask.MERGE_TYPE_KEY));
+    configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY));
     configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
-    configs.put(MinionConstants.MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY,
-        mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
-    configs.put(MinionConstants.MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY,
-        mergeConfigs.get(MinionConstants.MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY));
-    configs.put(MinionConstants.MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
-        mergeConfigs.get(MinionConstants.MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
+    configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
+    configs.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY));
+    configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+        mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
     configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
         MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + System.currentTimeMillis() + "_" + i + "_"
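The scheduling above reduces to integer arithmetic on epoch millis: the watermark is truncated to a bucket boundary, a bucket covers [bucketStartMs, bucketStartMs + bucketMs), and a segment spills over when its start and end land in different buckets. A self-contained sketch of that arithmetic, with sample values mirroring the daily-level tests below (1d bucket, 2d buffer; class name and values are illustrative):

    // Sketch of the bucket math used by MergeRollupTaskGenerator above.
    public class BucketMathSketch {
      private static final long DAY_MS = 86_400_000L;

      public static void main(String[] args) {
        long bucketMs = DAY_MS;                      // daily.bucketTimePeriod = 1d
        long bufferMs = 2 * DAY_MS;                  // daily.bufferTimePeriod = 2d

        // bucketStartMs = watermarkMs, bucketEndMs = bucketStartMs + bucketMs
        long watermarkMs = 86_400_000L;              // start of day 1
        long bucketStartMs = watermarkMs;
        long bucketEndMs = bucketStartMs + bucketMs; // 172_800_000, start of day 2

        // The bucket is only eligible once its end is older than now - bufferMs.
        boolean eligible = bucketEndMs <= System.currentTimeMillis() - bufferMs;

        // Spill-over check: start and end fall into different buckets.
        long segStartMs = 172_800_000L;
        long segEndMs = 260_000_000L;                // crosses into day 3
        boolean spilledOver = segStartMs / bucketMs < segEndMs / bucketMs;

        System.out.println("eligible=" + eligible + ", spilledOver=" + spilledOver);
      }
    }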
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
index 357cd73..463b2e5 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
@@ -34,7 +34,8 @@ public class MergeRollupTaskUtils {
MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY,
MergeTask.MERGE_TYPE_KEY,
MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
- MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY
+ MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
+ MergeTask.MAX_NUM_PARALLEL_BUCKETS
};
//@formatter:on
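MergeRollupTaskUtils.getLevelToConfigMap splits these level-prefixed keys into one config map per mergeLevel, which is how the new key reaches the generator. A usage sketch, with inputs mirroring MergeRollupTaskUtilsTest further below (class name and values are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskUtils;

    // Sketch only: per-level extraction of the new maxNumParallelBuckets key.
    public class LevelConfigSketch {
      public static void main(String[] args) {
        Map<String, String> taskConfig = new HashMap<>();
        taskConfig.put("monthly.bucketTimePeriod", "30d");
        taskConfig.put("monthly.maxNumParallelBuckets", "5");
        Map<String, Map<String, String>> levelToConfigMap =
            MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
        // The "monthly" entry now contains maxNumParallelBuckets=5.
        System.out.println(levelToConfigMap.get("monthly"));
      }
    }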
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 78ba547..898a07b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -257,7 +257,7 @@ public class MergeRollupTaskGeneratorTest {
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
     assertEquals(pinotTaskConfigs.get(0).getConfigs().get(MinionConstants.DOWNLOAD_URL_KEY), "download1,download2");
@@ -270,9 +270,124 @@
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+  }
+
+  /**
+   * Test num parallel buckets
+   */
+  @Test
+  public void testNumParallelBuckets() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put("daily.mergeType", "concat");
+    tableTaskConfigs.put("daily.bufferTimePeriod", "2d");
+    tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
+    tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
+    tableTaskConfigs.put("daily.maxNumParallelBuckets", "3");
+    taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
+    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+
+    String segmentName1 = "testTable__1";
+    String segmentName2 = "testTable__2";
+    String segmentName3 = "testTable__3";
+    String segmentName4 = "testTable__4";
+    String segmentName5 = "testTable__5";
+    SegmentZKMetadata metadata1 =
+        getSegmentZKMetadata(segmentName1, 86_400_000L, 90_000_000L, TimeUnit.MILLISECONDS, "download1");
+    SegmentZKMetadata metadata2 =
+        getSegmentZKMetadata(segmentName2, 86_400_000L, 100_000_000L, TimeUnit.MILLISECONDS, "download2");
+    SegmentZKMetadata metadata3 =
+        getSegmentZKMetadata(segmentName3, 172_800_000L, 173_000_000L, TimeUnit.MILLISECONDS, "download3");
+    SegmentZKMetadata metadata4 =
+        getSegmentZKMetadata(segmentName4, 259_200_000L, 260_000_000L, TimeUnit.MILLISECONDS, "download4");
+    SegmentZKMetadata metadata5 =
+        getSegmentZKMetadata(segmentName5, 345_600_000L, 346_000_000L, TimeUnit.MILLISECONDS, "download5");
+
+    // No spilled over data
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5));
+    MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
+    generator.init(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has spilled over data
+    String segmentName6 = "testTable__6";
+    SegmentZKMetadata metadata6 =
+        getSegmentZKMetadata(segmentName6, 172_800_000L, 260_000_000L, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 2);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName6, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has time bucket without overlapping segments
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata4, metadata5));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName4, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName5, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has un-merged buckets
+    metadata6 = getSegmentZKMetadata(segmentName6, 432_000_000L, 432_100_000L, TimeUnit.MILLISECONDS, null);
+    metadata1.setCustomMap(
+        ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    metadata2.setCustomMap(
+        ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    metadata4.setCustomMap(
+        ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName3, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName5, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName6, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Test number of scheduled buckets < numParallelBuckets
+    tableTaskConfigs.put("monthly.mergeType", "concat");
+    tableTaskConfigs.put("monthly.bufferTimePeriod", "30d");
+    tableTaskConfigs.put("monthly.bucketTimePeriod", "30d");
+    tableTaskConfigs.put("monthly.maxNumRecordsPerSegment", "1000000");
+    tableTaskConfigs.put("monthly.maxNumParallelBuckets", "3");
+    TreeMap<String, Long> waterMarkMap = new TreeMap<>();
+    // Watermark for daily is at 30 days since epoch
+    waterMarkMap.put(DAILY, 2_592_000_000L);
+    when(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).thenReturn(
+        new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord());
+
+    String segmentName7 = "testTable__7";
+    String segmentName8 = "testTable__8";
+    SegmentZKMetadata metadata7 =
+        getSegmentZKMetadata(segmentName7, 86_400_000L, 90_000_000L, TimeUnit.MILLISECONDS, "download7");
+    SegmentZKMetadata metadata8 =
+        getSegmentZKMetadata(segmentName8, 2_592_000_000L, 2_600_000_000L, TimeUnit.MILLISECONDS, "download8");
+    metadata7.setCustomMap(
+        ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    metadata8.setCustomMap(
+        ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata7, metadata8));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName7, MONTHLY, "concat", "30d",
+        null, "1000000");
   }
/**
@@ -321,9 +436,9 @@ public class MergeRollupTaskGeneratorTest {
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName4, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName4, DAILY, "concat",
         "1d", null, "1000000");
// With numMaxRecordsPerTask constraints
@@ -335,10 +450,10 @@ public class MergeRollupTaskGeneratorTest {
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 3);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, "daily", "concat", "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, DAILY, "concat", "1d", null, "1000000");
}
/**
@@ -375,7 +490,7 @@ public class MergeRollupTaskGeneratorTest {
.fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
.get(DAILY).longValue(), 86_400_000L);
assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, DAILY, "concat", "1d", null, "1000000");
// Bump up watermark to the merge task execution window start time
TreeMap<String, Long> waterMarkMap = new TreeMap<>();
@@ -388,7 +503,7 @@ public class MergeRollupTaskGeneratorTest {
.fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
.get(DAILY).longValue(), 345_600_000L);
assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, DAILY, "concat", "1d", null, "1000000");
// Not updating watermark if there's no unmerged segments
waterMarkMap.put(DAILY, 345_600_000L);
@@ -459,7 +574,7 @@ public class MergeRollupTaskGeneratorTest {
taskStatesMap.put(oldTaskName, TaskState.IN_PROGRESS);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, DAILY, "concat", "1d", null, "1000000");
// If same task and table, but COMPLETED, generate
mergedMetadata1
@@ -474,7 +589,7 @@ public class MergeRollupTaskGeneratorTest {
taskStatesMap.put(taskName, TaskState.COMPLETED);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, DAILY, "concat", "1d", null, "1000000");
}
/**
@@ -531,7 +646,7 @@ public class MergeRollupTaskGeneratorTest {
.get(DAILY).longValue(), 86_400_000L);
assertEquals(pinotTaskConfigs.size(), 1);
     Map<String, String> taskConfigsDaily1 = pinotTaskConfigs.get(0).getConfigs();
-    checkPinotTaskConfig(taskConfigsDaily1, segmentName1 + "," + segmentName2 + "," + segmentName3, "daily", "concat",
+    checkPinotTaskConfig(taskConfigsDaily1, segmentName1 + "," + segmentName2 + "," + segmentName3, DAILY, "concat",
         "1d", null, "1000000");
     // Monthly task is not scheduled until there are 30 days daily merged segments available (monthly merge window
@@ -564,7 +679,7 @@ public class MergeRollupTaskGeneratorTest {
.get(DAILY).longValue(), 2_505_600_000L);
assertEquals(pinotTaskConfigs.size(), 1);
     Map<String, String> taskConfigsDaily2 = pinotTaskConfigs.get(0).getConfigs();
-    checkPinotTaskConfig(taskConfigsDaily2, segmentName4, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(taskConfigsDaily2, segmentName4, DAILY, "concat", "1d", null, "1000000");
// Schedule multiple tasks for both merge levels simultaneously
String segmentNameMergedDaily2 = "merged_testTable__4_1";
@@ -601,9 +716,9 @@ public class MergeRollupTaskGeneratorTest {
assertEquals(pinotTaskConfigs.size(), 2);
     Map<String, String> taskConfigsDaily3 = pinotTaskConfigs.get(0).getConfigs();
     Map<String, String> taskConfigsMonthly1 = pinotTaskConfigs.get(1).getConfigs();
-    checkPinotTaskConfig(taskConfigsDaily3, segmentNameMergedDaily3 + "," + segmentName5, "daily", "concat", "1d", null,
+    checkPinotTaskConfig(taskConfigsDaily3, segmentNameMergedDaily3 + "," + segmentName5, DAILY, "concat", "1d", null,
         "1000000");
-    checkPinotTaskConfig(taskConfigsMonthly1, segmentNameMergedDaily1 + "," + segmentNameMergedDaily2, "monthly",
+    checkPinotTaskConfig(taskConfigsMonthly1, segmentNameMergedDaily1 + "," + segmentNameMergedDaily2, MONTHLY,
         "rollup", "30d", "30d", "2000000");
// Not scheduling for daily tasks if there are no unmerged segments
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
index d8824de..611598c 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
@@ -40,6 +40,7 @@ public class MergeRollupTaskUtilsTest {
taskConfig.put("monthly.roundBucketTimePeriod", "7d");
taskConfig.put("monthly.mergeType", "rollup");
taskConfig.put("monthly.maxNumRecordsPerTask", "5000000");
+ taskConfig.put("monthly.maxNumParallelBuckets", "5");
     Map<String, Map<String, String>> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
assertEquals(levelToConfigMap.size(), 2);
@@ -53,11 +54,12 @@ public class MergeRollupTaskUtilsTest {
Map<String, String> monthlyConfig = levelToConfigMap.get("monthly");
assertNotNull(monthlyConfig);
- assertEquals(monthlyConfig.size(), 5);
+ assertEquals(monthlyConfig.size(), 6);
assertEquals(monthlyConfig.get(MergeTask.BUCKET_TIME_PERIOD_KEY), "30d");
assertEquals(monthlyConfig.get(MergeTask.BUFFER_TIME_PERIOD_KEY), "10d");
     assertEquals(monthlyConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY), "7d");
     assertEquals(monthlyConfig.get(MergeTask.MERGE_TYPE_KEY), "rollup");
     assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY), "5000000");
+ assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS), "5");
}
}