snleee commented on a change in pull request #7827:
URL: https://github.com/apache/pinot/pull/7827#discussion_r758668575
##########
File path:
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -439,6 +439,18 @@ private boolean validate(TableConfig tableConfig, String taskType) {
return true;
}
+ /**
+ * Get the max valid merge bucket end time for the segment, LONG.MIN_VALUE if the segment is not ready to merge
+ */
+ private long getMaxValidBucketEndTimeMsForSegment(SegmentZKMetadata segmentZKMetadata, long bucketMs, long bufferMs) {
+ if (segmentZKMetadata.getStartTimeMs() / bucketMs * bucketMs + bucketMs > System.currentTimeMillis() - bufferMs) {
Review comment:
I would split this into two statements for better readability, e.g.:
```
long bucketEndTimeForSegment = segmentZKMetadata.getStartTimeMs() / bucketMs * bucketMs + bucketMs;
if (bucketEndTimeForSegment > System.currentTimeMillis() - bufferMs) {
  return Long.MIN_VALUE;
}
```
By the way, why do we need to use `startTimeMs` here?
Something we need to think through: is there a case where
`(segmentZKMetadata.getEndTimeMs() / bucketMs + 1) * bucketMs` differs from
`segmentZKMetadata.getStartTimeMs() / bucketMs * bucketMs + bucketMs`?
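To make the question concrete, here is a minimal sketch that evaluates both expressions on plain `long` values. The class `BucketEndComparison` and its two helpers are hypothetical names for illustration, not code from the PR. It assumes a segment whose time range crosses a bucket boundary, which is the one case where the two values should differ.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of Pinot.
public class BucketEndComparison {
  // Exclusive end of the bucket that contains the segment start time.
  static long bucketEndFromStart(long startTimeMs, long bucketMs) {
    return startTimeMs / bucketMs * bucketMs + bucketMs;
  }

  // Exclusive end of the bucket that contains the (inclusive) segment end time.
  static long bucketEndFromEnd(long endTimeMs, long bucketMs) {
    return (endTimeMs / bucketMs + 1) * bucketMs;
  }

  public static void main(String[] args) {
    long bucketMs = TimeUnit.DAYS.toMillis(1);
    // Assumed example segment: starts at the epoch and ends one second into the second day.
    long startTimeMs = 0L;
    long endTimeMs = bucketMs + 1000L;
    System.out.println(bucketEndFromStart(startTimeMs, bucketMs)); // 86400000
    System.out.println(bucketEndFromEnd(endTimeMs, bucketMs));     // 172800000
  }
}
```

When the start and end times fall in the same bucket, both helpers return the same value; they diverge only for segments that span more than one bucket.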
##########
File path:
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -439,6 +437,23 @@ private boolean validate(TableConfig tableConfig, String taskType) {
return true;
}
+ /**
+ * Get the max valid merge bucket end time for the segment, LONG.MIN_VALUE if the segment is not ready to merge
+ */
+ private long getMaxValidBucketEndTimeMsForSegment(SegmentZKMetadata segmentZKMetadata, long bucketMs, long bufferMs) {
+ // Make sure the segment is ready to merge (the first bucket <= now - bufferTime)
+ if (segmentZKMetadata.getStartTimeMs() / bucketMs * bucketMs + bucketMs > System.currentTimeMillis() - bufferMs) {
+ return Long.MIN_VALUE;
+ }
+ // The max bucketEndTime is calculated as the min(segment end time, now - bufferTime) rounded to the bucket
+ // boundary. Notice bucketEndTime is exclusive while segment end time is inclusive.
+ // E.g. if bucketTime = 1d, the rounded segment end time of [10/1 00:00, 10/1 23:59] is 10/2 00:00
+ // The rounded segment end time of [10/1 00:00, 10/2 00:00] is 10/3 00:00
+ long maxBucketEndTimeMs = (segmentZKMetadata.getEndTimeMs() / bucketMs + 1) * bucketMs;
+ maxBucketEndTimeMs = Math.min(maxBucketEndTimeMs, (System.currentTimeMillis() - bufferMs) / bucketMs * bucketMs);
Review comment:
Please store the value for `System.currentTimeMillis()` once and reuse
the same value throughout the function.
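As a rough sketch of what that could look like, the version below takes the current time as a single `nowMs` argument, so both checks use the same instant. The class `MergeBucketSketch` and the flattened signature (plain `long` values instead of `SegmentZKMetadata`) are assumptions made to keep the example self-contained; this is not the PR's actual code.

```java
// Hypothetical illustration class, not part of Pinot.
public class MergeBucketSketch {
  // Returns the max valid bucket end time, or Long.MIN_VALUE if the segment is not ready to merge.
  static long getMaxValidBucketEndTimeMs(long startTimeMs, long endTimeMs, long bucketMs, long bufferMs,
      long nowMs) {
    // Computed once so the readiness check and the upper bound agree on "now".
    long cutoffMs = nowMs - bufferMs;
    long firstBucketEndMs = startTimeMs / bucketMs * bucketMs + bucketMs;
    if (firstBucketEndMs > cutoffMs) {
      return Long.MIN_VALUE;
    }
    // Bucket end is exclusive, segment end time is inclusive.
    long maxBucketEndTimeMs = (endTimeMs / bucketMs + 1) * bucketMs;
    return Math.min(maxBucketEndTimeMs, cutoffMs / bucketMs * bucketMs);
  }

  public static void main(String[] args) {
    // The caller reads the clock exactly once and passes the value down.
    long nowMs = System.currentTimeMillis();
    long bucketMs = 24L * 60 * 60 * 1000;
    System.out.println(getMaxValidBucketEndTimeMs(0L, bucketMs - 1, bucketMs, 0L, nowMs));
  }
}
```

Passing the time in as a parameter also makes the boundary cases easy to unit test with fixed values.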
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]