noob-se7en commented on code in PR #14623: URL: https://github.com/apache/pinot/pull/14623#discussion_r1925942607
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -287,77 +585,69 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMe * If the znode is null, computes the watermark using either the start time config or the start time from segment * metadata */ - private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata, - long bucketMs) { - ZNRecord realtimeToOfflineZNRecord = - _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, - realtimeTableName); - RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = - realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord( - realtimeToOfflineZNRecord) : null; - - if (realtimeToOfflineSegmentsTaskMetadata == null) { - // No ZNode exists. Cold-start. - long watermarkMs; - - // Find the smallest time from all segments - long minStartTimeMs = Long.MAX_VALUE; - for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { - minStartTimeMs = Math.min(minStartTimeMs, segmentZKMetadata.getStartTimeMs()); - } - Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE); + private RealtimeToOfflineSegmentsTaskMetadata getRTOTaskMetadata(String realtimeTableName, + List<SegmentZKMetadata> completedSegmentsZKMetadata, + long bucketMs, ZNRecord realtimeToOfflineZNRecord) { + + if (realtimeToOfflineZNRecord != null) { + return RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord( + realtimeToOfflineZNRecord); + } - // Round off according to the bucket. 
This ensures we align the offline segments to proper time boundaries - // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window - // [20200813, 20200814) - watermarkMs = (minStartTimeMs / bucketMs) * bucketMs; + // No ZNode exists. Cold-start. + long watermarkMs; - // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above - realtimeToOfflineSegmentsTaskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs); - _clusterInfoAccessor.setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata, - MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, -1); + // Find the smallest time from all segments + long minStartTimeMs = Long.MAX_VALUE; + for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { + minStartTimeMs = Math.min(minStartTimeMs, segmentZKMetadata.getStartTimeMs()); } - return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(); - } + Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE); - @Override - public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { - // check table is not upsert - Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, - "RealtimeToOfflineTask doesn't support upsert table!"); - // check no malformed period - TimeUtils.convertPeriodToMillis( - taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d")); - TimeUtils.convertPeriodToMillis( - taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d")); - TimeUtils.convertPeriodToMillis( - taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s")); - // check mergeType is correct - Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()) - .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()) - 
.toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); + // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries + // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window + // [20200813, 20200814) + watermarkMs = (minStartTimeMs / bucketMs) * bucketMs; - Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig); - // check no mis-configured columns - Set<String> columnNames = schema.getColumnNames(); + return new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs); + } + + private PinotTaskConfig createPinotTaskConfig(List<String> segmentNameList, List<String> downloadURLList, + String realtimeTableName, Map<String, String> taskConfigs, TableConfig tableConfig, long windowStartMs, + long windowEndMs, String taskType) { + + Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs, + _clusterInfoAccessor); + configs.putAll(getBaseTaskConfigs(tableConfig, segmentNameList)); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLList, MinionConstants.URL_SEPARATOR)); Review Comment: Unit tests are still pending; once added, they will validate all of these changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org