noob-se7en commented on code in PR #14623:
URL: https://github.com/apache/pinot/pull/14623#discussion_r1925937906


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
 
-      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark. WindowEnd =
-      // windowStart + bucket.
-      long windowStartMs = getWatermarkMs(realtimeTableName, 
completedSegmentsZKMetadata, bucketMs);
-      long windowEndMs = windowStartMs + bucketMs;
+      ZNRecord realtimeToOfflineZNRecord =
+          
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? 
realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, 
completedRealtimeSegmentsZKMetadata, bucketMs,
+              realtimeToOfflineZNRecord);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark.
+      long windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
 
       // Find all COMPLETED segments with data overlapping execution window: 
windowStart (inclusive) to windowEnd
       // (exclusive)
-      List<String> segmentNames = new ArrayList<>();
-      List<String> downloadURLs = new ArrayList<>();
       Set<String> lastLLCSegmentPerPartition = new 
HashSet<>(partitionToLatestLLCSegmentName.values());
-      boolean skipGenerate = false;
-      while (true) {
-        // Check that execution window is older than bufferTime
-        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-          LOGGER.info(
-              "Window with start: {} and end: {} is not older than buffer 
time: {} configured as {} ago. Skipping task "
-                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, 
bufferTimePeriod, taskType);
-          skipGenerate = true;
-          break;
+
+      // Get all offline table segments.
+      // These are used to validate whether the previous minion task was successful or not.
+      String offlineTableName =
+          
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+      Set<String> existingOfflineTableSegmentNames =
+          new 
HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName,
 true));
