gianm commented on code in PR #12918:
URL: https://github.com/apache/druid/pull/12918#discussion_r951432126


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -0,0 +1,2040 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import org.apache.druid.frame.channel.FrameChannelSequence;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartition;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.frame.key.RowKeyReader;
+import org.apache.druid.frame.key.SortColumn;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.indexing.common.actions.LockListAction;
+import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.actions.SegmentInsertAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.counters.CounterSnapshots;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.indexing.ColumnMapping;
+import org.apache.druid.msq.indexing.ColumnMappings;
+import org.apache.druid.msq.indexing.DataSourceMSQDestination;
+import org.apache.druid.msq.indexing.InputChannelFactory;
+import org.apache.druid.msq.indexing.InputChannelsImpl;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.indexing.MSQSpec;
+import org.apache.druid.msq.indexing.MSQTuningConfig;
+import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
+import org.apache.druid.msq.indexing.SegmentGeneratorFrameProcessorFactory;
+import org.apache.druid.msq.indexing.TaskReportMSQDestination;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
+import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
+import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
+import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
+import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
+import 
org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
+import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
+import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFault;
+import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher;
+import org.apache.druid.msq.indexing.error.MSQWarnings;
+import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
+import org.apache.druid.msq.indexing.error.TooManyWarningsFault;
+import org.apache.druid.msq.indexing.error.UnknownFault;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQStagesReport;
+import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerFactory;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.input.MapInputSpecSlicer;
+import org.apache.druid.msq.input.external.ExternalInputSpec;
+import org.apache.druid.msq.input.external.ExternalInputSpecSlicer;
+import org.apache.druid.msq.input.stage.InputChannels;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
+import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.input.table.TableInputSpecSlicer;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.kernel.TargetSizeShuffleSpec;
+import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.kernel.controller.ControllerQueryKernel;
+import org.apache.druid.msq.kernel.controller.ControllerStagePhase;
+import org.apache.druid.msq.kernel.controller.WorkerInputs;
+import org.apache.druid.msq.querykit.DataSegmentTimelineView;
+import org.apache.druid.msq.querykit.MultiQueryKit;
+import org.apache.druid.msq.querykit.QueryKit;
+import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
+import org.apache.druid.msq.querykit.scan.ScanQueryKit;
+import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
+import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
+import org.apache.druid.msq.sql.MSQTaskQueryMaker;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.util.DimensionSchemaUtils;
+import org.apache.druid.msq.util.IntervalUtils;
+import org.apache.druid.msq.util.MSQFutureUtils;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.msq.util.PassthroughAggregatorFactory;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
/**
 * Controller-side implementation for a multi-stage query (MSQ) task. Holds the task, its context,
 * and the mutable state used for error reporting and live progress reports.
 *
 * NOTE(review): thread-safety appears to rest on the concurrent collections and volatile fields
 * below; confirm against the methods that mutate them (not all are visible in this view).
 */
public class ControllerImpl implements Controller
{
  private static final Logger log = new Logger(ControllerImpl.class);

  // The task this controller is running, and the context it runs in. Both are set once in the constructor.
  private final MSQControllerTask task;
  private final ControllerContext context;

  // Work queue of actions to apply to the query kernel. Bounded; capacity comes from Limits.
  private final BlockingQueue<Consumer<ControllerQueryKernel>> kernelManipulationQueue =
      new ArrayBlockingQueue<>(Limits.MAX_KERNEL_MANIPULATION_QUEUE_SIZE);

  // For system error reporting. This is the very first error we got from a worker. (We only report that one.)
  private final AtomicReference<MSQErrorReport> workerErrorRef = new AtomicReference<>();

  // For system warning reporting. Unlike errors, all warnings are collected, not just the first.
  private final ConcurrentLinkedQueue<MSQErrorReport> workerWarnings = new ConcurrentLinkedQueue<>();

  // For live reports. Holds the query definition once it has been built.
  private final AtomicReference<QueryDefinition> queryDefRef = new AtomicReference<>();

