adarshsanjeev commented on code in PR #15399:
URL: https://github.com/apache/druid/pull/15399#discussion_r1488901958
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java:
##########
@@ -146,32 +142,91 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
);
// Function to generate a processor manager for the regular processors,
// which run after the segmentMapFnProcessor.
- final Function<Function<SegmentReference, SegmentReference>,
ProcessorManager<Object, Long>> processorManagerFn =
- segmentMapFn ->
- new BaseLeafFrameProcessorManager(
- processorBaseInputs,
- segmentMapFn,
- frameWriterFactoryQueue,
- channelQueue,
- frameContext,
- this
- );
+ final Function<List<Function<SegmentReference, SegmentReference>>,
ProcessorManager<Object, Long>> processorManagerFn = segmentMapFnList -> {
+ final Function<SegmentReference, SegmentReference> segmentMapFunction =
+ CollectionUtils.getOnlyElement(segmentMapFnList, throwable ->
DruidException.defensive("Only one segment map function expected"));
+ return createBaseLeafProcessorManagerWithHandoff(
+ stageDefinition,
+ inputSlices,
+ inputSliceReader,
+ counters,
+ warningPublisher,
+ segmentMapFunction,
+ frameWriterFactoryQueue,
+ channelQueue,
+ frameContext
+ );
+ };
//noinspection rawtypes
final ProcessorManager processorManager;
if (segmentMapFnProcessor == null) {
final Function<SegmentReference, SegmentReference> segmentMapFn =
query.getDataSource().createSegmentMapFunction(query, new
AtomicLong());
- processorManager = processorManagerFn.apply(segmentMapFn);
+ processorManager =
processorManagerFn.apply(ImmutableList.of(segmentMapFn));
} else {
- processorManager = new ChainedProcessorManager<>(segmentMapFnProcessor,
processorManagerFn);
+ processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(()
-> segmentMapFnProcessor), processorManagerFn);
}
//noinspection unchecked,rawtypes
return new ProcessorsAndChannels<>(processorManager,
OutputChannels.wrapReadOnly(outputChannels));
}
+ private ProcessorManager<Object, Long>
createBaseLeafProcessorManagerWithHandoff(
+ final StageDefinition stageDefinition,
+ final List<InputSlice> inputSlices,
+ final InputSliceReader inputSliceReader,
+ final CounterTracker counters,
+ final Consumer<Throwable> warningPublisher,
+ final Function<SegmentReference, SegmentReference> segmentMapFunction,
+ final Queue<FrameWriterFactory> frameWriterFactoryQueue,
+ final Queue<WritableFrameChannel> channelQueue,
+ final FrameContext frameContext
+ )
+ {
+ final BaseLeafFrameProcessorFactory factory = this;
+ // Read all base inputs in separate processors, one per processor.
+ final Iterable<ReadableInput> processorBaseInputs = readBaseInputs(
+ stageDefinition,
+ inputSlices,
+ inputSliceReader,
+ counters,
+ warningPublisher
+ );
+
+ return new ChainedProcessorManager<>(
+ new BaseLeafFrameProcessorManager(
+ processorBaseInputs,
+ segmentMapFunction,
+ frameWriterFactoryQueue,
+ channelQueue,
+ frameContext,
+ factory
+ ),
+ objects -> {
+ if (objects.isEmpty()) {
+ return ProcessorManagers.none();
+ }
+ List<InputSlice> handedOffSegments = new ArrayList<>();
+ for (Object o : objects) {
+ if (o instanceof SegmentsInputSlice) {
Review Comment:
Added a null check
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]