kfaraz commented on code in PR #14507:
URL: https://github.com/apache/druid/pull/14507#discussion_r1274375506
##########
indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java:
##########
@@ -237,6 +244,20 @@ public List<String> getMetrics()
return metrics;
}
+ @Override
+ public InputSource withTaskToolbox(TaskToolbox toolbox)
Review Comment:
Config classes should ideally be immutable, and as far as I can tell this
class is too. We should retain that immutability and return a fresh object
here with the `TaskToolbox` set.
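A minimal sketch of the "return a fresh object" pattern, assuming simplified
stand-in types. `Toolbox`, `SketchInputSource`, and `withToolbox` are
hypothetical names used only for illustration; they are not the actual Druid
classes or signatures.

```java
import java.util.Objects;

// Hypothetical stand-in for a toolbox-like dependency (not Druid's TaskToolbox).
final class Toolbox
{
  private final String name;

  Toolbox(String name)
  {
    this.name = name;
  }

  String getName()
  {
    return name;
  }
}

// Hypothetical immutable config-style class: every field is final.
final class SketchInputSource
{
  private final String dataSource;
  // Nullable: stays null until a copy is created through withToolbox().
  private final Toolbox toolbox;

  SketchInputSource(String dataSource, Toolbox toolbox)
  {
    this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
    this.toolbox = toolbox;
  }

  // Returns a new instance carrying the toolbox; the receiver is left unchanged.
  SketchInputSource withToolbox(Toolbox toolbox)
  {
    return new SketchInputSource(dataSource, Objects.requireNonNull(toolbox, "toolbox"));
  }

  String getDataSource()
  {
    return dataSource;
  }

  Toolbox getToolbox()
  {
    return toolbox;
  }
}

public class ImmutableWithDemo
{
  public static void main(String[] args)
  {
    SketchInputSource original = new SketchInputSource("wikipedia", null);
    SketchInputSource withToolbox = original.withToolbox(new Toolbox("task-1"));
    System.out.println(original.getToolbox() == null);
    System.out.println(withToolbox.getToolbox().getName());
  }
}
```

Because the original instance never changes, it can be shared across threads
without an `AtomicReference` or any other synchronization.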
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1817,6 +1824,11 @@ static Map<String, Object> getTaskReport(final OverlordClient overlordClient, fi
}
}
+ private InputSource getInputSource()
+ {
+ return inputSourceRef.get();
Review Comment:
The suggestion was to set this here if not already set, so that we are
guaranteed to never return a null value.
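One way to read "set this here if not already set" is a lazily initialized
reference, sketched below. `LazyRef` and its `factory` supplier are
hypothetical helpers, not part of this PR or of Druid; the sketch only assumes
that the value can be computed on demand.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper: a getter that initializes the value on first use,
// so callers never observe null.
final class LazyRef<T>
{
  private final AtomicReference<T> ref = new AtomicReference<>();
  private final Supplier<T> factory;

  LazyRef(Supplier<T> factory)
  {
    this.factory = Objects.requireNonNull(factory, "factory");
  }

  T get()
  {
    T current = ref.get();
    if (current == null) {
      T created = Objects.requireNonNull(factory.get(), "factory returned null");
      // If another thread set the value first, compareAndSet fails and we
      // fall through to read the value that won the race.
      ref.compareAndSet(null, created);
      current = ref.get();
    }
    return current;
  }
}

public class LazyRefDemo
{
  public static void main(String[] args)
  {
    LazyRef<String> inputSource = new LazyRef<>(() -> "base-input-source");
    System.out.println(inputSource.get());
  }
}
```

Under contention the factory may run more than once, but only one result is
ever published, so every caller sees the same non-null value.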
##########
indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java:
##########
@@ -151,6 +156,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
*/
private final List<String> metrics;
+ private final AtomicReference<TaskToolbox> toolboxRef = new AtomicReference<>();
Review Comment:
This should be a final nullable `TaskToolbox` and not an atomic reference.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -156,7 +158,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
* Only the compaction task can have a special base name.
*/
private final String baseSubtaskSpecName;
- private final InputSource baseInputSource;
+
+ private final AtomicReference<InputSource> inputSourceRef = new AtomicReference<>();
Review Comment:
You can still call it the `baseInputSource`. `inputSourceRef` is a confusing
name.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -109,7 +110,10 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
@Override
public final TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
- final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource();
+ InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
+ if (inputSource instanceof TaskInputSource) {
Review Comment:
Why do we need to do this here and in the other `runTask` methods? Doesn't
`ioConfig.getNonNullInputSource` already return an `InputSource` with the
`TaskToolbox` set?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]