  // For live reports. Last reported CounterSnapshots per stage per worker.
  private final CounterSnapshotsTree taskCountersForLiveReports = new CounterSnapshotsTree();

  // For live reports. stage number -> stage phase.
  private final ConcurrentHashMap<Integer, ControllerStagePhase> stagePhasesForLiveReports = new ConcurrentHashMap<>();

  // For live reports. stage number -> runtime interval. Endpoint is eternity's end if the stage is still running.
  private final ConcurrentHashMap<Integer, Interval> stageRuntimesForLiveReports = new ConcurrentHashMap<>();

  // For live reports. stage number -> worker count. Only set for stages that have started.
  private final ConcurrentHashMap<Integer, Integer> stageWorkerCountsForLiveReports = new ConcurrentHashMap<>();

  // For live reports. stage number -> partition count. Only set for stages that have started.
  private final ConcurrentHashMap<Integer, Integer> stagePartitionCountsForLiveReports = new ConcurrentHashMap<>();

  // For live reports. The time at which the query started. Null until the query actually starts.
  private volatile DateTime queryStartTime = null;

  // The following are presumably initialized when the task starts running; their assignment is not
  // visible in this view — TODO confirm where each is set.
  private volatile DruidNode selfDruidNode;
  private volatile MSQWorkerTaskLauncher workerTaskLauncher;
  private volatile WorkerClient netClient;

  // Tracks whether the number of faults (warnings) has exceeded the configured limit. Null until initialized.
  private volatile FaultsExceededChecker faultsExceededChecker = null;
+
+  public ControllerImpl(
+      final MSQControllerTask task,
+      final ControllerContext context
+  )
+  {
+    this.task = task;
+    this.context = context;
+  }
+
  /**
   * Returns the ID of the controller task, which serves as this controller's ID.
   */
  @Override
  public String id()
  {
    return task.getId();
  }
+
  /**
   * Returns the controller task that this controller is running.
   */
  @Override
  public MSQControllerTask task()
  {
    return task;
  }
+
  /**
   * Runs the query to completion. Delegates all real work to {@link #runTask}, passing it a
   * {@link Closer} that holds resources to release when the run ends (successfully or not).
   *
   * Errors escaping runTask are unexpected: they are logged and converted into a failed
   * {@link TaskStatus} rather than rethrown, so the task framework always gets a status.
   */
  @Override
  public TaskStatus run() throws Exception
  {
    final Closer closer = Closer.create();

    try {
      return runTask(closer);
    }
    catch (Throwable e) {
      // Close eagerly here (rather than relying only on the finally block) so that any exception
      // thrown during cleanup is attached to "e" as a suppressed exception instead of masking it,
      // and so we can still return a failure status below.
      try {
        closer.close();
      }
      catch (Throwable e2) {
        e.addSuppressed(e2);
      }

      // We really don't expect this to error out. runTask should handle everything nicely. If it doesn't, something
      // strange happened, so log it.
      log.warn(e, "Encountered unhandled controller exception.");
      return TaskStatus.failure(id(), e.toString());
    }
    finally {
      // On the catch path this is a second close; assumes Closer.close() is a no-op once its
      // registered resources have been drained — TODO confirm against Druid's Closer semantics.
      closer.close();
    }
  }
+
+  @Override
+  public void stopGracefully()
+  {
+    final QueryDefinition queryDef = queryDefRef.get();
+
+    // stopGracefully() is called when the containing process is terminated, 
or when the task is canceled.
+    log.info("Query [%s] canceled.", queryDef != null ? queryDef.getQueryId() 
: "<no id yet>");
+
+    addToKernelManipulationQueue(
+        kernel -> {
+          throw new MSQException(CanceledFault.INSTANCE);
+        }
+    );
+  }
+
+  public TaskStatus runTask(final Closer closer)

Review Comment:
   Sounds reasonable. For now, I'll at least add comments making it clearer 
which code is involved in planning, running, and reporting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to