gianm commented on code in PR #13506:
URL: https://github.com/apache/druid/pull/13506#discussion_r1042621537


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -1292,6 +853,803 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     }
   }
 
+  /**
+   * Main worker logic for executing a {@link WorkOrder}.
+   */
+  private class RunWorkOrder
+  {
+    private final WorkerStageKernel kernel;
+    private final InputChannelFactory inputChannelFactory;
+    private final CounterTracker counterTracker;
+    private final FrameProcessorExecutor exec;
+    private final String cancellationId;
+    private final int parallelism;
+    private final FrameContext frameContext;
+    private final MSQWarningReportPublisher warningPublisher;
+
+    private InputSliceReader inputSliceReader;
+    private OutputChannelFactory workOutputChannelFactory;
+    private OutputChannelFactory shuffleOutputChannelFactory;
+    private ResultAndChannels<?> workResultAndOutputChannels;
+    private SettableFuture<ClusterByPartitions> stagePartitionBoundariesFuture;
+    private ListenableFuture<OutputChannels> shuffleOutputChannelsFuture;
+
+    public RunWorkOrder(
+        final WorkerStageKernel kernel,
+        final InputChannelFactory inputChannelFactory,
+        final CounterTracker counterTracker,
+        final FrameProcessorExecutor exec,
+        final String cancellationId,
+        final int parallelism,
+        final FrameContext frameContext,
+        final MSQWarningReportPublisher warningPublisher
+    )
+    {
+      this.kernel = kernel;
+      this.inputChannelFactory = inputChannelFactory;
+      this.counterTracker = counterTracker;
+      this.exec = exec;
+      this.cancellationId = cancellationId;
+      this.parallelism = parallelism;
+      this.frameContext = frameContext;
+      this.warningPublisher = warningPublisher;
+    }
+
+    private void start() throws IOException
+    {
+      final WorkOrder workOrder = kernel.getWorkOrder();
+      final StageDefinition stageDef = workOrder.getStageDefinition();
+
+      makeInputSliceReader();
+      makeWorkOutputChannelFactory();
+      makeShuffleOutputChannelFactory();
+      makeAndRunWorkProcessors();
+
+      if (stageDef.doesShuffle()) {
+        makeAndRunShuffleProcessors();
+      } else {
+        // No shuffling: work output _is_ shuffle output. Retain read-only versions to reduce memory footprint.
+        shuffleOutputChannelsFuture =
+            Futures.immediateFuture(workResultAndOutputChannels.getOutputChannels().readOnly());
+      }
+
+      setUpCompletionCallbacks();
+    }
+
+    /**
+     * Settable {@link ClusterByPartitions} future for global sort. Necessary because we don't know ahead of time
+     * what the boundaries will be. The controller decides based on statistics from all workers. Once the controller
+     * decides, its decision is written to this future, which allows sorting on workers to proceed.
+     */
+    @Nullable
+    public SettableFuture<ClusterByPartitions> getStagePartitionBoundariesFuture()
+    {
+      return stagePartitionBoundariesFuture;
+    }
+
+    private void makeInputSliceReader()
+    {
+      if (inputSliceReader != null) {
+        throw new ISE("inputSliceReader already created");
+      }
+
+      final WorkOrder workOrder = kernel.getWorkOrder();
+      final String queryId = workOrder.getQueryDefinition().getQueryId();
+
+      final InputChannels inputChannels =
+          new InputChannelsImpl(
+              workOrder.getQueryDefinition(),
+              InputSlices.allReadablePartitions(workOrder.getInputs()),
+              inputChannelFactory,
+              () -> ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getStandardFrameSize()),
+              exec,
+              cancellationId
+          );
+
+      inputSliceReader = new MapInputSliceReader(

Review Comment:
   It's built once per worker. It does have some special pieces that are 
specific to this worker, so it isn't shared across workers.
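
   To illustrate the lifecycle described above, here is a minimal sketch of a
   reader that is built once per worker and carries worker-specific state. It
   mirrors the "already created" guard visible in makeInputSliceReader, but
   SketchReader and SketchWorker are hypothetical names, not Druid classes, and
   IllegalStateException stands in for the ISE used in the diff.

```java
/** Hypothetical stand-in for a worker-specific reader; not Druid's MapInputSliceReader. */
final class SketchReader
{
  private final int workerNumber;

  SketchReader(final int workerNumber)
  {
    this.workerNumber = workerNumber;
  }

  String describe()
  {
    return "reader for worker " + workerNumber;
  }
}

/** Hypothetical worker that builds its own reader exactly once and never shares it. */
final class SketchWorker
{
  private final int workerNumber;
  private SketchReader reader; // null until makeReader() has run

  SketchWorker(final int workerNumber)
  {
    this.workerNumber = workerNumber;
  }

  void makeReader()
  {
    // Same shape as the guard in the diff: a second build attempt is a bug.
    if (reader != null) {
      throw new IllegalStateException("reader already created");
    }
    reader = new SketchReader(workerNumber);
  }

  SketchReader getReader()
  {
    return reader;
  }
}
```

   Each SketchWorker holds its own SketchReader instance, so two workers never
   see the same reader object, and calling makeReader() twice on one worker
   fails fast.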



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

