This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 26a83ba4f3 [bugfix] fix mergeRollupTask metrics (#9864)
26a83ba4f3 is described below
commit 26a83ba4f302b8b402d38d437f635ff1a4903767
Author: Haitao Zhang <[email protected]>
AuthorDate: Mon Nov 28 21:32:45 2022 -0800
[bugfix] fix mergeRollupTask metrics (#9864)
* [bugfix] fix mergeRollupTask metrics
* fix a typo
* add comments
* fix comments
---
.../mergerollup/MergeRollupTaskGenerator.java | 43 ++++++++++++----------
1 file changed, 24 insertions(+), 19 deletions(-)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 28043c2f60..fc7be90657 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -115,8 +115,8 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
// tableNameWithType -> mergeLevel -> watermarkMs
private final Map<String, Map<String, Long>> _mergeRollupWatermarks = new
HashMap<>();
- // tableNameWithType -> maxValidBucketEndTime
- private final Map<String, Long> _tableMaxValidBucketEndTimeMs = new
HashMap<>();
+ // tableNameWithType -> lowestLevelMaxValidBucketEndTime
+ private final Map<String, Long> _tableLowestLevelMaxValidBucketEndTimeMs =
new HashMap<>();
@Override
public String getTaskType() {
@@ -244,16 +244,20 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(),
bucketMs, mergeLevel, mergeRollupTaskMetadata);
long bucketStartMs = watermarkMs;
long bucketEndMs = bucketStartMs + bucketMs;
+ if (lowerMergeLevel == null) {
+ long lowestLevelMaxValidBucketEndTimeMs = Long.MIN_VALUE;
+ for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
+ // Compute lowestLevelMaxValidBucketEndTimeMs among segments that
are ready for merge
+ long currentValidBucketEndTimeMs =
+ getValidBucketEndTimeMsForSegment(preSelectedSegment,
bucketMs, bufferMs);
+ lowestLevelMaxValidBucketEndTimeMs =
+ Math.max(lowestLevelMaxValidBucketEndTimeMs,
currentValidBucketEndTimeMs);
+ }
+ _tableLowestLevelMaxValidBucketEndTimeMs.put(offlineTableName,
lowestLevelMaxValidBucketEndTimeMs);
+ }
// Create delay metrics even if there's no task scheduled, this helps
the case that the controller is restarted
// but the metrics are not available until the controller schedules a
valid task
- long maxValidBucketEndTimeMs = Long.MIN_VALUE;
- for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
- // Compute maxValidBucketEndTimeMs among segments that are ready for
merge
- long currentValidBucketEndTimeMs =
getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs);
- maxValidBucketEndTimeMs = Math.max(maxValidBucketEndTimeMs,
currentValidBucketEndTimeMs);
- }
- createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null,
watermarkMs, maxValidBucketEndTimeMs,
- bufferMs, bucketMs);
+ createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null,
watermarkMs, bufferMs, bucketMs);
if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel,
mergeRollupTaskMetadata)) {
LOGGER.info("Bucket with start: {} and end: {} (table : {},
mergeLevel : {}) cannot be merged yet",
bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
@@ -271,6 +275,9 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
// For each bucket find all segments overlapping with the target
bucket, skip the bucket if all overlapping
// segments are merged. Schedule k (numParallelBuckets) buckets at
most, and stops at the first bucket that
// contains spilled over data.
+ // One may wonder how a segment with records spanning different
buckets is handled. The short answer is that
+ // it will be cut into multiple segments, each for a separate
bucket. This is achieved by setting bucket time
+ // period as PARTITION_BUCKET_TIME_PERIOD when generating
PinotTaskConfigs
// 2. There's no bucket with unmerged segments, skip scheduling
for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
long startTimeMs = preSelectedSegment.getStartTimeMs();
@@ -338,8 +345,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
watermarkMs, newWatermarkMs);
// Update the delay metrics
- createOrUpdateDelayMetrics(offlineTableName, mergeLevel,
lowerMergeLevel, newWatermarkMs,
- maxValidBucketEndTimeMs, bufferMs, bucketMs);
+ createOrUpdateDelayMetrics(offlineTableName, mergeLevel,
lowerMergeLevel, newWatermarkMs, bufferMs, bucketMs);
// Create task configs
int maxNumRecordsPerTask =
@@ -459,7 +465,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
// the rounded segment end time of [10/1 00:00, 10/1 23:59] is 10/2 00:00.
The rounded segment end time of
// [10/1 00:00, 10/2 00:00] is 10/3 00:00
long validBucketEndTimeMs = (segmentZKMetadata.getEndTimeMs() / bucketMs +
1) * bucketMs;
- validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs -
bucketMs) / bucketMs * bucketMs);
+ validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs -
bufferMs) / bucketMs * bucketMs);
return validBucketEndTimeMs;
}
@@ -603,12 +609,11 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
* @param mergeLevel merge level
* @param lowerMergeLevel lower merge level
* @param watermarkMs current watermark value
- * @param maxValidBucketEndTimeMs max valid bucket end time of all the
segments for the table
* @param bufferTimeMs buffer time
* @param bucketTimeMs bucket time
*/
private void createOrUpdateDelayMetrics(String tableNameWithType, String
mergeLevel, String lowerMergeLevel,
- long watermarkMs, long maxValidBucketEndTimeMs, long bufferTimeMs, long
bucketTimeMs) {
+ long watermarkMs, long bufferTimeMs, long bucketTimeMs) {
ControllerMetrics controllerMetrics =
_clusterInfoAccessor.getControllerMetrics();
if (controllerMetrics == null) {
return;
@@ -617,19 +622,19 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
// Update gauge value that indicates the delay in terms of the number of
time buckets.
Map<String, Long> watermarkForTable =
_mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new
ConcurrentHashMap<>());
- _tableMaxValidBucketEndTimeMs.put(tableNameWithType,
maxValidBucketEndTimeMs);
watermarkForTable.compute(mergeLevel, (k, v) -> {
if (v == null) {
LOGGER.info(
"Creating the gauge metric for tracking the merge/roll-up task
delay for table: {} and mergeLevel: {}."
+ "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={},
taskDelayInNumTimeBuckets={})", tableNameWithType,
- mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
+ mergeLevel, watermarkMs, bufferTimeMs, bucketTimeMs,
getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs,
lowerMergeLevel == null
- ? _tableMaxValidBucketEndTimeMs.get(tableNameWithType) :
watermarkForTable.get(lowerMergeLevel),
+ ?
_tableLowestLevelMaxValidBucketEndTimeMs.get(tableNameWithType)
+ : watermarkForTable.get(lowerMergeLevel),
bufferTimeMs, bucketTimeMs));
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType,
mergeLevel),
(() ->
getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L),
- lowerMergeLevel == null ?
_tableMaxValidBucketEndTimeMs.get(tableNameWithType)
+ lowerMergeLevel == null ?
_tableLowestLevelMaxValidBucketEndTimeMs.get(tableNameWithType)
: watermarkForTable.get(lowerMergeLevel), bufferTimeMs,
bucketTimeMs)));
}
return watermarkMs;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]