imply-cheddar commented on code in PR #13506: URL: https://github.com/apache/druid/pull/13506#discussion_r1041845419
########## docs/multi-stage-query/reference.md: ########## @@ -198,13 +198,99 @@ The following table lists the context parameters for the MSQ task engine: | `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority.| 2 | | `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include: <ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` | | `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true | +| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` | Review Comment: This sort of thing would normally be delivered via a hint in the SQL, perhaps it's not too hard to deliver it that way? ########## processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.key; + +public enum KeyOrder +{ + NONE(false), Review Comment: instead of `NONE` means "hashed", would it make sense to just call it `HASHED`? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ########## @@ -595,8 +596,8 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum final StageDefinition stageDef = queryKernel.getStageDefinition(stageId); final ObjectMapper mapper = MSQTasks.decorateObjectMapperForKeyCollectorSnapshot( context.jsonMapper(), - stageDef.getShuffleSpec().get().getClusterBy(), - stageDef.getShuffleSpec().get().doesAggregateByClusterKey() + stageDef.getShuffleSpec().clusterBy(), + stageDef.getShuffleSpec().doesAggregate() Review Comment: You hate the word "get"/"is"? "getClusterBy()" "isDoesAggregate()"? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java: ########## @@ -73,4 +74,9 @@ DruidNode selfNode(); Bouncer processorBouncer(); + + default File tempDir(int stageNumber, String id) Review Comment: Nit brigade! I think we tend to use `tmp` in other parts of the code? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java: ########## @@ -1292,6 +853,803 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p } } + /** + * Main worker logic for executing a {@link WorkOrder}. + */ + private class RunWorkOrder + { + private final WorkerStageKernel kernel; + private final InputChannelFactory inputChannelFactory; + private final CounterTracker counterTracker; + private final FrameProcessorExecutor exec; + private final String cancellationId; + private final int parallelism; + private final FrameContext frameContext; + private final MSQWarningReportPublisher warningPublisher; + + private InputSliceReader inputSliceReader; + private OutputChannelFactory workOutputChannelFactory; + private OutputChannelFactory shuffleOutputChannelFactory; + private ResultAndChannels<?> workResultAndOutputChannels; + private SettableFuture<ClusterByPartitions> stagePartitionBoundariesFuture; + private ListenableFuture<OutputChannels> shuffleOutputChannelsFuture; + + public RunWorkOrder( + final WorkerStageKernel kernel, + final InputChannelFactory inputChannelFactory, + final CounterTracker counterTracker, + final FrameProcessorExecutor exec, + final String cancellationId, + final int parallelism, + final FrameContext frameContext, + final MSQWarningReportPublisher warningPublisher + ) + { + this.kernel = kernel; + this.inputChannelFactory = inputChannelFactory; + this.counterTracker = counterTracker; + this.exec = exec; + this.cancellationId = cancellationId; + this.parallelism = parallelism; + this.frameContext = frameContext; + this.warningPublisher = warningPublisher; + } + + private void start() throws IOException + { + final WorkOrder workOrder = kernel.getWorkOrder(); + final StageDefinition stageDef = workOrder.getStageDefinition(); + + makeInputSliceReader(); + makeWorkOutputChannelFactory(); + makeShuffleOutputChannelFactory(); + makeAndRunWorkProcessors(); + + if (stageDef.doesShuffle()) { + makeAndRunShuffleProcessors(); + } else { + // No shuffling: work output _is_ shuffle output. Retain read-only versions to reduce memory footprint. + shuffleOutputChannelsFuture = + Futures.immediateFuture(workResultAndOutputChannels.getOutputChannels().readOnly()); + } + + setUpCompletionCallbacks(); + } + + /** + * Settable {@link ClusterByPartitions} future for global sort. Necessary because we don't know ahead of time + * what the boundaries will be. The controller decides based on statistics from all workers. Once the controller + * decides, its decision is written to this future, which allows sorting on workers to proceed. + */ + @Nullable + public SettableFuture<ClusterByPartitions> getStagePartitionBoundariesFuture() + { + return stagePartitionBoundariesFuture; + } + + private void makeInputSliceReader() + { + if (inputSliceReader != null) { + throw new ISE("inputSliceReader already created"); + } + + final WorkOrder workOrder = kernel.getWorkOrder(); + final String queryId = workOrder.getQueryDefinition().getQueryId(); + + final InputChannels inputChannels = + new InputChannelsImpl( + workOrder.getQueryDefinition(), + InputSlices.allReadablePartitions(workOrder.getInputs()), + inputChannelFactory, + () -> ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getStandardFrameSize()), + exec, + cancellationId + ); + + inputSliceReader = new MapInputSliceReader( Review Comment: Is this Map guaranteed to be used to read from all of the different types of slices? Is there a reason that it needs to be re-instantiated instead of just built once and reused? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
