paul-rogers commented on code in PR #12918: URL: https://github.com/apache/druid/pull/12918#discussion_r950527010
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.google.inject.Key; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.guice.MultiStageQuery; +import org.apache.druid.msq.indexing.error.CanceledFault; +import org.apache.druid.msq.indexing.error.DurableStorageConfigurationFault; +import org.apache.druid.msq.indexing.error.InsertTimeNullFault; +import org.apache.druid.msq.indexing.error.MSQErrorReport; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.MSQFault; +import org.apache.druid.msq.indexing.error.UnknownFault; +import org.apache.druid.msq.indexing.error.WorkerFailedFault; +import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; +import org.apache.druid.msq.statistics.KeyCollectorFactory; +import org.apache.druid.msq.statistics.KeyCollectorSnapshot; +import 
org.apache.druid.msq.statistics.KeyCollectorSnapshotDeserializerModule; +import org.apache.druid.msq.statistics.KeyCollectors; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.server.DruidNode; +import org.apache.druid.storage.StorageConnector; + +import javax.annotation.Nullable; +import java.util.UUID; + +public class MSQTasks +{ + /** + * Message used by {@link #makeErrorReport} when no other message is known. + */ + static final String GENERIC_QUERY_FAILED_MESSAGE = "Query failed"; + + private static final String TASK_ID_PREFIX = "query-"; + + /** + * Returns a controller task ID given a + */ + public static String controllerTaskId(@Nullable final String queryId) + { + return TASK_ID_PREFIX + (queryId == null ? UUID.randomUUID().toString() : queryId); + } + + /** + * Returns a controller task ID given a + */ + public static String workerTaskId(final String controllerTaskId, final int workerNumber) + { + return StringUtils.format("%s-worker%d", controllerTaskId, workerNumber); + } + + /** + * If "Object" is a Long, returns it. Otherwise, throws an appropriate exception assuming this operation is + * being done to read the primary timestamp (__time) as part of an INSERT. + */ + public static long primaryTimestampFromObjectForInsert(final Object timestamp) + { + if (timestamp instanceof Long) { + return (long) timestamp; + } else if (timestamp == null) { + throw new MSQException(InsertTimeNullFault.INSTANCE); + } else { + // Normally we expect the SQL layer to validate that __time for INSERT is a TIMESTAMP type, which would + // be a long at execution time. So a nice user-friendly message isn't needed here: it would only happen + // if the SQL layer is bypassed. Nice, friendly users wouldn't do that :) Review Comment: This level of "behind the scenes" explanation is super helpful! 
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.input; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * Slice of an {@link InputSpec} assigned to a particular worker. + * + * On the controller, these are produced using {@link InputSpecSlicer}. On workers, these are read + * using {@link InputSliceReader}. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +public interface InputSlice +{ + /** + * Returns the number of files contained within this split. This is the same number that would be added to + * {@link org.apache.druid.msq.counters.CounterTracker} on full iteration through {@link InputSliceReader#attach}. + * + * May be zero for some kinds of slices, even if they contain data, if the input is not file-based. + */ + int numFiles(); Review Comment: Nit: `fileCount` reads just a bit better than `numFiles`. 
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit.groupby; + +import com.google.common.base.Preconditions; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.SortColumn; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.MaxCountShuffleSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.QueryDefinitionBuilder; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.querykit.DataSourcePlan; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitUtils; +import org.apache.druid.msq.querykit.ShuffleSpecFactories; +import org.apache.druid.msq.querykit.ShuffleSpecFactory; +import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; +import 
org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.having.AlwaysHavingSpec; +import org.apache.druid.query.groupby.having.DimFilterHavingSpec; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; +import org.apache.druid.query.groupby.orderby.NoopLimitSpec; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.ordering.StringComparator; +import org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class GroupByQueryKit implements QueryKit<GroupByQuery> Review Comment: A general comment on this approach is that we seem to be mapping from one big query definition (the native query) to another (the MSQ query definition.) Other projects have had much success decomposing a query plan into isolated functional units (often called "operators") that feed into one another via a pipeline. A future direction here might be to do the same: have a frame processor hold a pipeline of operators for scan, sort, merge, limit, offset, etc. Might make the logic a bit more flexible and reusable. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit.scan; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.SortColumn; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.MaxCountShuffleSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.QueryDefinitionBuilder; +import org.apache.druid.msq.kernel.ShuffleSpec; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.querykit.DataSourcePlan; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitUtils; +import org.apache.druid.msq.querykit.ShuffleSpecFactory; +import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; +import org.apache.druid.query.Query; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.rel.DruidQuery; + +import java.util.ArrayList; +import java.util.List; + +public class ScanQueryKit implements QueryKit<ScanQuery> +{ + private final ObjectMapper jsonMapper; + + public ScanQueryKit(final ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + public static RowSignature getAndValidateSignature(final ScanQuery scanQuery, final ObjectMapper jsonMapper) + { + 
RowSignature scanSignature; + try { + final String s = scanQuery.getContextValue(DruidQuery.CTX_SCAN_SIGNATURE); + scanSignature = jsonMapper.readValue(s, RowSignature.class); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + // Verify the signature prior to any actual processing. + QueryKitUtils.verifyRowSignature(scanSignature); + return scanSignature; + } + + /** + * We ignore the resultShuffleSpecFactory in case: + * 1. There is no cluster by + * 2. This is an offset which means everything gets funneled into a single partition hence we use MaxCountShuffleSpec + */ + // No ordering, but there is a limit or an offset. These work by funneling everything through a single partition. + // So there is no point in forcing any particular partitioning. Since everything is funnelled into a single + // partition without a ClusterBy, we donot need to necessarily create it via the resultShuffleSpecFactory provided Review Comment: Nit: "donot" -> "do not" ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.kernel.controller; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap; +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.exec.QueryValidator; +import org.apache.druid.msq.indexing.error.MSQFault; +import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerFactory; +import org.apache.druid.msq.input.stage.ReadablePartitions; +import org.apache.druid.msq.kernel.ExtraInfoHolder; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.kernel.StageId; +import org.apache.druid.msq.kernel.WorkOrder; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Kernel for the controller of a multi-stage query. + * + * Instances of this class are state machines for query execution. Kernels do not do any RPC or deal with any data. + * This separation of decision-making from the "real world" allows the decision-making to live in one, + * easy-to-follow place. 
+ * + * @see org.apache.druid.msq.kernel.worker.WorkerStageKernel state machine on the worker side + */ +public class ControllerQueryKernel +{ + private final QueryDefinition queryDef; + + // Stage ID -> tracker for that stage. An extension of the state of this kernel. + private final Map<StageId, ControllerStageTracker> stageTracker = new HashMap<>(); + + private Map<StageId, Set<StageId>> inflowMap = null; // Will be initialized to an unmodifiable map + private Map<StageId, Set<StageId>> outflowMap = null; // Will be initialized to an unmodifiable map + + // Maintains a running map of (stageId -> pending inflow stages) which need to be completed to provision the stage + // corresponding to the stageId. After initializing,if the value of the entry becomes an empty set, it is removed from + // the map, and the removed entry is added to readyToRunStages + private Map<StageId, Set<StageId>> pendingInflowMap = null; + + // Maintains a running count of (stageId -> outflow stages pending on its results). After initializing, if + // the value of the entry becomes an empty set, it is removed from the map and the removed entry is added to + // effectivelyFinishedStages + private Map<StageId, Set<StageId>> pendingOutflowMap = null; + + // Tracks those stages which can be initialized safely. + private final Set<StageId> readyToRunStages = new HashSet<>(); + + // Tracks the stageIds which can be finished. 
Once returned by getEffectivelyFinishedStageKernels(), it gets cleared + // and not tracked anymore in this Set + private final Set<StageId> effectivelyFinishedStages = new HashSet<>(); // Modifiable map + + public ControllerQueryKernel(final QueryDefinition queryDef) + { + this.queryDef = queryDef; + initializeStageDAGMaps(); + initializeLeafStages(); + } + + /** + * Creates new kernels, if they can be initialized, and returns the tracked kernels which are in NEW phase + */ + public List<StageId> createAndGetNewStageIds( + final InputSpecSlicerFactory slicerFactory, + final WorkerAssignmentStrategy assignmentStrategy + ) + { + final Int2IntMap stageWorkerCountMap = new Int2IntAVLTreeMap(); + final Int2ObjectMap<ReadablePartitions> stagePartitionsMap = new Int2ObjectAVLTreeMap<>(); + + for (final ControllerStageTracker stageKernel : stageTracker.values()) { + final int stageNumber = stageKernel.getStageDefinition().getStageNumber(); + stageWorkerCountMap.put(stageNumber, stageKernel.getWorkerInputs().workerCount()); + + if (stageKernel.hasResultPartitions()) { + stagePartitionsMap.put(stageNumber, stageKernel.getResultPartitions()); + } + } + + createNewKernels(stageWorkerCountMap, slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy); + return stageTracker.values() + .stream() + .filter(controllerStageTracker -> controllerStageTracker.getPhase() == ControllerStagePhase.NEW) + .map(stageKernel -> stageKernel.getStageDefinition().getId()) + .collect(Collectors.toList()); + } + + /** + * @return Stage kernels in this query kernel which can be safely cleaned up and marked as FINISHED. This returns the + * kernel corresponding to a particular stage only once, to reduce the number of stages to iterate through. 
+ * It is expectant of the caller to eventually mark the stage as {@link ControllerStagePhase#FINISHED} after fetching + * the stage kernel + */ + public List<StageId> getEffectivelyFinishedStageIds() + { + return ImmutableList.copyOf(effectivelyFinishedStages); + } + + /** + * Returns all the kernels which have been initialized and are being tracked + */ + public List<StageId> getActiveStages() + { + return ImmutableList.copyOf(stageTracker.keySet()); + } + + /** + * Returns a stage's kernel corresponding to a particular stage number + */ + public StageId getStageId(final int stageNumber) + { + return new StageId(queryDef.getQueryId(), stageNumber); + } + + /** + * Returns true if query needs no further processing, i.e. if final stage is successful or if any of the stages have + * been failed + */ + public boolean isDone() + { + return Optional.ofNullable(stageTracker.get(queryDef.getFinalStageDefinition().getId())) + .filter(tracker -> ControllerStagePhase.isSuccessfulTerminalPhase(tracker.getPhase())) + .isPresent() + || stageTracker.values().stream().anyMatch(tracker -> tracker.getPhase() == ControllerStagePhase.FAILED); + } + + /** + * Marks all the successful terminal stages to completion, so that the queryKernel shows a canonical view of + * phases of the stages once it completes + */ + public void markSuccessfulTerminalStagesAsFinished() + { + for (final StageId stageId : getActiveStages()) { + ControllerStagePhase phase = getStagePhase(stageId); + // While the following conditional is redundant currently, it makes logical sense to mark all the "successful + // terminal phases" to FINISHED at the end, hence the if clause. 
Inside the conditional, depending on the + // terminal phase it resides in, we synthetically mark it to completion (and therefore we need to check which + // stage it is precisely in) + if (ControllerStagePhase.isSuccessfulTerminalPhase(phase)) { + if (phase == ControllerStagePhase.RESULTS_READY) { + finishStage(stageId, false); + } + } + } + } + + /** + * Returns true if all the stages comprising the query definition have been sucessful in producing their results + */ + public boolean isSuccess() + { + return stageTracker.size() == queryDef.getStageDefinitions().size() + && stageTracker.values() + .stream() + .allMatch(tracker -> ControllerStagePhase.isSuccessfulTerminalPhase(tracker.getPhase())); + } + + /** + * Creates a list of work orders, corresponding to each worker, for a particular stageNumber + */ + public Int2ObjectMap<WorkOrder> createWorkOrders( + final int stageNumber, + @Nullable final Int2ObjectMap<Object> extraInfos + ) + { + final Int2ObjectMap<WorkOrder> retVal = new Int2ObjectAVLTreeMap<>(); + final ControllerStageTracker stageKernel = getStageKernelOrThrow(getStageId(stageNumber)); + + final WorkerInputs workerInputs = stageKernel.getWorkerInputs(); + for (int workerNumber : workerInputs.workers()) { + final Object extraInfo = extraInfos != null ? 
extraInfos.get(workerNumber) : null; + + //noinspection unchecked + final ExtraInfoHolder<?> extraInfoHolder = + stageKernel.getStageDefinition().getProcessorFactory().makeExtraInfoHolder(extraInfo); + + final WorkOrder workOrder = new WorkOrder( + queryDef, + stageNumber, + workerNumber, + workerInputs.inputsForWorker(workerNumber), + extraInfoHolder + ); + + QueryValidator.validateWorkOrder(workOrder); + retVal.put(workerNumber, workOrder); + } + + return retVal; + } + + private void createNewKernels( + final Int2IntMap stageWorkerCountMap, + final InputSpecSlicer slicer, + final WorkerAssignmentStrategy assignmentStrategy + ) + { + for (final StageId nextStage : readyToRunStages) { + // Create a tracker. + final StageDefinition stageDef = queryDef.getStageDefinition(nextStage); + final ControllerStageTracker stageKernel = ControllerStageTracker.create( + stageDef, + stageWorkerCountMap, + slicer, + assignmentStrategy + ); + stageTracker.put(nextStage, stageKernel); + } + + readyToRunStages.clear(); + } + + /** + * Populates the inflowMap, outflowMap and pending inflow/outflow maps corresponding to the query definition + */ + private void initializeStageDAGMaps() + { + initializeStageOutflowMap(this.queryDef); + initializeStageInflowMap(this.queryDef); + } + + /** + * Initializes this.outflowMap with a mapping of stage -> stages that depend on that stage. 
+ */ + private void initializeStageOutflowMap(final QueryDefinition queryDefinition) + { + Preconditions.checkArgument(this.outflowMap == null, "outflow map must only be built once"); + final Map<StageId, Set<StageId>> retVal = new HashMap<>(); + this.pendingOutflowMap = new HashMap<>(); + for (final StageDefinition stageDef : queryDefinition.getStageDefinitions()) { + final StageId stageId = stageDef.getId(); + retVal.computeIfAbsent(stageId, ignored -> new HashSet<>()); + this.pendingOutflowMap.computeIfAbsent(stageId, ignored -> new HashSet<>()); + for (final int inputStageNumber : queryDefinition.getStageDefinition(stageId).getInputStageNumbers()) { + final StageId inputStageId = new StageId(queryDef.getQueryId(), inputStageNumber); + retVal.computeIfAbsent(inputStageId, ignored -> new HashSet<>()).add(stageId); + this.pendingOutflowMap.computeIfAbsent(inputStageId, ignored -> new HashSet<>()).add(stageId); + } + } + this.outflowMap = Collections.unmodifiableMap(retVal); + } + + /** + * Initializes this.inflowMap with a mapping of stage -> stages that flow *into* that stage. 
+ */ + private void initializeStageInflowMap(final QueryDefinition queryDefinition) + { + final Map<StageId, Set<StageId>> retVal = new HashMap<>(); + this.pendingInflowMap = new HashMap<>(); + for (final StageDefinition stageDef : queryDefinition.getStageDefinitions()) { + final StageId stageId = stageDef.getId(); + retVal.computeIfAbsent(stageId, ignored -> new HashSet<>()); + this.pendingInflowMap.computeIfAbsent(stageId, ignored -> new HashSet<>()); + for (final int inputStageNumber : queryDefinition.getStageDefinition(stageId).getInputStageNumbers()) { + final StageId inputStageId = new StageId(queryDef.getQueryId(), inputStageNumber); + retVal.computeIfAbsent(stageId, ignored -> new HashSet<>()).add(inputStageId); + this.pendingInflowMap.computeIfAbsent(stageId, ignored -> new HashSet<>()).add(inputStageId); + } + } + this.inflowMap = Collections.unmodifiableMap(retVal); + } + + /** + * Adds stageIds for those stages which donot require any input from any other stages Review Comment: Nit "donot" -> "do not" ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.exec; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.google.inject.Key; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.guice.MultiStageQuery; +import org.apache.druid.msq.indexing.error.CanceledFault; +import org.apache.druid.msq.indexing.error.DurableStorageConfigurationFault; +import org.apache.druid.msq.indexing.error.InsertTimeNullFault; +import org.apache.druid.msq.indexing.error.MSQErrorReport; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.MSQFault; +import org.apache.druid.msq.indexing.error.UnknownFault; +import org.apache.druid.msq.indexing.error.WorkerFailedFault; +import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; +import org.apache.druid.msq.statistics.KeyCollectorFactory; +import org.apache.druid.msq.statistics.KeyCollectorSnapshot; +import org.apache.druid.msq.statistics.KeyCollectorSnapshotDeserializerModule; +import org.apache.druid.msq.statistics.KeyCollectors; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.server.DruidNode; +import org.apache.druid.storage.StorageConnector; + +import javax.annotation.Nullable; +import java.util.UUID; + +public class MSQTasks +{ + /** + * Message used by {@link #makeErrorReport} when no other message is known. + */ + static final String GENERIC_QUERY_FAILED_MESSAGE = "Query failed"; + + private static final String TASK_ID_PREFIX = "query-"; + + /** + * Returns a controller task ID given a + */ + public static String controllerTaskId(@Nullable final String queryId) + { + return TASK_ID_PREFIX + (queryId == null ? UUID.randomUUID().toString() : queryId); + } + + /** + * Returns a controller task ID given a Review Comment: Nit: "given a"... what? 
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.querykit.DataSegmentProvider; +import org.apache.druid.msq.querykit.LazyResourceHolder; +import org.apache.druid.msq.rpc.CoordinatorServiceClient; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.loading.SegmentCacheManager; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; + +import java.io.File; +import java.io.IOException; + +/** + * Production implementation of {@link DataSegmentProvider} using Coordinator APIs. 
+ */ +public class TaskDataSegmentProvider implements DataSegmentProvider +{ + private final CoordinatorServiceClient coordinatorClient; + private final SegmentCacheManager segmentCacheManager; + private final IndexIO indexIO; + + public TaskDataSegmentProvider( + CoordinatorServiceClient coordinatorClient, + SegmentCacheManager segmentCacheManager, + IndexIO indexIO + ) + { + this.coordinatorClient = coordinatorClient; + this.segmentCacheManager = segmentCacheManager; + this.indexIO = indexIO; + } + + @Override + public LazyResourceHolder<Segment> fetchSegment( + final SegmentId segmentId, + final ChannelCounters channelCounters + ) + { + try { + // Use LazyResourceHolder so Coordinator call and segment downloads happen in processing threads, + // rather than the main thread. + return new LazyResourceHolder<>( + () -> { + try { + final DataSegment dataSegment = FutureUtils.getUnchecked( + coordinatorClient.fetchUsedSegment( + segmentId.getDataSource(), + segmentId.toString() + ), + true + ); + + final Closer closer = Closer.create(); + final File segmentDir = segmentCacheManager.getSegmentFiles(dataSegment); + closer.register(() -> FileUtils.deleteDirectory(segmentDir)); + + final QueryableIndex index = indexIO.loadIndex(segmentDir); + final int numRows = index.getNumRows(); + final long size = dataSegment.getSize(); + closer.register(() -> channelCounters.addFile(numRows, size)); + closer.register(index); + return Pair.of(new QueryableIndexSegment(index, dataSegment.getId()), closer); + } + catch (IOException | SegmentLoadingException e) { + throw new RuntimeException(e); Review Comment: Should we include in the error information about the segment that failed to download? And, differentiate between the coordinator failure and the download failure? 
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java: ########## @@ -0,0 +1,1230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.SettableFuture; +import it.unimi.dsi.fastutil.bytes.ByteArrays; +import org.apache.druid.frame.allocation.ArenaMemoryAllocator; +import org.apache.druid.frame.channel.BlockingQueueFrameChannel; +import org.apache.druid.frame.channel.ReadableFileFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.ReadableNilFrameChannel; +import org.apache.druid.frame.file.FrameFile; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.frame.key.ClusterBy; +import 
org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.frame.processor.BlockingQueueOutputChannelFactory; +import org.apache.druid.frame.processor.Bouncer; +import org.apache.druid.frame.processor.FileOutputChannelFactory; +import org.apache.druid.frame.processor.FrameChannelMuxer; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessorExecutor; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.frame.processor.OutputChannelFactory; +import org.apache.druid.frame.processor.OutputChannels; +import org.apache.druid.frame.processor.SuperSorter; +import org.apache.druid.frame.processor.SuperSorterProgressTracker; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.counters.CounterNames; +import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.indexing.CountingOutputChannelFactory; +import org.apache.druid.msq.indexing.InputChannelFactory; +import org.apache.druid.msq.indexing.InputChannelsImpl; +import org.apache.druid.msq.indexing.KeyStatisticsCollectionProcessor; +import org.apache.druid.msq.indexing.MSQWorkerTask; +import org.apache.druid.msq.indexing.error.CanceledFault; +import org.apache.druid.msq.indexing.error.MSQErrorReport; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher; +import org.apache.druid.msq.indexing.error.MSQWarningReportPublisher; 
+import org.apache.druid.msq.indexing.error.MSQWarningReportSimplePublisher; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSlices; +import org.apache.druid.msq.input.MapInputSliceReader; +import org.apache.druid.msq.input.NilInputSlice; +import org.apache.druid.msq.input.NilInputSliceReader; +import org.apache.druid.msq.input.external.ExternalInputSlice; +import org.apache.druid.msq.input.external.ExternalInputSliceReader; +import org.apache.druid.msq.input.stage.InputChannels; +import org.apache.druid.msq.input.stage.ReadablePartition; +import org.apache.druid.msq.input.stage.StageInputSlice; +import org.apache.druid.msq.input.stage.StageInputSliceReader; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.input.table.SegmentsInputSliceReader; +import org.apache.druid.msq.kernel.FrameContext; +import org.apache.druid.msq.kernel.FrameProcessorFactory; +import org.apache.druid.msq.kernel.ProcessorsAndChannels; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.kernel.StageId; +import org.apache.druid.msq.kernel.StagePartition; +import org.apache.druid.msq.kernel.WorkOrder; +import org.apache.druid.msq.kernel.worker.WorkerStageKernel; +import org.apache.druid.msq.kernel.worker.WorkerStagePhase; +import org.apache.druid.msq.querykit.DataSegmentProvider; +import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory; +import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory; +import org.apache.druid.msq.shuffle.WorkerInputChannelFactory; +import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; +import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot; +import org.apache.druid.msq.util.DecoratedExecutorService; +import org.apache.druid.msq.util.MultiStageQueryContext; +import 
org.apache.druid.query.PrioritizedCallable; +import org.apache.druid.query.PrioritizedRunnable; +import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.server.DruidNode; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * Interface for a worker of a multi-stage query. 
+ */ +public class WorkerImpl implements Worker +{ + private static final Logger log = new Logger(WorkerImpl.class); + + private final MSQWorkerTask task; + private final WorkerContext context; + + private final BlockingQueue<Consumer<KernelHolder>> kernelManipulationQueue = new LinkedBlockingDeque<>(); + private final ConcurrentHashMap<StageId, ConcurrentHashMap<Integer, ReadableFrameChannel>> stageOutputs = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<StageId, CounterTracker> stageCounters = new ConcurrentHashMap<>(); + private final boolean durableStageStorageEnabled; + + private volatile DruidNode selfDruidNode; + private volatile ControllerClient controllerClient; + private volatile WorkerClient workerClient; + private volatile Bouncer processorBouncer; + private volatile boolean controllerAlive = true; Review Comment: Since these are marked volatile, it means that we have to worry about concurrent reads and writes. These, however, seem fundamental. Is it true that, say, a worker starts running and these values are then concurrently set at some random time later? If so, should there be some state or locking to enforce ordering? (I.e. don't do "real work" until the fundamentals are provided.) Or, is it the case that there is an implied state machine: that these are set in one thread (say, via a work order or some such?), and only then does another thread consume them? If so, do they need to be volatile? What is the sync mechanism? Reading further, it appears these are set in `runTask` based on the context. Yet, the context is available in the constructor. Should at least the `selfDruidNode` be set in the constructor and be final? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java: ########## @@ -0,0 +1,1230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.SettableFuture; +import it.unimi.dsi.fastutil.bytes.ByteArrays; +import org.apache.druid.frame.allocation.ArenaMemoryAllocator; +import org.apache.druid.frame.channel.BlockingQueueFrameChannel; +import org.apache.druid.frame.channel.ReadableFileFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.ReadableNilFrameChannel; +import org.apache.druid.frame.file.FrameFile; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.frame.processor.BlockingQueueOutputChannelFactory; +import org.apache.druid.frame.processor.Bouncer; +import org.apache.druid.frame.processor.FileOutputChannelFactory; +import 
org.apache.druid.frame.processor.FrameChannelMuxer; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessorExecutor; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.frame.processor.OutputChannelFactory; +import org.apache.druid.frame.processor.OutputChannels; +import org.apache.druid.frame.processor.SuperSorter; +import org.apache.druid.frame.processor.SuperSorterProgressTracker; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.counters.CounterNames; +import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.indexing.CountingOutputChannelFactory; +import org.apache.druid.msq.indexing.InputChannelFactory; +import org.apache.druid.msq.indexing.InputChannelsImpl; +import org.apache.druid.msq.indexing.KeyStatisticsCollectionProcessor; +import org.apache.druid.msq.indexing.MSQWorkerTask; +import org.apache.druid.msq.indexing.error.CanceledFault; +import org.apache.druid.msq.indexing.error.MSQErrorReport; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher; +import org.apache.druid.msq.indexing.error.MSQWarningReportPublisher; +import org.apache.druid.msq.indexing.error.MSQWarningReportSimplePublisher; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSlices; +import 
org.apache.druid.msq.input.MapInputSliceReader; +import org.apache.druid.msq.input.NilInputSlice; +import org.apache.druid.msq.input.NilInputSliceReader; +import org.apache.druid.msq.input.external.ExternalInputSlice; +import org.apache.druid.msq.input.external.ExternalInputSliceReader; +import org.apache.druid.msq.input.stage.InputChannels; +import org.apache.druid.msq.input.stage.ReadablePartition; +import org.apache.druid.msq.input.stage.StageInputSlice; +import org.apache.druid.msq.input.stage.StageInputSliceReader; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.input.table.SegmentsInputSliceReader; +import org.apache.druid.msq.kernel.FrameContext; +import org.apache.druid.msq.kernel.FrameProcessorFactory; +import org.apache.druid.msq.kernel.ProcessorsAndChannels; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.kernel.StageId; +import org.apache.druid.msq.kernel.StagePartition; +import org.apache.druid.msq.kernel.WorkOrder; +import org.apache.druid.msq.kernel.worker.WorkerStageKernel; +import org.apache.druid.msq.kernel.worker.WorkerStagePhase; +import org.apache.druid.msq.querykit.DataSegmentProvider; +import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory; +import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory; +import org.apache.druid.msq.shuffle.WorkerInputChannelFactory; +import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; +import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot; +import org.apache.druid.msq.util.DecoratedExecutorService; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.PrioritizedCallable; +import org.apache.druid.query.PrioritizedRunnable; +import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.server.DruidNode; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; 
+import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * Interface for a worker of a multi-stage query. + */ +public class WorkerImpl implements Worker +{ + private static final Logger log = new Logger(WorkerImpl.class); + + private final MSQWorkerTask task; + private final WorkerContext context; + + private final BlockingQueue<Consumer<KernelHolder>> kernelManipulationQueue = new LinkedBlockingDeque<>(); + private final ConcurrentHashMap<StageId, ConcurrentHashMap<Integer, ReadableFrameChannel>> stageOutputs = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<StageId, CounterTracker> stageCounters = new ConcurrentHashMap<>(); + private final boolean durableStageStorageEnabled; + + private volatile DruidNode selfDruidNode; + private volatile ControllerClient controllerClient; + private volatile WorkerClient workerClient; + private volatile Bouncer processorBouncer; + private volatile boolean controllerAlive = true; + + public WorkerImpl(MSQWorkerTask task, WorkerContext context) + { + this.task = task; + this.context = context; + this.durableStageStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(task.getContext()); + } + + 
@Override + public String id() + { + return task.getId(); + } + + @Override + public MSQWorkerTask task() + { + return task; + } + + @Override + public TaskStatus run() throws Exception + { + try (final Closer closer = Closer.create()) { + Optional<MSQErrorReport> maybeErrorReport; + + try { + maybeErrorReport = runTask(closer); + } + catch (Throwable e) { + maybeErrorReport = Optional.of( + MSQErrorReport.fromException(id(), MSQTasks.getHostFromSelfNode(selfDruidNode), null, e) + ); + } + + if (maybeErrorReport.isPresent()) { + final MSQErrorReport errorReport = maybeErrorReport.get(); + final String errorLogMessage = MSQTasks.errorReportToLogMessage(errorReport); + log.warn(errorLogMessage); + + closer.register(() -> { + if (controllerAlive && controllerClient != null && selfDruidNode != null) { + controllerClient.postWorkerError(id(), errorReport); + } + }); + + return TaskStatus.failure(id(), errorReport.getFault().getCodeWithMessage()); + } else { + return TaskStatus.success(id()); + } + } + } + + /** + * Runs worker logic. Returns an empty Optional on success. On failure, returns an error report for errors that + * happened in other threads; throws exceptions for errors that happened in the main worker loop. 
+ */ + public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception + { + this.selfDruidNode = context.selfNode(); + this.controllerClient = context.makeControllerClient(task.getControllerTaskId()); + closer.register(controllerClient::close); + context.registerWorker(this, closer); // Uses controllerClient, so must be called after that is initialized + this.workerClient = new ExceptionWrappingWorkerClient(context.makeWorkerClient()); + closer.register(workerClient::close); + this.processorBouncer = context.processorBouncer(); + + final KernelHolder kernelHolder = new KernelHolder(); + final String cancellationId = id(); + + final FrameProcessorExecutor workerExec = new FrameProcessorExecutor(makeProcessingPool()); + + // Delete all the stage outputs + closer.register(() -> { + for (final StageId stageId : stageOutputs.keySet()) { + cleanStageOutput(stageId); + } + }); + + // Close stage output processors and running futures (if present) + closer.register(() -> { + try { + workerExec.cancel(cancellationId); + } + catch (InterruptedException e) { + // Strange that cancellation would itself be interrupted. Throw an exception, since this is unexpected. 
+ throw new RuntimeException(e); + } + }); + + final MSQWarningReportPublisher msqWarningReportPublisher = new MSQWarningReportLimiterPublisher( + new MSQWarningReportSimplePublisher( + id(), + controllerClient, + id(), + MSQTasks.getHostFromSelfNode(selfDruidNode) + ) + ); + + closer.register(msqWarningReportPublisher); + + final Map<StageId, SettableFuture<ClusterByPartitions>> partitionBoundariesFutureMap = new HashMap<>(); + + final Map<StageId, FrameContext> stageFrameContexts = new HashMap<>(); + + while (!kernelHolder.isDone()) { + boolean didSomething = false; + + for (final WorkerStageKernel kernel : kernelHolder.getStageKernelMap().values()) { + final StageDefinition stageDefinition = kernel.getStageDefinition(); + + if (kernel.getPhase() == WorkerStagePhase.NEW) { + log.debug("New work order: %s", context.jsonMapper().writeValueAsString(kernel.getWorkOrder())); + + // Create separate inputChannelFactory per stage, because the list of tasks can grow between stages, and + // so we need to avoid the memoization in baseInputChannelFactory. + final InputChannelFactory inputChannelFactory = makeBaseInputChannelFactory(closer); + + // Compute memory parameters for all stages, even ones that haven't been assigned yet, so we can fail-fast + // if some won't work. (We expect that all stages will get assigned to the same pool of workers.) + for (final StageDefinition stageDef : kernel.getWorkOrder().getQueryDefinition().getStageDefinitions()) { + stageFrameContexts.computeIfAbsent( + stageDef.getId(), + stageId -> context.frameContext( + kernel.getWorkOrder().getQueryDefinition(), + stageId.getStageNumber() + ) + ); + } + + // Start working on this stage immediately. 
+ kernel.startReading(); + final SettableFuture<ClusterByPartitions> partitionBoundariesFuture = + startWorkOrder( + kernel, + inputChannelFactory, + stageCounters.computeIfAbsent(stageDefinition.getId(), ignored -> new CounterTracker()), + workerExec, + cancellationId, + context.threadCount(), + stageFrameContexts.get(stageDefinition.getId()), + msqWarningReportPublisher + ); + + if (partitionBoundariesFuture != null) { + if (partitionBoundariesFutureMap.put(stageDefinition.getId(), partitionBoundariesFuture) != null) { + throw new ISE("Work order collision for stage [%s]", stageDefinition.getId()); + } + } + + didSomething = true; + logKernelStatus(kernelHolder.getStageKernelMap().values()); + } + + if (kernel.getPhase() == WorkerStagePhase.READING_INPUT && kernel.hasResultKeyStatisticsSnapshot()) { + if (controllerAlive) { + controllerClient.postKeyStatistics( + stageDefinition.getId(), + kernel.getWorkOrder().getWorkerNumber(), + kernel.getResultKeyStatisticsSnapshot() + ); + } + kernel.startPreshuffleWaitingForResultPartitionBoundaries(); + + didSomething = true; + logKernelStatus(kernelHolder.getStageKernelMap().values()); + } + + logKernelStatus(kernelHolder.getStageKernelMap().values()); + if (kernel.getPhase() == WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES + && kernel.hasResultPartitionBoundaries()) { + partitionBoundariesFutureMap.get(stageDefinition.getId()).set(kernel.getResultPartitionBoundaries()); + kernel.startPreshuffleWritingOutput(); + + didSomething = true; + logKernelStatus(kernelHolder.getStageKernelMap().values()); + } + + if (kernel.getPhase() == WorkerStagePhase.RESULTS_READY + && kernel.addPostedResultsComplete(Pair.of(stageDefinition.getId(), kernel.getWorkOrder().getWorkerNumber()))) { + if (controllerAlive) { + controllerClient.postResultsComplete( + stageDefinition.getId(), + kernel.getWorkOrder().getWorkerNumber(), + kernel.getResultObject() + ); + } + } + + if (kernel.getPhase() == WorkerStagePhase.FAILED) { + 
// Better than throwing an exception, because we can include the stage number. + return Optional.of( + MSQErrorReport.fromException( + id(), + MSQTasks.getHostFromSelfNode(selfDruidNode), + stageDefinition.getId().getStageNumber(), + kernel.getException() + ) + ); + } + } + + if (!didSomething && !kernelHolder.isDone()) { + Consumer<KernelHolder> nextCommand; + + do { + postCountersToController(); + } while ((nextCommand = kernelManipulationQueue.poll(5, TimeUnit.SECONDS)) == null); + + nextCommand.accept(kernelHolder); + logKernelStatus(kernelHolder.getStageKernelMap().values()); + } + } + + // Empty means success. + return Optional.empty(); + } + + @Override + public void stopGracefully() + { + kernelManipulationQueue.add( + kernel -> { + // stopGracefully() is called when the containing process is terminated, or when the task is canceled. + throw new MSQException(CanceledFault.INSTANCE); + } + ); + } + + @Override + public void controllerFailed() + { + controllerAlive = false; + stopGracefully(); + } + + @Override + public InputStream readChannel( + final String queryId, + final int stageNumber, + final int partitionNumber, + final long offset + ) throws IOException + { + final StageId stageId = new StageId(queryId, stageNumber); + final StagePartition stagePartition = new StagePartition(stageId, partitionNumber); + final ConcurrentHashMap<Integer, ReadableFrameChannel> partitionOutputsForStage = stageOutputs.get(stageId); + + if (partitionOutputsForStage == null) { + return null; + } + final ReadableFrameChannel channel = partitionOutputsForStage.get(partitionNumber); + + if (channel == null) { + return null; + } + + if (channel instanceof ReadableNilFrameChannel) { + // Build an empty frame file. 
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + FrameFileWriter.open(Channels.newChannel(baos), null).close(); + + final ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray()); + + //noinspection ResultOfMethodCallIgnored: OK to ignore since "skip" always works for ByteArrayInputStream. + in.skip(offset); + + return in; + } else if (channel instanceof ReadableFileFrameChannel) { + // Close frameFile once we've returned an input stream: no need to retain a reference to the mmap after that, + // since we aren't using it. + try (final FrameFile frameFile = ((ReadableFileFrameChannel) channel).newFrameFileReference()) { + final RandomAccessFile randomAccessFile = new RandomAccessFile(frameFile.file(), "r"); + + if (offset >= randomAccessFile.length()) { + randomAccessFile.close(); + return new ByteArrayInputStream(ByteArrays.EMPTY_ARRAY); + } else { + randomAccessFile.seek(offset); + return Channels.newInputStream(randomAccessFile.getChannel()); + } + } + } else { + String errorMsg = StringUtils.format( + "Returned server error to client because channel for [%s] is not nil or file-based (class = %s)", + stagePartition, + channel.getClass().getName() + ); + log.error(StringUtils.encodeForFormat(errorMsg)); + + throw new IOException(errorMsg); + } + } + + @Override + public void postWorkOrder(final WorkOrder workOrder) + { + if (task.getWorkerNumber() != workOrder.getWorkerNumber()) { + throw new ISE("Worker number mismatch: expected [%d]", task.getWorkerNumber()); + } + + kernelManipulationQueue.add( + kernelHolder -> + kernelHolder.getStageKernelMap().computeIfAbsent( + workOrder.getStageDefinition().getId(), + ignored -> WorkerStageKernel.create(workOrder) + ) + ); + } + + @Override + public boolean postResultPartitionBoundaries( + final ClusterByPartitions stagePartitionBoundaries, + final String queryId, + final int stageNumber + ) + { + final StageId stageId = new StageId(queryId, stageNumber); + + kernelManipulationQueue.add( + 
kernelHolder -> { + final WorkerStageKernel stageKernel = kernelHolder.getStageKernelMap().get(stageId); + + // Ignore the update if we don't have a kernel for this stage. + if (stageKernel != null) { + stageKernel.setResultPartitionBoundaries(stagePartitionBoundaries); + } else { + log.warn("Ignored result partition boundaries call for unknown stage [%s]", stageId); + } + } + ); + return true; + } + + @Override + public void postCleanupStage(final StageId stageId) + { + log.info("Cleanup order for stage: [%s] received", stageId); + kernelManipulationQueue.add( + holder -> { + cleanStageOutput(stageId); + // Mark the stage as FINISHED + holder.getStageKernelMap().get(stageId).setStageFinished(); + } + ); + } + + @Override + public void postFinish() + { + kernelManipulationQueue.add(KernelHolder::setDone); + } + + @Override + public CounterSnapshotsTree getCounters() + { + final CounterSnapshotsTree retVal = new CounterSnapshotsTree(); + + for (final Map.Entry<StageId, CounterTracker> entry : stageCounters.entrySet()) { + retVal.put(entry.getKey().getStageNumber(), task().getWorkerNumber(), entry.getValue().snapshot()); + } + + return retVal; + } + + private InputChannelFactory makeBaseInputChannelFactory(final Closer closer) + { + final Supplier<List<String>> workerTaskList = Suppliers.memoize( + () -> { + try { + return controllerClient.getTaskList(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + )::get; + + if (durableStageStorageEnabled) { + return DurableStorageInputChannelFactory.createStandardImplementation( + task.getControllerTaskId(), + workerTaskList, + MSQTasks.makeStorageConnector(context.injector()), + closer + ); + } else { + return new WorkerOrLocalInputChannelFactory(workerTaskList); + } + } + + private OutputChannelFactory makeStageOutputChannelFactory(final FrameContext frameContext, final int stageNumber) + { + // Use the standard frame size, since we assume this size when computing how much is needed to merge output + 
// files from different workers. + final int frameSize = frameContext.memoryParameters().getStandardFrameSize(); + + if (durableStageStorageEnabled) { + return DurableStorageOutputChannelFactory.createStandardImplementation( + task.getControllerTaskId(), + id(), + stageNumber, + frameSize, + MSQTasks.makeStorageConnector(context.injector()) + ); + } else { + final File fileChannelDirectory = + new File(context.tempDir(), StringUtils.format("output_stage_%06d", stageNumber)); + + return new FileOutputChannelFactory(fileChannelDirectory, frameSize); + } + } + + private ListeningExecutorService makeProcessingPool() Review Comment: For us newbies, maybe add a note to explain what the processing pool is. Or, point to where this is explained. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
