snleee commented on code in PR #9890:
URL: https://github.com/apache/pinot/pull/9890#discussion_r1043864051
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -430,22 +448,70 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
return pinotTaskConfigs;
}
+ @VisibleForTesting
+ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) {
+ if (tableType == TableType.REALTIME) {
+ // For realtime table, don't process
+ // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
+ // 2. sealed segments with start time later than the earliest start time of all in progress segments
+ // This prevents those in-progress segments from not being merged.
+ //
+ // Note that we make the following two assumptions here:
+ // 1. streaming data consumer lags are negligible
+ // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
+ // than bufferTimeMS)
+ //
+ // We don't handle the following cases intentionally because it will be either overkill or too complex
+ // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks
+ // forward, and may not be able to merge some lately-created segments for those new partitions -- users should
+ // configure pinot properly to discover new partitions timely, or they should restart pinot servers manually
+ // for new partitions to be picked up
+ // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for
+ // partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases,
+ // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
+ // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
+ // records correctly.
+ long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+ for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+ if (!segmentZKMetadata.getStatus().isCompleted()
+ && segmentZKMetadata.getTotalDocs() > 0
+ && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
+ earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+ }
+ }
+ final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
+ return allSegments.stream()
+ .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
+ && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
+ .collect(Collectors.toList());
+ } else {
+ return allSegments;
+ }
+ }
+
/**
* Validate table config for merge/rollup task
*/
- private boolean validate(TableConfig tableConfig, String taskType) {
- String offlineTableName = tableConfig.getTableName();
- if (tableConfig.getTableType() != TableType.OFFLINE) {
- LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}, REALTIME table is not supported yet", taskType,
- offlineTableName);
- return false;
- }
-
+ @VisibleForTesting
+ static boolean validate(TableConfig tableConfig, String taskType) {
+ String tableNameWithType = tableConfig.getTableName();
if (REFRESH.equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig))) {
LOGGER.warn("Skip generating task: {} for non-APPEND table: {}, REFRESH table is not supported", taskType,
- offlineTableName);
+ tableNameWithType);
return false;
}
+ if (tableConfig.getTableType() == TableType.REALTIME) {
+ if (tableConfig.isUpsertEnabled()) {
Review Comment:
+1 Thanks for adding this
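
For anyone following the thread, the realtime filtering rule described in the comment block of `filterSegmentsBasedOnStatus` above can be tried out in isolation with a small sketch like the one below. It is only an illustration under stated assumptions, not the PR's code: `Seg` is a hypothetical stand-in for `SegmentZKMetadata` that carries just the fields the rule reads, and `filterMergeable` and the segment names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RealtimeSegmentFilterSketch {
  // Hypothetical stand-in for SegmentZKMetadata: only the fields the rule needs.
  static final class Seg {
    final String name;
    final boolean completed;
    final long totalDocs;
    final long startTimeMs;

    Seg(String name, boolean completed, long totalDocs, long startTimeMs) {
      this.name = name;
      this.completed = completed;
      this.totalDocs = totalDocs;
      this.startTimeMs = startTimeMs;
    }
  }

  // Keeps completed segments that start before the earliest non-empty in-progress segment.
  static List<String> filterMergeable(List<Seg> allSegments) {
    long earliestInProgressStartMs = Long.MAX_VALUE;
    for (Seg s : allSegments) {
      // Empty in-progress segments are ignored, mirroring the getTotalDocs() > 0 check.
      if (!s.completed && s.totalDocs > 0 && s.startTimeMs < earliestInProgressStartMs) {
        earliestInProgressStartMs = s.startTimeMs;
      }
    }
    List<String> kept = new ArrayList<>();
    for (Seg s : allSegments) {
      if (s.completed && s.startTimeMs < earliestInProgressStartMs) {
        kept.add(s.name);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<Seg> segments = Arrays.asList(
        new Seg("sealed_p0_0", true, 100, 1_000L),
        new Seg("sealed_p1_0", true, 100, 5_000L),
        new Seg("consuming_p0_1", false, 10, 3_000L),
        new Seg("consuming_p1_1", false, 0, 2_000L));
    // consuming_p1_1 is empty, so the cutoff is 3000 ms and only sealed_p0_0 qualifies.
    System.out.println(filterMergeable(segments)); // prints [sealed_p0_0]
  }
}
```

When no non-empty in-progress segment exists, the cutoff stays at `Long.MAX_VALUE`, so every completed segment is kept.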
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java:
##########
@@ -65,13 +65,14 @@
/**
- * Integration test for minion task of type "MergeRollupTask"
+ * Integration test for minion task of type "MergeRollupTask" configured on offline tables
Review Comment:
Please update this comment to remove `offline tables`, since we combined the tests :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]