kfaraz commented on code in PR #15981:
URL: https://github.com/apache/druid/pull/15981#discussion_r1512090750
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -202,6 +202,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements
private volatile Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;
private IngestionState ingestionState;
+ private Map<String, TaskReport> completionReports;
+ private final Boolean isCompactionTask;
Review Comment:
Nit: This should be a primitive `boolean` rather than a boxed one, as we
never intend to assign `null` to it.
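As a hedged illustration (the class name and accessor below are hypothetical stand-ins, not Druid's actual code), the primitive form makes `null` unrepresentable and removes any risk of an unboxing NPE at use sites:

```java
// Hypothetical sketch: with a primitive boolean there is no null state,
// so callers can never hit an unboxing NullPointerException.
class SupervisorTaskSketch
{
    private final boolean isCompactionTask; // primitive, as suggested

    SupervisorTaskSketch(boolean isCompactionTask)
    {
        this.isCompactionTask = isCompactionTask;
    }

    boolean isCompactionTask()
    {
        return isCompactionTask;
    }
}
```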
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -292,6 +296,20 @@ public Set<ResourceAction> getInputSourceResources()
ImmutableSet.of();
}
+ @Nullable
+ @JsonIgnore
+ public Map<String, TaskReport> getCompletionReports()
+ {
+ return completionReports;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public String getBaseSubtaskSpecName()
Review Comment:
This is not needed. Please see the other comment in `CompactionTask`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -556,7 +567,8 @@ public TaskStatus runTask(final TaskToolbox toolbox)
catch (Exception e) {
log.error(e, "Encountered exception in %s.", ingestionState);
errorMsg = Throwables.getStackTraceAsString(e);
- toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+ completionReports = getTaskCompletionReports();
+ writeCompletionReports(toolbox);
Review Comment:
Nit: These two lines are repeated in multiple places. The method
`writeCompletionReports` can be renamed to `updateAndWriteCompletionReports`
and modified to both assign the field `completionReports` and write out the
reports.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -514,6 +518,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
failCnt++;
log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}
+ Optional.ofNullable(eachSpec.getCompletionReports())
+ .ifPresent(reports -> completionReports.putAll(
+ CollectionUtils.mapKeys(reports, key -> getReportkey(eachSpec.getBaseSubtaskSpecName(), key))));
Review Comment:
It does make sense to use the same index suffix `_0` or `_1` as was
originally allocated to the underlying sub-task spec.
But `indexTaskSpecs` is already a list, so we just need the index of the
item in the list rather than going to the trouble of fetching back the
`baseSubTaskSpecName` and extracting the index from it. You could use an
`AtomicInteger` to track the current index.
```suggestion
CollectionUtils.mapKeys(reports, key -> key + "_" + specIndex.incrementAndGet())));
```
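One way the `AtomicInteger` approach might look; `CollectionUtils.mapKeys` is approximated here with a stream, and all names are illustrative rather than Druid's actual code:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical sketch: suffix each report key with the index of the
// sub-task spec in the list.
class ReportKeySketch
{
    static Map<String, Object> suffixKeys(Map<String, Object> reports, int specIndex)
    {
        return reports.entrySet().stream()
                      .collect(Collectors.toMap(
                          e -> e.getKey() + "_" + specIndex,
                          Map.Entry::getValue
                      ));
    }
}
```

Advancing the counter once per spec (e.g. `suffixKeys(reports, specIndex.incrementAndGet())` with the `AtomicInteger` initialized to `-1`) keeps every report from the same spec under the same `_0`, `_1`, ... suffix.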
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -562,6 +572,11 @@ private String createIndexTaskSpecId(int i)
return StringUtils.format("%s_%d", getId(), i);
}
+ private String getReportkey(String baseSequenceName, String currentKey)
Review Comment:
This method is not needed; please see the other comment about building the
report key from the index of the item in the list.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -169,7 +169,9 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode
private IngestionState ingestionState;
- private boolean shouldCleanup;
+ // used to specify if indextask.run() is run as a part of another task
Review Comment:
This comment should be included in the javadoc of the constructor of this
class.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -499,6 +502,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
log.info("Generated [%d] compaction task specs", totalNumSpecs);
int failCnt = 0;
+ Map<String, TaskReport> completionReports = new HashMap<>();
Review Comment:
If the compaction is being run on several intervals (not very likely but
still a possibility), can holding all the task reports in memory potentially
cause an OOM exception? Currently, most of the task reports contain only
`ingestStatsAndErrors` but they may contain other stuff in the future.
In the future, we should consider writing out the sub-reports in a streaming
fashion, along with the required changes to the `TaskReportFileWriter` API.
For now, we should add a guardrail here so that we don't try to hold too
many reports in memory and fail with an OOM.
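A hedged sketch of such a guardrail; the cap and the helper method are hypothetical, not an existing Druid API:

```java
import java.util.Map;

// Hypothetical guardrail: refuse to accumulate more than a fixed number of
// sub-task report entries in memory, failing fast instead of OOMing.
class ReportGuardrail
{
    static final int MAX_REPORT_ENTRIES = 10_000; // illustrative cap

    static <V> void putAllWithGuardrail(Map<String, V> accumulated, Map<String, V> next)
    {
        if (accumulated.size() + next.size() > MAX_REPORT_ENTRIES) {
            throw new IllegalStateException(
                "Cannot hold more than [" + MAX_REPORT_ENTRIES + "] sub-task report entries in memory"
            );
        }
        accumulated.putAll(next);
    }
}
```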
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1247,6 +1261,13 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
);
}
+ private void writeCompletionReports(TaskToolbox toolbox)
Review Comment:
Add another method `updateAndWriteCompletionReports` that updates the field
`completionReports` and also writes them out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]