noob-se7en commented on code in PR #14623: URL: https://github.com/apache/pinot/pull/14623#discussion_r1925937906
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########

@@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
-      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd =
-      // windowStart + bucket.
-      long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs);
-      long windowEndMs = windowStartMs + bucketMs;
+      ZNRecord realtimeToOfflineZNRecord =
+          _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, completedRealtimeSegmentsZKMetadata, bucketMs,
+              realtimeToOfflineZNRecord);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark.
+      long windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
       // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd
       // (exclusive)
-      List<String> segmentNames = new ArrayList<>();
-      List<String> downloadURLs = new ArrayList<>();
       Set<String> lastLLCSegmentPerPartition = new HashSet<>(partitionToLatestLLCSegmentName.values());
-      boolean skipGenerate = false;
-      while (true) {
-        // Check that execution window is older than bufferTime
-        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-          LOGGER.info(
-              "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
-                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
-          skipGenerate = true;
-          break;
+
+      // Get all offline table segments.
+      // These are used to validate if previous minion task was successful or not
+      String offlineTableName =
+          TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+      Set<String> existingOfflineTableSegmentNames =
+          new HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName, true));
+
+      // In-case of previous minion task failures, get info
+      // of failed minion subtasks. They need to be reprocessed.
+      Set<String> failedTaskInputSegments =
+          getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, existingOfflineTableSegmentNames);
+
+      // In-case of partial failure of segments upload in prev minion task run,
+      // data is inconsistent, delete the corresponding offline segments immediately.
+      if (!failedTaskInputSegments.isEmpty()) {
+        deleteInvalidOfflineSegments(offlineTableName, failedTaskInputSegments, existingOfflineTableSegmentNames,
+            realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      List<SegmentZKMetadata> segmentsToBeReProcessed =
+          filterOutRemovedSegments(failedTaskInputSegments, completedRealtimeSegmentsZKMetadata);
+
+      // if no segment to be reprocessed, no failure
+      boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+      List<List<String>> segmentNamesGroupList = new ArrayList<>();
+      Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+      // maxNumRecordsPerTask is used to divide a minion tasks among
+      // multiple subtasks to improve performance.
+      int maxNumRecordsPerTask =
+          taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null
+              ? Integer.parseInt(
+              taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+              : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+      List<SegmentZKMetadata> segmentsToBeScheduled;
+
+      if (!prevMinionTaskSuccessful) {
+        segmentsToBeScheduled = segmentsToBeReProcessed;
+      } else {
+        // if all offline segments of prev minion tasks were successfully uploaded,
+        // we can clear the state of prev minion tasks as now it's useless.
+        if (!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+            isEmpty()) {
+          realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
+          realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear();
+          // windowEndTime of prev minion task needs to be re-used for picking up the
+          // next windowStartTime. This is useful for case where user changes minion config
+          // after a minion task run was complete. So windowStartTime cannot be watermark + bucketMs
+          windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
         }
+        long windowEndMs = windowStartMs + bucketMs;
+        // since window changed, pick new segments.
+        segmentsToBeScheduled =
+            generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, windowStartMs, windowEndMs, bucketMs,
+                bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, realtimeToOfflineSegmentsTaskMetadata);
+      }
-        for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
-          String segmentName = segmentZKMetadata.getSegmentName();
-          long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
-          long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
-          // Check overlap with window
-          if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
-            // If last completed segment is being used, make sure that segment crosses over end of window.
-            // In the absence of this check, CONSUMING segments could contain some portion of the window. That data
-            // would be skipped forever.
-            if (lastLLCSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
-              LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
-                  + "generation: {}", segmentName, taskType);
-              skipGenerate = true;
-              break;
-            }
-            segmentNames.add(segmentName);
-            downloadURLs.add(segmentZKMetadata.getDownloadUrl());
+      divideSegmentsAmongSubtasks(segmentsToBeScheduled, segmentNamesGroupList, segmentNameVsDownloadURL,
+          maxNumRecordsPerTask);
+
+      if (segmentNamesGroupList.isEmpty()) {
+        continue;
+      }
+
+      List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
+      long newWindowStartTime = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
+      long newWindowEndTime = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
+
+      LOGGER.info(
+          "generating tasks for: {} with window start time: {}, window end time: {}, table: {}", taskType,
+          windowStartMs,
+          newWindowEndTime, realtimeTableName);
+
+      for (List<String> segmentNameList : segmentNamesGroupList) {
+        List<String> downloadURLList = getDownloadURLList(segmentNameList, segmentNameVsDownloadURL);
+        Preconditions.checkState(segmentNameList.size() == downloadURLList.size());
+        pinotTaskConfigsForTable.add(
+            createPinotTaskConfig(segmentNameList, downloadURLList, realtimeTableName, taskConfigs, tableConfig,
+                newWindowStartTime,
+                newWindowEndTime, taskType));
+      }
+      try {
+        _clusterInfoAccessor
+            .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
+                MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+                expectedVersion);
+      } catch (ZkBadVersionException e) {
+        LOGGER.error(
+            "Version changed while updating RTO task metadata for table: {}, skip scheduling. There are "
+                + "multiple task schedulers for the same table, need to investigate!", realtimeTableName);
+        // skip this table for this minion run
+        continue;
+      }
+
+      pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
+
+      LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
+    // check table is not upsert
+    Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE,
+        "RealtimeToOfflineTask doesn't support upsert table!");
+    // check no malformed period
+    TimeUtils.convertPeriodToMillis(
+        taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d"));
+    TimeUtils.convertPeriodToMillis(
+        taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d"));
+    TimeUtils.convertPeriodToMillis(
+        taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s"));
+    // check mergeType is correct
+    Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name())
+        .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name())
+            .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!");
+
+    Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+    // check no mis-configured columns
+    Set<String> columnNames = schema.getColumnNames();
+    for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+      if (entry.getKey().endsWith(".aggregationType")) {
+        Preconditions.checkState(columnNames.contains(
+            StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+            String.format("Column \"%s\" not found in schema!", entry.getKey()));
+        try {
+          // check that it's a valid aggregation function type
+          AggregationFunctionType aft =
+              AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+          // check that a value aggregator is available
+          if (!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) {
+            throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString());
          }
+        } catch (IllegalArgumentException e) {
+          String err =
+              String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
+          throw new IllegalStateException(err);
        }
-        if (skipGenerate || !segmentNames.isEmpty()) {
-          break;
-        }
+      }
+    }
+  }
-        LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket",
-            taskType, windowStartMs, windowEndMs);
-        windowStartMs = windowEndMs;
-        windowEndMs += bucketMs;
+  private List<String> getDownloadURLList(List<String> segmentNameList, Map<String, String> segmentNameVsDownloadURL) {
+    List<String> downloadURLList = new ArrayList<>();
+    for (String segmentName : segmentNameList) {
+      downloadURLList.add(segmentNameVsDownloadURL.get(segmentName));
+    }
+    return downloadURLList;
+  }
+
+  private void deleteInvalidOfflineSegments(String offlineTableName,
+      Set<String> realtimeSegmentsToBeReProcessed,
+      Set<String> existingOfflineTableSegmentNames,
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata) {
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+
+    Set<String> segmentsToBeDeleted = new HashSet<>();
+
+    for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) {
+      String id = segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName);
+      Preconditions.checkNotNull(id);
+      ExpectedSubtaskResult expectedSubtaskResult =
+          expectedSubtaskResultMap.get(id);
+      // if already marked as failure, no need to delete again.
+      if (expectedSubtaskResult.isTaskFailure()) {
+        continue;
      }
+      List<String> expectedCorrespondingOfflineSegments = expectedSubtaskResult.getSegmentsTo();
+      segmentsToBeDeleted.addAll(
+          getSegmentsToDelete(expectedCorrespondingOfflineSegments, existingOfflineTableSegmentNames));
+      // The expectedRealtimeToOfflineTaskResult is confirmed to be
+      // related to a failed task. Mark it as a failure, since executor will
+      // then only replace expectedRealtimeToOfflineTaskResult for the
+      // segments to be reprocessed.
+      expectedSubtaskResult.setTaskFailure();
+    }
+
+    if (!segmentsToBeDeleted.isEmpty()) {
+      _clusterInfoAccessor.getPinotHelixResourceManager()
+          .deleteSegments(offlineTableName, new ArrayList<>(segmentsToBeDeleted));
+    }
+  }
+
+  private Set<String> getFailedTaskSegments(
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata,
+      Set<String> existingOfflineTableSegmentNames) {
+    Set<String> failedIds = new HashSet<>();
+
+    // Get all the ExpectedRealtimeToOfflineTaskResult of prev minion task
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+    Collection<ExpectedSubtaskResult> expectedSubtaskResultList =
+        expectedSubtaskResultMap.values();
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Set<String> expectedSubtaskResultIds =
+        new HashSet<>(segmentNameToExpectedSubtaskResultID.values());
+
+    Set<String> segmentNamesToReprocess = new HashSet<>();
-      if (skipGenerate) {
+    // Check what all offline segments are present currently
+    for (ExpectedSubtaskResult expectedSubtaskResult
+        : expectedSubtaskResultList) {
+
+      if (expectedSubtaskResult.isTaskFailure()) {
+        // if task is failure and is referenced by any segment, only then add to failed task.
+        if (expectedSubtaskResultIds.contains(expectedSubtaskResult.getId())) {
+          failedIds.add(expectedSubtaskResult.getId());
+        }
        continue;
      }
-      Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
-          _clusterInfoAccessor);
-      configs.putAll(getBaseTaskConfigs(tableConfig, segmentNames));
-      configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR));
-      configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
-
-      // Segment processor configs
-      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, String.valueOf(windowStartMs));
-      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, String.valueOf(windowEndMs));
-      String roundBucketTimePeriod = taskConfigs.get(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY);
-      if (roundBucketTimePeriod != null) {
-        configs.put(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, roundBucketTimePeriod);
+      // get offline segments
+      List<String> segmentTo = expectedSubtaskResult.getSegmentsTo();
+
+      // If not all corresponding offline segments to a realtime segment exists,
+      // it means there was an issue with prev minion task. And segment needs
+      // to be re-processed.
+      boolean taskSuccessful = checkIfAllSegmentsExists(segmentTo, existingOfflineTableSegmentNames);

Review Comment:
   I think this will answer the question above: https://github.com/apache/pinot/pull/14623/files#r1925930071
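
   For readers following along, here is a minimal, self-contained sketch of the existence check the comment above points at: a realtime segment is flagged for reprocessing when any offline segment its previous subtask was expected to produce is missing from the offline table. All class, method, and segment names below are invented for illustration; this is not the PR's actual checkIfAllSegmentsExists / getFailedTaskSegments code.

   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical sketch: map each realtime segment to the offline segments its
   // subtask was expected to upload, then flag segments whose expected output is
   // not fully present in the offline table.
   public class FailedSubtaskDetectionSketch {

     public static Set<String> findSegmentsToReprocess(
         Map<String, List<String>> expectedOfflineSegmentsByRealtimeSegment,
         Set<String> existingOfflineSegments) {
       Set<String> toReprocess = new HashSet<>();
       for (Map.Entry<String, List<String>> entry : expectedOfflineSegmentsByRealtimeSegment.entrySet()) {
         // If any expected offline segment is missing, the previous subtask either
         // failed or only partially uploaded its output, so the input segment must
         // be processed again.
         if (!existingOfflineSegments.containsAll(entry.getValue())) {
           toReprocess.add(entry.getKey());
         }
       }
       return toReprocess;
     }

     public static void main(String[] args) {
       Map<String, List<String>> expected = new HashMap<>();
       expected.put("myTable__0__10__20250101T0000Z", List.of("myTable_16000_16001_0"));
       expected.put("myTable__1__10__20250101T0000Z", List.of("myTable_16000_16001_1"));

       // Only one of the expected offline segments actually made it to the offline table.
       Set<String> existing = new HashSet<>(List.of("myTable_16000_16001_0"));

       // Prints [myTable__1__10__20250101T0000Z]
       System.out.println(findSegmentsToReprocess(expected, existing));
     }
   }

   In the diff above, the same kind of check is what separates a fully successful previous run (clear the expected-subtask bookkeeping and advance the window) from a partial failure (delete the partial offline output and reschedule the failed inputs).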
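
   A related part of the diff is the try/catch around setMinionTaskMetadata: the task metadata is read together with its ZNode version (expectedVersion), and the write is only accepted if nothing else wrote in between; otherwise ZkBadVersionException is thrown and the table is skipped for this run. The following is a toy in-memory stand-in for that optimistic-concurrency pattern, not the Helix/ZooKeeper API; every name in it is invented for the sketch.

   // Toy stand-in for a versioned metadata store: read the payload and its version,
   // compute the update, then write back only if the version is unchanged.
   public class OptimisticMetadataStoreSketch {

     static final class BadVersionException extends RuntimeException {
     }

     private String _payload = "windowStartMs=0";
     private int _version = 0;

     synchronized int getVersion() {
       return _version;
     }

     synchronized String read() {
       return _payload;
     }

     // Conditional write: succeeds only if the caller's expectedVersion still matches.
     synchronized void writeIfVersionMatches(String newPayload, int expectedVersion) {
       if (expectedVersion != _version) {
         throw new BadVersionException();
       }
       _payload = newPayload;
       _version++;
     }

     public static void main(String[] args) {
       OptimisticMetadataStoreSketch store = new OptimisticMetadataStoreSketch();

       // Scheduler A reads the metadata and remembers the version it saw.
       int expectedVersion = store.getVersion();
       String current = store.read();

       // Scheduler B sneaks in a write, bumping the version.
       store.writeIfVersionMatches("windowStartMs=3600000", store.getVersion());

       try {
         // Scheduler A's write now fails instead of silently overwriting B's update.
         store.writeIfVersionMatches(current + ";scheduledBy=A", expectedVersion);
       } catch (BadVersionException e) {
         System.out.println("Version changed, skipping this table for the current run");
       }
     }
   }

   Failing the write on a version mismatch is what the error log about multiple task schedulers for the same table is guarding against: two concurrent generators would otherwise silently overwrite each other's expected-subtask bookkeeping.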