snleee commented on code in PR #10261:
URL: https://github.com/apache/pinot/pull/10261#discussion_r1102323263


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -154,25 +156,33 @@ public void setMinionEventObserver(MinionEventObserver 
observer) {
   @Override
   public List<SegmentConversionResult> executeTask(PinotTaskConfig 
pinotTaskConfig)
       throws Exception {
-    preProcess(pinotTaskConfig);
-    _pinotTaskConfig = pinotTaskConfig;
-    _eventObserver = 
MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
-    String taskType = pinotTaskConfig.getTaskType();
+    // check whether all segments to process exist in the table, if not, 
terminate early to avoid wasting computing
+    // resources
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
-    String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
-    String[] downloadURLs = 
downloadURLString.split(MinionConstants.URL_SEPARATOR);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
     AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+    Set<String> nonExistentSegmentNames = 
SegmentConversionUtils.extractNonExistentSegments(tableNameWithType,
+        FileUploadDownloadClient.extractBaseURI(new URI(uploadURL)),
+        
Arrays.asList(inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR)), 
authProvider);
+    if (!CollectionUtils.isEmpty(nonExistentSegmentNames)) {
+      throw new RuntimeException(String.format("Segments to process: %s do not 
exist in table: %s",

Review Comment:
   Let's change the order of list of segments and table. Depending on the case, 
the list of segments can be pretty long. In this case, it would be better to 
catch the table name in the first line and have the long list of segments. 
Otherwise, it will be a bit harder to figure out the table name.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -154,25 +156,33 @@ public void setMinionEventObserver(MinionEventObserver 
observer) {
   @Override
   public List<SegmentConversionResult> executeTask(PinotTaskConfig 
pinotTaskConfig)
       throws Exception {
-    preProcess(pinotTaskConfig);
-    _pinotTaskConfig = pinotTaskConfig;
-    _eventObserver = 
MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
-    String taskType = pinotTaskConfig.getTaskType();
+    // check whether all segments to process exist in the table, if not, 
terminate early to avoid wasting computing

Review Comment:
   This is not relevant to this PR; but, can we also add the table level 
timeout for `endSegmentReplaceSegment` if possible?
   
   Currently, we have the minion instance level config 
`_minionConf.getEndReplaceSegmentsTimeoutMs()`. It would be very useful to us 
if we add the same config for task config in the table config. We can add the 
config in the `context` object.
   
   ```
     protected void postUploadSegments(SegmentUploadContext context)
         throws Exception {
       // Update the segment lineage to indicate that the segment replacement 
is done.
       _eventObserver.notifyProgress(_pinotTaskConfig,
           "Finishing uploading segments: " + 
context.getSegmentConversionResults().size());
       if (context.isReplaceSegmentsEnabled()) {
         String lineageEntryId = (String) 
context.getCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID);
         
SegmentConversionUtils.endSegmentReplace(context.getTableNameWithType(), 
context.getUploadURL(), lineageEntryId,
             _minionConf.getEndReplaceSegmentsTimeoutMs(), 
context.getAuthProvider());
       }
     }
   ```



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java:
##########
@@ -61,6 +65,39 @@ public class SegmentConversionUtils {
   private SegmentConversionUtils() {
   }
 
+  /**
+   * Extract non-existent segments from the given list of segments
+   * @param tableNameWithType a table name with type
+   * @param controllerBaseURI the controller base URI
+   * @param segmentNames a list of segments to check
+   * @param authProvider a {@link AuthProvider}
+   * @return a set of non-existent segment names
+   * @throws Exception when there are exceptions checking whether the given 
list of segments all exist or not
+   */
+  public static Set<String> extractNonExistentSegments(String 
tableNameWithType, URI controllerBaseURI,

Review Comment:
   Instead of this, I think that it's probably better to have the function that 
provides the list of tables from the table.
   
   We can simply compute `nonExistentSegments` by the following:
   
   ```
   
inputSegmentsListFromTaskConfig.removeAll(SegmentConversionUtils.getSegmentsForTable(xxx))
   ```
   
   I feel that the getSegmentsForTable will likely be more frequently used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to