noob-se7en commented on code in PR #14623:
URL: https://github.com/apache/pinot/pull/14623#discussion_r1925942607


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -287,77 +585,69 @@ private void getCompletedSegmentsInfo(String 
realtimeTableName, List<SegmentZKMe
    * If the znode is null, computes the watermark using either the start time 
config or the start time from segment
    * metadata
    */
-  private long getWatermarkMs(String realtimeTableName, 
List<SegmentZKMetadata> completedSegmentsZKMetadata,
-      long bucketMs) {
-    ZNRecord realtimeToOfflineZNRecord =
-        
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
-            realtimeTableName);
-    RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
-        realtimeToOfflineZNRecord != null ? 
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
-            realtimeToOfflineZNRecord) : null;
-
-    if (realtimeToOfflineSegmentsTaskMetadata == null) {
-      // No ZNode exists. Cold-start.
-      long watermarkMs;
-
-      // Find the smallest time from all segments
-      long minStartTimeMs = Long.MAX_VALUE;
-      for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
-        minStartTimeMs = Math.min(minStartTimeMs, 
segmentZKMetadata.getStartTimeMs());
-      }
-      Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
+  private RealtimeToOfflineSegmentsTaskMetadata getRTOTaskMetadata(String 
realtimeTableName,
+      List<SegmentZKMetadata> completedSegmentsZKMetadata,
+      long bucketMs, ZNRecord realtimeToOfflineZNRecord) {
+
+    if (realtimeToOfflineZNRecord != null) {
+      return RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
+          realtimeToOfflineZNRecord);
+    }
 
-      // Round off according to the bucket. This ensures we align the offline 
segments to proper time boundaries
-      // For example, if start time millis is 20200813T12:34:59, we want to 
create the first segment for window
-      // [20200813, 20200814)
-      watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
+    // No ZNode exists. Cold-start.
+    long watermarkMs;
 
-      // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark 
calculated above
-      realtimeToOfflineSegmentsTaskMetadata = new 
RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
-      
_clusterInfoAccessor.setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
-          MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, -1);
+    // Find the smallest time from all segments
+    long minStartTimeMs = Long.MAX_VALUE;
+    for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
+      minStartTimeMs = Math.min(minStartTimeMs, 
segmentZKMetadata.getStartTimeMs());
     }
-    return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
-  }
+    Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
 
-  @Override
-  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> 
taskConfigs) {
-    // check table is not upsert
-    Preconditions.checkState(tableConfig.getUpsertMode() == 
UpsertConfig.Mode.NONE,
-        "RealtimeToOfflineTask doesn't support upsert table!");
-    // check no malformed period
-    TimeUtils.convertPeriodToMillis(
-        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
"2d"));
-    TimeUtils.convertPeriodToMillis(
-        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
"1d"));
-    TimeUtils.convertPeriodToMillis(
-        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
 "1s"));
-    // check mergeType is correct
-    Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), 
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
-        
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
 MergeType.CONCAT.name())
-            .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, 
DEDUP]!");
+    // Round off according to the bucket. This ensures we align the offline 
segments to proper time boundaries
+    // For example, if start time millis is 20200813T12:34:59, we want to 
create the first segment for window
+    // [20200813, 20200814)
+    watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
 
-    Schema schema = 
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
-    // check no mis-configured columns
-    Set<String> columnNames = schema.getColumnNames();
+    return new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, 
watermarkMs);
+  }
+
+  private PinotTaskConfig createPinotTaskConfig(List<String> segmentNameList, 
List<String> downloadURLList,
+      String realtimeTableName, Map<String, String> taskConfigs, TableConfig 
tableConfig, long windowStartMs,
+      long windowEndMs, String taskType) {
+
+    Map<String, String> configs = 
MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
+        _clusterInfoAccessor);
+    configs.putAll(getBaseTaskConfigs(tableConfig, segmentNameList));
+    configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
StringUtils.join(downloadURLList, MinionConstants.URL_SEPARATOR));

Review Comment:
   Unit tests are still pending; once added, they will validate all of these things.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to