+
+      // In case of previous minion task failures, get info
+      // about the failed minion subtasks. They need to be reprocessed.
+      Set<String> failedTaskInputSegments =
+          getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, 
existingOfflineTableSegmentNames);
+
+      // In case of partial failure of segment uploads in the previous minion task run,
+      // the data is inconsistent; delete the corresponding offline segments immediately.
+      if (!failedTaskInputSegments.isEmpty()) {
+        deleteInvalidOfflineSegments(offlineTableName, 
failedTaskInputSegments, existingOfflineTableSegmentNames,
+            realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      List<SegmentZKMetadata> segmentsToBeReProcessed =
+          filterOutRemovedSegments(failedTaskInputSegments, 
completedRealtimeSegmentsZKMetadata);
+
+      // If there is no segment to be reprocessed, the previous minion task had no failure.
+      boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+      List<List<String>> segmentNamesGroupList = new ArrayList<>();
+      Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+      // maxNumRecordsPerTask is used to divide a minion task among
+      // multiple subtasks to improve performance.
+      int maxNumRecordsPerTask =
+          
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)
 != null
+              ? Integer.parseInt(
+              
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+              : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+      List<SegmentZKMetadata> segmentsToBeScheduled;
+
+      if (!prevMinionTaskSuccessful) {
+        segmentsToBeScheduled = segmentsToBeReProcessed;
+      } else {
+        // if all offline segments of prev minion tasks were successfully 
uploaded,
+        // we can clear the state of prev minion tasks as now it's useless.
+        if 
(!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+            isEmpty()) {
+          
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
+          
realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear();
+          // The windowEndTime of the previous minion task needs to be re-used for
+          // picking up the next windowStartTime. This is useful for the case where
+          // the user changes the minion config after a minion task run was complete,
+          // so windowStartTime cannot be watermark + bucketMs.
+          windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
         }
+        long windowEndMs = windowStartMs + bucketMs;
+        // since window changed, pick new segments.
+        segmentsToBeScheduled =
+            generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, 
windowStartMs, windowEndMs, bucketMs,
+                bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, 
realtimeToOfflineSegmentsTaskMetadata);
+      }
 
-        for (SegmentZKMetadata segmentZKMetadata : 
completedSegmentsZKMetadata) {
-          String segmentName = segmentZKMetadata.getSegmentName();
-          long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
-          long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
-          // Check overlap with window
-          if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < 
windowEndMs) {
-            // If last completed segment is being used, make sure that segment 
crosses over end of window.
-            // In the absence of this check, CONSUMING segments could contain 
some portion of the window. That data
-            // would be skipped forever.
-            if (lastLLCSegmentPerPartition.contains(segmentName) && 
segmentEndTimeMs < windowEndMs) {
-              LOGGER.info("Window data overflows into CONSUMING segments for 
partition of segment: {}. Skipping task "
-                  + "generation: {}", segmentName, taskType);
-              skipGenerate = true;
-              break;
-            }
-            segmentNames.add(segmentName);
-            downloadURLs.add(segmentZKMetadata.getDownloadUrl());
+      divideSegmentsAmongSubtasks(segmentsToBeScheduled, 
segmentNamesGroupList, segmentNameVsDownloadURL,
+          maxNumRecordsPerTask);
+
+      if (segmentNamesGroupList.isEmpty()) {
+        continue;
+      }
+
+      List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
+      long newWindowStartTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
+      long newWindowEndTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
+
+      LOGGER.info(
+          "generating tasks for: {} with window start time: {}, window end 
time: {}, table: {}", taskType,
+          windowStartMs,
+          newWindowEndTime, realtimeTableName);
+
+      for (List<String> segmentNameList : segmentNamesGroupList) {
+        List<String> downloadURLList = getDownloadURLList(segmentNameList, 
segmentNameVsDownloadURL);
+        Preconditions.checkState(segmentNameList.size() == 
downloadURLList.size());
+        pinotTaskConfigsForTable.add(
+            createPinotTaskConfig(segmentNameList, downloadURLList, 
realtimeTableName, taskConfigs, tableConfig,
+                newWindowStartTime,
+                newWindowEndTime, taskType));
+      }
+      try {
+        _clusterInfoAccessor
+            .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
+                MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+                expectedVersion);
+      } catch (ZkBadVersionException e) {
+        LOGGER.error(
+            "Version changed while updating RTO task metadata for table: {}, 
skip scheduling. There are "
+                + "multiple task schedulers for the same table, need to 
investigate!", realtimeTableName);
+        // skip this table for this minion run
+        continue;
+      }
+
+      pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
+
+      LOGGER.info("Finished generating task configs for table: {} for task: 
{}", realtimeTableName, taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> 
taskConfigs) {
+    // check table is not upsert
+    Preconditions.checkState(tableConfig.getUpsertMode() == 
UpsertConfig.Mode.NONE,
+        "RealtimeToOfflineTask doesn't support upsert table!");
+    // check no malformed period
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
"2d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
"1d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
 "1s"));
+    // check mergeType is correct
+    Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), 
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
+        
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
 MergeType.CONCAT.name())
+            .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, 
DEDUP]!");
+
+    Schema schema = 
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+    // check no mis-configured columns
+    Set<String> columnNames = schema.getColumnNames();
+    for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+      if (entry.getKey().endsWith(".aggregationType")) {
+        Preconditions.checkState(columnNames.contains(
+                StringUtils.removeEnd(entry.getKey(), 
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+            String.format("Column \"%s\" not found in schema!", 
entry.getKey()));
+        try {
+          // check that it's a valid aggregation function type
+          AggregationFunctionType aft = 
AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+          // check that a value aggregator is available
+          if 
(!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft))
 {
+            throw new IllegalArgumentException("ValueAggregator not enabled 
for type: " + aft.toString());
           }
+        } catch (IllegalArgumentException e) {
+          String err =
+              String.format("Column \"%s\" has invalid aggregate type: %s", 
entry.getKey(), entry.getValue());
+          throw new IllegalStateException(err);
         }
-        if (skipGenerate || !segmentNames.isEmpty()) {
-          break;
-        }
+      }
+    }
+  }
 
