imply-cheddar commented on code in PR #13506:
URL: https://github.com/apache/druid/pull/13506#discussion_r1041845419


##########
docs/multi-stage-query/reference.md:
##########
@@ -198,13 +198,99 @@ The following table lists the context parameters for the 
MSQ task engine:
 | `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number 
of tasks to launch, including the controller task. The lowest possible value 
for this setting is 2: one controller and one worker. All tasks must be able to 
launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` 
error code after approximately 10 minutes.<br /><br />May also be provided as 
`numTasks`. If both are present, `maxNumTasks` takes priority.| 2 |
 | `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many 
tasks to use. Possible values include: <ul><li>`max`: Uses as many tasks as 
possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be 
determined through directory listing (for example: local files, S3, GCS, HDFS) 
uses as few tasks as possible without exceeding 10 GiB or 10,000 files per 
task, unless exceeding these limits is necessary to stay within `maxNumTasks`. 
When file sizes cannot be determined through directory listing (for example: 
http), behaves the same as `max`.</li></ul> | `max` |
 | `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the 
type of aggregation to return. If true, Druid finalizes the results of complex 
aggregations that directly appear in query results. If false, Druid returns the 
aggregation's intermediate type rather than finalized type. This parameter is 
useful during ingestion, where it enables storing sketches directly in Druid 
tables. For more information about aggregations, see [SQL aggregation 
functions](../querying/sql-aggregations.md). | true |
+| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for 
JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for 
sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) 
for more details. | `broadcast` |

Review Comment:
   This sort of thing would normally be delivered via a hint in the SQL; perhaps it's not too hard to deliver it that way?
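For context on the current (non-hint) mechanism: as documented in the hunk above, the algorithm is selected per query through the `sqlJoinAlgorithm` context parameter, e.g. in the JSON body of a Druid SQL query (the table names and join condition here are illustrative):

```json
{
  "query": "SELECT t1.x, t2.y FROM t1 JOIN t2 ON t1.k = t2.k",
  "context": {
    "sqlJoinAlgorithm": "sortMerge"
  }
}
```

A hint would move this choice inline into the SQL text, scoping it to a single JOIN rather than all JOIN operations in the query.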



##########
processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+public enum KeyOrder
+{
+  NONE(false),

Review Comment:
   Instead of `NONE` meaning "hashed", would it make sense to just call it `HASHED`?
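A minimal sketch of the suggested rename (assuming, per the snippet above, that the enum's boolean flag denotes sortability; the `sortable()` accessor and the `ASCENDING`/`DESCENDING` constants are assumptions, not quoted from the PR):

```java
// Hypothetical rename sketch: NONE -> HASHED, making the
// "hash-partitioned, not sorted" meaning explicit at the call site.
enum KeyOrder
{
  HASHED(false),      // keys are partitioned by hash; no sort order
  ASCENDING(true),    // keys are sorted ascending
  DESCENDING(true);   // keys are sorted descending

  private final boolean sortable;

  KeyOrder(final boolean sortable)
  {
    this.sortable = sortable;
  }

  public boolean sortable()
  {
    return sortable;
  }
}
```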



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -595,8 +596,8 @@ public void updatePartialKeyStatisticsInformation(int 
stageNumber, int workerNum
           final StageDefinition stageDef = 
queryKernel.getStageDefinition(stageId);
           final ObjectMapper mapper = 
MSQTasks.decorateObjectMapperForKeyCollectorSnapshot(
               context.jsonMapper(),
-              stageDef.getShuffleSpec().get().getClusterBy(),
-              stageDef.getShuffleSpec().get().doesAggregateByClusterKey()
+              stageDef.getShuffleSpec().clusterBy(),
+              stageDef.getShuffleSpec().doesAggregate()

Review Comment:
   You hate the words "get"/"is"? `getClusterBy()`? `isDoesAggregate()`?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java:
##########
@@ -73,4 +74,9 @@
   DruidNode selfNode();
 
   Bouncer processorBouncer();
+
+  default File tempDir(int stageNumber, String id)

Review Comment:
   Nit brigade!  I think we tend to use `tmp` in other parts of the code?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -1292,6 +853,803 @@ public ReadableFrameChannel openChannel(StageId stageId, 
int workerNumber, int p
     }
   }
 
+  /**
+   * Main worker logic for executing a {@link WorkOrder}.
+   */
+  private class RunWorkOrder
+  {
+    private final WorkerStageKernel kernel;
+    private final InputChannelFactory inputChannelFactory;
+    private final CounterTracker counterTracker;
+    private final FrameProcessorExecutor exec;
+    private final String cancellationId;
+    private final int parallelism;
+    private final FrameContext frameContext;
+    private final MSQWarningReportPublisher warningPublisher;
+
+    private InputSliceReader inputSliceReader;
+    private OutputChannelFactory workOutputChannelFactory;
+    private OutputChannelFactory shuffleOutputChannelFactory;
+    private ResultAndChannels<?> workResultAndOutputChannels;
+    private SettableFuture<ClusterByPartitions> stagePartitionBoundariesFuture;
+    private ListenableFuture<OutputChannels> shuffleOutputChannelsFuture;
+
+    public RunWorkOrder(
+        final WorkerStageKernel kernel,
+        final InputChannelFactory inputChannelFactory,
+        final CounterTracker counterTracker,
+        final FrameProcessorExecutor exec,
+        final String cancellationId,
+        final int parallelism,
+        final FrameContext frameContext,
+        final MSQWarningReportPublisher warningPublisher
+    )
+    {
+      this.kernel = kernel;
+      this.inputChannelFactory = inputChannelFactory;
+      this.counterTracker = counterTracker;
+      this.exec = exec;
+      this.cancellationId = cancellationId;
+      this.parallelism = parallelism;
+      this.frameContext = frameContext;
+      this.warningPublisher = warningPublisher;
+    }
+
+    private void start() throws IOException
+    {
+      final WorkOrder workOrder = kernel.getWorkOrder();
+      final StageDefinition stageDef = workOrder.getStageDefinition();
+
+      makeInputSliceReader();
+      makeWorkOutputChannelFactory();
+      makeShuffleOutputChannelFactory();
+      makeAndRunWorkProcessors();
+
+      if (stageDef.doesShuffle()) {
+        makeAndRunShuffleProcessors();
+      } else {
+        // No shuffling: work output _is_ shuffle output. Retain read-only 
versions to reduce memory footprint.
+        shuffleOutputChannelsFuture =
+            
Futures.immediateFuture(workResultAndOutputChannels.getOutputChannels().readOnly());
+      }
+
+      setUpCompletionCallbacks();
+    }
+
+    /**
+     * Settable {@link ClusterByPartitions} future for global sort. Necessary 
because we don't know ahead of time
+     * what the boundaries will be. The controller decides based on statistics 
from all workers. Once the controller
+     * decides, its decision is written to this future, which allows sorting 
on workers to proceed.
+     */
+    @Nullable
+    public SettableFuture<ClusterByPartitions> 
getStagePartitionBoundariesFuture()
+    {
+      return stagePartitionBoundariesFuture;
+    }
+
+    private void makeInputSliceReader()
+    {
+      if (inputSliceReader != null) {
+        throw new ISE("inputSliceReader already created");
+      }
+
+      final WorkOrder workOrder = kernel.getWorkOrder();
+      final String queryId = workOrder.getQueryDefinition().getQueryId();
+
+      final InputChannels inputChannels =
+          new InputChannelsImpl(
+              workOrder.getQueryDefinition(),
+              InputSlices.allReadablePartitions(workOrder.getInputs()),
+              inputChannelFactory,
+              () -> 
ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getStandardFrameSize()),
+              exec,
+              cancellationId
+          );
+
+      inputSliceReader = new MapInputSliceReader(

Review Comment:
   Is this Map guaranteed to be used to read from all of the different types of 
slices?  Is there a reason that it needs to be re-instantiated instead of just 
built once and reused?
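On the reuse question: a generic sketch (class and method names are hypothetical, not Druid API) of constructing a per-key reader at most once via `ConcurrentMap.computeIfAbsent` and handing back the cached instance on subsequent calls, instead of re-instantiating per work order:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper: builds the reader for a given key at most once
// and reuses it across calls.
class ReaderCache<K, R>
{
  private final ConcurrentMap<K, R> readers = new ConcurrentHashMap<>();

  R getOrCreate(final K key, final Supplier<R> factory)
  {
    // computeIfAbsent invokes the factory only when no reader exists yet
    return readers.computeIfAbsent(key, k -> factory.get());
  }
}
```

Whether this applies directly depends on whether `MapInputSliceReader` holds per-work-order state (the `InputChannels` built above suggest it might), which is presumably what the comment is asking.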



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

