AmatyaAvadhanula commented on code in PR #14507:
URL: https://github.com/apache/druid/pull/14507#discussion_r1276366128
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -245,7 +251,7 @@ public ParallelIndexSupervisorTask(
checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
}
- this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource();
Review Comment:
done
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1817,6 +1824,24 @@ static Map<String, Object> getTaskReport(final OverlordClient overlordClient, fi
}
}
+ /**
+ * To be called only after the toolbox has been set in runTask
+ *
+ * @return the base input source with the toolbox
+ */
+ private InputSource getInputSource()
+ {
+ InputSource inputSource = inputSourceRef.get();
+ if (inputSource == null) {
+ inputSource = this.baseInputSource;
+ if (inputSource instanceof TaskInputSource) {
+ inputSource = ((TaskInputSource) inputSource).withTaskToolbox(toolbox);
+ }
+ inputSourceRef.set(inputSource);
+ }
+ return inputSource;
Review Comment:
Made the change, thanks
##########
indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java:
##########
@@ -167,6 +172,39 @@ public DruidInputSource(
@JacksonInject RetryPolicyFactory retryPolicyFactory,
@JacksonInject TaskConfig taskConfig
)
+ {
+ this(
+ dataSource,
+ interval,
+ segmentIds,
+ dimFilter,
+ dimensions,
+ metrics,
+ indexIO,
+ coordinatorClient,
+ segmentCacheManagerFactory,
+ retryPolicyFactory,
+ taskConfig,
+ null
+ );
+ }
+
+ private DruidInputSource(
+ final String dataSource,
+ @Nullable Interval interval,
+ // Specifying "segments" is intended only for when this FirehoseFactory has split itself,
Review Comment:
cleaned
##########
indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java:
##########
@@ -363,7 +428,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp
@Override
public SplittableInputSource<List<WindowedSegmentId>> withSplit(InputSplit<List<WindowedSegmentId>> split)
{
- return new DruidInputSource(
+ return (DruidInputSource) new DruidInputSource(
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]