-        LOGGER.info("Found no eligible segments for task: {} with window [{} - 
{}), moving to the next time bucket",
-            taskType, windowStartMs, windowEndMs);
-        windowStartMs = windowEndMs;
-        windowEndMs += bucketMs;
+  private List<String> getDownloadURLList(List<String> segmentNameList, 
Map<String, String> segmentNameVsDownloadURL) {
+    List<String> downloadURLList = new ArrayList<>();
+    for (String segmentName : segmentNameList) {
+      downloadURLList.add(segmentNameVsDownloadURL.get(segmentName));
+    }
+    return downloadURLList;
+  }
+
+  private void deleteInvalidOfflineSegments(String offlineTableName,
+      Set<String> realtimeSegmentsToBeReProcessed,
+      Set<String> existingOfflineTableSegmentNames,
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata) {
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+
+    Set<String> segmentsToBeDeleted = new HashSet<>();
+
+    for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) {
+      String id = 
segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName);
+      Preconditions.checkNotNull(id);
+      ExpectedSubtaskResult expectedSubtaskResult =
+          expectedSubtaskResultMap.get(id);
+      // If it is already marked as a failure, there is no need to delete again.
+      if (expectedSubtaskResult.isTaskFailure()) {
+        continue;
       }
+      List<String> expectedCorrespondingOfflineSegments = 
expectedSubtaskResult.getSegmentsTo();
+      segmentsToBeDeleted.addAll(
+          getSegmentsToDelete(expectedCorrespondingOfflineSegments, 
existingOfflineTableSegmentNames));
+      // The expectedRealtimeToOfflineTaskResult is confirmed to be
+      // related to a failed task. Mark it as a failure, since executor will
+      // then only replace expectedRealtimeToOfflineTaskResult for the
+      // segments to be reprocessed.
+      expectedSubtaskResult.setTaskFailure();
+    }
+
+    if (!segmentsToBeDeleted.isEmpty()) {
+      _clusterInfoAccessor.getPinotHelixResourceManager()
+          .deleteSegments(offlineTableName, new 
ArrayList<>(segmentsToBeDeleted));
+    }
+  }
+
+  private Set<String> getFailedTaskSegments(
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata,
+      Set<String> existingOfflineTableSegmentNames) {
+    Set<String> failedIds = new HashSet<>();
+
+    // Get all the ExpectedRealtimeToOfflineTaskResult of prev minion task
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+    Collection<ExpectedSubtaskResult> expectedSubtaskResultList =
+        expectedSubtaskResultMap.values();
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Set<String> expectedSubtaskResultIds =
+        new HashSet<>(segmentNameToExpectedSubtaskResultID.values());
+
+    Set<String> segmentNamesToReprocess = new HashSet<>();
 
-      if (skipGenerate) {
+    // Check what all offline segments are present currently
+    for (ExpectedSubtaskResult expectedSubtaskResult
+        : expectedSubtaskResultList) {
+
+      if (expectedSubtaskResult.isTaskFailure()) {
+        // If the task is a failure and is referenced by any segment, only then add it to the failed set.
+        if (expectedSubtaskResultIds.contains(expectedSubtaskResult.getId())) {
+          failedIds.add(expectedSubtaskResult.getId());
+        }
         continue;
       }
 
-      Map<String, String> configs = 
MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
-          _clusterInfoAccessor);
-      configs.putAll(getBaseTaskConfigs(tableConfig, segmentNames));
-      configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR));
-      configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
-
-      // Segment processor configs
-      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, 
String.valueOf(windowStartMs));
-      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, 
String.valueOf(windowEndMs));
-      String roundBucketTimePeriod = 
taskConfigs.get(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY);
-      if (roundBucketTimePeriod != null) {
-        
configs.put(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, 
roundBucketTimePeriod);
+      // get offline segments
+      List<String> segmentTo = expectedSubtaskResult.getSegmentsTo();
+
+      // If not all offline segments corresponding to a realtime segment exist,
+      // it means there was an issue with the previous minion task, and the segment
+      // needs to be re-processed.
+      boolean taskSuccessful = checkIfAllSegmentsExists(segmentTo, 
existingOfflineTableSegmentNames);

Review Comment:
   I think this will answer the question above:
   https://github.com/apache/pinot/pull/14623/files#r1925930071



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to