kfaraz commented on code in PR #15981:
URL: https://github.com/apache/druid/pull/15981#discussion_r1512090750


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -202,6 +202,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   private volatile Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;
 
   private IngestionState ingestionState;
+  private Map<String, TaskReport> completionReports;
+  private final Boolean isCompactionTask;

Review Comment:
   Nit: This should be a primitive `boolean` rather than a boxed one, as we 
never intend to assign `null` to it.
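
   To illustrate the difference, here is a minimal sketch. The classes `BoxedFlagTask` and `PrimitiveFlagTask` and the method `shouldWriteReports` are hypothetical and not part of the PR. A boxed `Boolean` field can hold `null`, and auto-unboxing it throws a `NullPointerException`, whereas a primitive `boolean` rules that state out at compile time.

```java
// Hypothetical classes for illustration only; they are not part of the PR.
final class BoxedFlagTask
{
  // A boxed Boolean is a reference and may legally be null.
  private final Boolean isCompactionTask;

  BoxedFlagTask(Boolean isCompactionTask)
  {
    this.isCompactionTask = isCompactionTask;
  }

  // Assumed usage of the flag, for illustration only.
  boolean shouldWriteReports()
  {
    // Auto-unboxing a null Boolean throws NullPointerException here.
    return !isCompactionTask;
  }
}

final class PrimitiveFlagTask
{
  // A primitive boolean can only be true or false.
  private final boolean isCompactionTask;

  PrimitiveFlagTask(boolean isCompactionTask)
  {
    this.isCompactionTask = isCompactionTask;
  }

  boolean shouldWriteReports()
  {
    return !isCompactionTask;
  }
}
```

   With the primitive field, passing `null` to the constructor is rejected by the compiler instead of failing later at runtime.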



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -292,6 +296,20 @@ public Set<ResourceAction> getInputSourceResources()
            ImmutableSet.of();
   }
 
+  @Nullable
+  @JsonIgnore
+  public Map<String, TaskReport> getCompletionReports()
+  {
+    return completionReports;
+  }
+
+  @Nullable
+  @JsonIgnore
+  public String getBaseSubtaskSpecName()

Review Comment:
   This is not needed. Please see the other comment in `CompactionTask`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -556,7 +567,8 @@ public TaskStatus runTask(final TaskToolbox toolbox)
     catch (Exception e) {
       log.error(e, "Encountered exception in %s.", ingestionState);
       errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+      completionReports = getTaskCompletionReports();
+      writeCompletionReports(toolbox);

Review Comment:
   Nit: These two lines have been repeated multiple times. The method 
`writeCompletionReports` can be renamed to `updateAndWriteCompletionReports` 
and modified to do the assignment of the field `completionReports` as well as 
write out the reports.
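
   A rough sketch of the combined method, under simplifying assumptions: `SketchReportWriter` is a hypothetical stand-in for the writer returned by `toolbox.getTaskReportFileWriter()`, `SketchIndexTask` is a hypothetical stripped-down task, and report values are plain strings rather than `TaskReport` objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the writer obtained from the toolbox.
interface SketchReportWriter
{
  void write(String taskId, Map<String, String> reports);
}

// Hypothetical simplified task; real reports would be TaskReport objects.
final class SketchIndexTask
{
  private final String id;
  private Map<String, String> completionReports;

  SketchIndexTask(String id)
  {
    this.id = id;
  }

  Map<String, String> getCompletionReports()
  {
    return completionReports;
  }

  // Stand-in for getTaskCompletionReports(); builds a fresh report map.
  private Map<String, String> buildCompletionReports(String errorMsg)
  {
    Map<String, String> reports = new HashMap<>();
    reports.put("errorMsg", errorMsg);
    return reports;
  }

  // Assigns the field and writes the reports in one place, so call sites
  // no longer repeat the two lines.
  void updateAndWriteCompletionReports(SketchReportWriter writer, String errorMsg)
  {
    completionReports = buildCompletionReports(errorMsg);
    writer.write(id, completionReports);
  }
}
```

   Each call site then shrinks to a single call, and the field can never be written out without first being updated.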



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -514,6 +518,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
               failCnt++;
               log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
             }
+            Optional.ofNullable(eachSpec.getCompletionReports())
+                    .ifPresent(reports -> completionReports.putAll(
+                        CollectionUtils.mapKeys(reports, key -> getReportkey(eachSpec.getBaseSubtaskSpecName(), key))));

Review Comment:
   It does make sense to use the same index suffix (`_0`, `_1`, and so on) that 
was originally allocated to the underlying sub-task spec.
   
   But `indexTaskSpecs` is already a list, so we only need the index of the 
item in the list. There is no need to fetch the `baseSubTaskSpecName` back and 
extract the index from it. You could use an `AtomicInteger` to track the 
current index.
   
   ```suggestion
                           CollectionUtils.mapKeys(reports, key -> key + "_" + specIndex.incrementAndGet())));
   ```
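
   A self-contained sketch of the index-based keys, under some assumptions: `ReportKeySketch` and `mergeReports` are hypothetical names, each list element stands in for `eachSpec.getCompletionReports()`, and report values are plain strings. It also assumes the counter should advance once per spec and start at 0, so the sketch reads the index once per loop iteration rather than inside the per-key lambda, where it would advance once per key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of the PR.
final class ReportKeySketch
{
  // Each element stands in for eachSpec.getCompletionReports();
  // a null element means the spec produced no reports.
  static Map<String, String> mergeReports(List<Map<String, String>> reportsPerSpec)
  {
    final Map<String, String> merged = new HashMap<>();
    final AtomicInteger specIndex = new AtomicInteger(0);
    for (Map<String, String> reports : reportsPerSpec) {
      // Advance once per spec, even when there are no reports,
      // so that suffixes keep matching list positions.
      final int index = specIndex.getAndIncrement();
      if (reports != null) {
        reports.forEach((key, value) -> merged.put(key + "_" + index, value));
      }
    }
    return merged;
  }
}
```

   In this sketch a plain `int` loop counter would work equally well; the `AtomicInteger` only becomes necessary when the index has to be updated from inside a lambda.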
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -562,6 +572,11 @@ private String createIndexTaskSpecId(int i)
     return StringUtils.format("%s_%d", getId(), i);
   }
 
+  private String getReportkey(String baseSequenceName, String currentKey)

Review Comment:
   This method is not needed, please see the other comment regarding building 
the report key using index of item in list.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -169,7 +169,9 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode
 
   private IngestionState ingestionState;
 
-  private boolean shouldCleanup;
+  // used to specify if indextask.run() is run as a part of another task

Review Comment:
   This comment should be included in the javadoc of the constructor of this 
class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -499,6 +502,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       log.info("Generated [%d] compaction task specs", totalNumSpecs);
 
       int failCnt = 0;
+      Map<String, TaskReport> completionReports = new HashMap<>();

Review Comment:
   If the compaction is being run on several intervals (not very likely but 
still a possibility), could holding all the task reports in memory cause an 
OOM? Currently, most of the task reports contain only `ingestStatsAndErrors`, 
but they may contain other things in the future.
   
   In the future, we should consider writing out the sub-reports in a streaming 
fashion, along with the required changes to the `TaskReportFileWriter` API.
   For now, we should add a guardrail here so that we don't try to hold too 
many reports in memory and fail with an OOM.
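
   One possible shape for such a guardrail, as a sketch only. The class `BoundedReportCollector`, the `maxReports` limit, and the choice to drop and count excess reports (rather than fail the task) are all assumptions; the comment above leaves the actual policy open. Report values are plain strings here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical guardrail for illustration; not part of the PR.
final class BoundedReportCollector
{
  private final int maxReports;
  private final Map<String, String> reports = new HashMap<>();
  private int skippedCount = 0;

  BoundedReportCollector(int maxReports)
  {
    if (maxReports < 0) {
      throw new IllegalArgumentException("maxReports must be non-negative");
    }
    this.maxReports = maxReports;
  }

  // Returns true if the report was kept, false if it was dropped
  // because the in-memory limit had already been reached.
  boolean add(String key, String report)
  {
    if (reports.size() >= maxReports && !reports.containsKey(key)) {
      skippedCount++;
      return false;
    }
    reports.put(key, report);
    return true;
  }

  Map<String, String> getReports()
  {
    return Collections.unmodifiableMap(reports);
  }

  int getSkippedCount()
  {
    return skippedCount;
  }
}
```

   The skipped count could be logged so that operators can tell when reports were dropped; a limit on total serialized size instead of report count would be another option.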



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1247,6 +1261,13 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
     );
   }
 
+  private void writeCompletionReports(TaskToolbox toolbox)

Review Comment:
   Add another method `updateAndWriteCompletionReports` that updates the field 
`completionReports` and also writes them out.


