kfaraz commented on code in PR #16291:
URL: https://github.com/apache/druid/pull/16291#discussion_r1630655234


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.CompactionRunner;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.TimestampFloorExprMacro;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class MSQCompactionRunner implements CompactionRunner
+{
+  private static final Logger log = new Logger(MSQCompactionRunner.class);
+  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
+
+  private final ObjectMapper jsonMapper;
+  private final Injector injector;
+
+  public static final String TIME_VIRTUAL_COLUMN = "__vTime";
+  public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
+
+  @JsonIgnore
+  private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
+      (taskObject, config) -> {
+        final MSQControllerTask msqControllerTask = (MSQControllerTask) 
taskObject;
+        msqControllerTask.stopGracefully(config);
+      });
+
+
+  public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, 
@JacksonInject Injector injector)

Review Comment:
   Annotate this with `@JsonCreator`. It would also be nice to put the args on 
separate lines.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.CompactionRunner;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.TimestampFloorExprMacro;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class MSQCompactionRunner implements CompactionRunner
+{
+  private static final Logger log = new Logger(MSQCompactionRunner.class);
+  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
+
+  private final ObjectMapper jsonMapper;
+  private final Injector injector;
+
+  public static final String TIME_VIRTUAL_COLUMN = "__vTime";
+  public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
+
+  @JsonIgnore

Review Comment:
   Why do we need `@JsonIgnore` on this?
   IIUC, `MSQCompactionRunner` is never serialized, only deserialized.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -190,6 +192,8 @@ public List<? extends Module> getJacksonModules()
         NilInputSource.class
     );
 
+    module.registerSubtypes(new NamedType(MSQCompactionRunner.class, "msq"));

Review Comment:
   The runner type should be a constant in `MSQCompactionRunner`.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.CompactionRunner;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.TimestampFloorExprMacro;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class MSQCompactionRunner implements CompactionRunner
+{
+  private static final Logger log = new Logger(MSQCompactionRunner.class);
+  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
+
+  private final ObjectMapper jsonMapper;
+  private final Injector injector;
+
+  public static final String TIME_VIRTUAL_COLUMN = "__vTime";
+  public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
+
+  @JsonIgnore
+  private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
+      (taskObject, config) -> {
+        final MSQControllerTask msqControllerTask = (MSQControllerTask) 
taskObject;
+        msqControllerTask.stopGracefully(config);
+      });
+
+
+  public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, 
@JacksonInject Injector injector)
+  {
+    this.jsonMapper = jsonMapper;
+    this.injector = injector;
+  }
+
+  @Override
+  public NonnullPair<Boolean, String> supportsCompactionSpec(
+      CompactionTask compactionTask
+  )
+  {
+    if (compactionTask.getTuningConfig() != null) {

Review Comment:
   Please add a brief comment for each of the code blocks mentioning what is 
being validated. Example:
   
   ```java
   // Verify that partitioning type is either 'dynamic' or 'range'
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.coordinator.ClientCompactionRunnerInfo;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link ClientCompactionRunnerInfo}
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = 
CompactionRunner.TYPE_PROPERTY)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "native", value = NativeCompactionRunner.class)
+})
+public interface CompactionRunner
+{
+  String TYPE_PROPERTY = "type";
+
+  TaskStatus runCompactionTasks(
+      CompactionTask compactionTask,
+      List<NonnullPair<Interval, DataSchema>> dataSchemas,
+      TaskToolbox taskToolbox
+  ) throws Exception;
+
+  CurrentSubTaskHolder getCurrentSubTaskHolder();
+
+  /**
+   * Checks if the provided compaction config is supported by the runner.
+   *
+   * @param compactionTask
+   * @return Pair of (supported) boolean and a reason string. Reason string is 
empty if supported is true.
+   */
+  NonnullPair<Boolean, String> supportsCompactionSpec(CompactionTask 
compactionTask);

Review Comment:
   - Rename this method to `validateCompactionTask()` since the arguments have 
changed since the original set of changes in the PR.
   - Instead of a pair, return a `ValidationResult`, same as the one 
recommended for `ClientCompactionRunnerInfo`.
   - The javadoc should also link to 
`ClientCompactionRunnerInfo.validateCompactionConfig()` mentioning that the set 
of validations performed is the same.
   
   



##########
processing/src/main/java/org/apache/druid/indexer/CompactionEngine.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+
+/**
+ * Encapsulates the Engine to be used for a compaction task.
+ * Should be kept in sync with the subtypes for {@link 
org.apache.druid.indexing.common.task.CompactionRunner}.
+ */
+public enum CompactionEngine
+{
+  // Using uppercase constants since "native" is a reserved keyword.

Review Comment:
   Unnecessary comment, since enum constants are conventionally upper-case anyway.



##########
server/src/main/java/org/apache/druid/server/coordinator/ClientCompactionRunnerInfo.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.QueryContext;
+
+import java.util.Objects;
+
+
+/**
+ * This class is just used to pass the strategy type via the "type" parameter for deserialization to the appropriate
+ * {@link org.apache.druid.indexing.common.task.CompactionRunner} subtype at the overlord.
+ */
+public class ClientCompactionRunnerInfo
+{
+  private final CompactionEngine type;
+
+  @JsonCreator
+  public ClientCompactionRunnerInfo(@JsonProperty("type") CompactionEngine 
type)
+  {
+    this.type = type;
+  }
+
+  @JsonProperty
+  public CompactionEngine getType()
+  {
+    return type;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactionRunnerInfo{" +
+           "type=" + type +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClientCompactionRunnerInfo that = (ClientCompactionRunnerInfo) o;
+    return type == that.type;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(type);
+  }
+
+  /**
+   * Checks if the provided compaction config is supported by the runner
+   * @param newConfig The updated compaction config
+   * @param engineSource String indicating the source of compaction engine.
+   * @return Pair of support boolean and reason string. The reason string is 
empty if support boolean is True.
+   */
+  public static NonnullPair<Boolean, String> 
supportsCompactionConfig(DataSourceCompactionConfig newConfig, String 
engineSource)
+  {
+    CompactionEngine compactionEngine = newConfig.getEngine();
+    if (compactionEngine == CompactionEngine.MSQ) {

Review Comment:
   It would be easier to read this method if the content specific to MSQ was 
moved to a separate method.
   
   ```java
   if (compactionEngine == CompactionEngine.NATIVE) {
       return true;
   } else {
       return msqEngineSupportsCompactionConfig();
   }
   ```
   
The javadoc of the new method should also list the cases that are not supported.



##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -58,15 +61,19 @@ public class CompactionState
   // org.apache.druid.query.aggregation.AggregatorFactory cannot be used here 
because it's in the 'processing' module which
   // has a dependency on the 'core' module where this class is.
   private final List<Object> metricsSpec;
+  private final CompactionEngine engine;
+  private final Map<String, AggregatorFactory> 
dimensionToAggregatoryFactoryMap;
 
   @JsonCreator
   public CompactionState(
       @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
       @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
+      @JsonProperty("dimensionToAggregatoryFactoryMap") Map<String, 
AggregatorFactory> dimensionToAggregatoryFactoryMap,

Review Comment:
   Please annotate the new fields in the constructor and their getters as 
`@Nullable`.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.CompactionRunner;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.TimestampFloorExprMacro;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class MSQCompactionRunner implements CompactionRunner
+{
+  private static final Logger log = new Logger(MSQCompactionRunner.class);
+  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
+
+  private final ObjectMapper jsonMapper;
+  private final Injector injector;
+
+  public static final String TIME_VIRTUAL_COLUMN = "__vTime";
+  public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
+
+  @JsonIgnore
+  private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
+      (taskObject, config) -> {
+        final MSQControllerTask msqControllerTask = (MSQControllerTask) 
taskObject;
+        msqControllerTask.stopGracefully(config);
+      });
+
+
+  public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, 
@JacksonInject Injector injector)
+  {
+    this.jsonMapper = jsonMapper;
+    this.injector = injector;
+  }
+
+  @Override
+  public NonnullPair<Boolean, String> supportsCompactionSpec(
+      CompactionTask compactionTask
+  )
+  {
+    if (compactionTask.getTuningConfig() != null) {
+      PartitionsSpec partitionsSpec = 
compactionTask.getTuningConfig().getPartitionsSpec();

Review Comment:
   It should be possible to re-use the code in `ClientCompactionRunnerInfo` 
since the MSQ extension depends on the `server` module.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.CompactionRunner;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.TimestampFloorExprMacro;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class MSQCompactionRunner implements CompactionRunner
+{
+  private static final Logger log = new Logger(MSQCompactionRunner.class);
+  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
+
+  private final ObjectMapper jsonMapper;
+  private final Injector injector;
+
+  public static final String TIME_VIRTUAL_COLUMN = "__vTime";
+  public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
+
+  @JsonIgnore
+  private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
+      (taskObject, config) -> {
+        final MSQControllerTask msqControllerTask = (MSQControllerTask) 
taskObject;
+        msqControllerTask.stopGracefully(config);
+      });
+
+
+  public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, 
@JacksonInject Injector injector)
+  {
+    this.jsonMapper = jsonMapper;
+    this.injector = injector;
+  }
+
+  @Override
+  public NonnullPair<Boolean, String> supportsCompactionSpec(
+      CompactionTask compactionTask
+  )
+  {
+    if (compactionTask.getTuningConfig() != null) {
+      PartitionsSpec partitionsSpec = 
compactionTask.getTuningConfig().getPartitionsSpec();
+      if (!(partitionsSpec instanceof DynamicPartitionsSpec
+            || partitionsSpec instanceof DimensionRangePartitionsSpec)) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "Invalid partition spec type[%s] for MSQ compaction engine."
+            + " Type must be either DynamicPartitionsSpec or 
DynamicRangePartitionsSpec.",
+            partitionsSpec.getClass()
+        ));
+      }
+      if (partitionsSpec instanceof DynamicPartitionsSpec
+          && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != 
null) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ 
compaction engine.",
+            ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows()
+        ));
+      }
+    }
+    if (compactionTask.getMetricsSpec() != null
+        && compactionTask.getGranularitySpec() != null
+        && !compactionTask.getGranularitySpec()
+                          .isRollup()) {
+      return new NonnullPair<>(
+          false,
+          "rollup in granularitySpec must be set to true if metricsSpec is 
specifed "
+          + "for MSQ compaction engine."
+      );
+    }
+
+    QueryContext compactionTaskContext = new 
QueryContext(compactionTask.getContext());
+    if (!MultiStageQueryContext.isFinalizeAggregations(compactionTaskContext)) 
{
+      return new NonnullPair<>(false, StringUtils.format(
+          "Config[%s] cannot be set to false for auto-compaction with MSQ 
engine.",
+          MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS
+      ));
+    }
+    if (MultiStageQueryContext.getAssignmentStrategy(compactionTaskContext) == 
WorkerAssignmentStrategy.AUTO) {
+      return new NonnullPair<>(
+          false,
+          StringUtils.format(
+              "Config[%s] cannot be set to value[%s] for auto-compaction with 
MSQ engine.",
+              MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY,
+              WorkerAssignmentStrategy.AUTO
+          )
+      );
+    }
+    return new NonnullPair<>(true, "");
+  }
+
+  @Override
+  public CurrentSubTaskHolder getCurrentSubTaskHolder()
+  {
+    return currentSubTaskHolder;
+  }
+
+  @Override
+  public TaskStatus runCompactionTasks(
+      CompactionTask compactionTask,
+      List<NonnullPair<Interval, DataSchema>> intervalDataSchemas,
+      TaskToolbox taskToolbox
+  ) throws Exception
+  {
+    List<MSQControllerTask> msqControllerTasks = 
compactionToMSQTasks(compactionTask, intervalDataSchemas);
+
+    if (msqControllerTasks.isEmpty()) {
+      String msg = StringUtils.format(
+          "Can't find segments from inputSpec[%s], nothing to do.",
+          compactionTask.getIoConfig().getInputSpec()
+      );
+      return TaskStatus.failure(compactionTask.getId(), msg);
+    }
+    return runSubtasks(
+        msqControllerTasks,
+        taskToolbox,
+        currentSubTaskHolder,
+        compactionTask.getId()
+    );
+  }
+
+  public List<MSQControllerTask> compactionToMSQTasks(
+      CompactionTask compactionTask,
+      List<NonnullPair<Interval, DataSchema>> intervalDataSchemas
+  ) throws JsonProcessingException
+  {
+    List<MSQControllerTask> msqControllerTasks = new ArrayList<>();

Review Comment:
   Nit:
   ```suggestion
       final List<MSQControllerTask> msqControllerTasks = new ArrayList<>();
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.coordinator.ClientCompactionRunnerInfo;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link ClientCompactionRunnerInfo}
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = 
CompactionRunner.TYPE_PROPERTY)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "native", value = NativeCompactionRunner.class)
+})
+public interface CompactionRunner
+{
+  String TYPE_PROPERTY = "type";
+
+  TaskStatus runCompactionTasks(
+      CompactionTask compactionTask,
+      List<NonnullPair<Interval, DataSchema>> dataSchemas,
+      TaskToolbox taskToolbox
+  ) throws Exception;
+
+  CurrentSubTaskHolder getCurrentSubTaskHolder();
+
+  /**
+   * Checks if the provided compaction config is supported by the runner.
+   *
+   * @param compactionTask

Review Comment:
   Not needed.
   
   ```suggestion
   ```



##########
server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java:
##########
@@ -119,7 +119,8 @@ public Response addOrUpdateCompactionConfig(
           .getCompactionConfigs()
           .stream()
           .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, 
Function.identity()));
-      newConfigs.put(newConfig.getDataSource(), newConfig);
+      DataSourceCompactionConfig updatedConfig = 
DataSourceCompactionConfig.from(newConfig, current.getEngine());
+      newConfigs.put(updatedConfig.getDataSource(), updatedConfig);

Review Comment:
   Persist the `newConfig` rather than the `updatedConfig`.
   If the `newConfig` (i.e. the API payload posted by the user) has no engine 
set, `updatedConfig` would have the engine filled in with the default value.
   
   Persisting that resolved default would mean that, when we later want to 
switch the engine at the cluster level, we would have to update each 
datasource's config individually instead of just changing the cluster default.



##########
docs/multi-stage-query/known-issues.md:
##########
@@ -68,3 +68,8 @@ properties, and the `indexSpec` 
[`tuningConfig`](../ingestion/ingestion-spec.md#
 - The maximum number of elements in a window cannot exceed a value of 100,000. 
 - To avoid `leafOperators` in MSQ engine, window functions have an extra scan 
stage after the window stage for cases 
 where native engine has a non-empty `leafOperator`.
+
+## Compaction
+
+- Compaction configured with MSQ engine currently doesn't support context 
flags `finalizeAggregations=false` and
+  `taskAssignment=auto`.

Review Comment:
   This should not be here, as this is only an experimental feature right now. 
Instead, there should be a heading on the compaction page giving a summary of 
auto-compaction using MSQ, explaining how it can be enabled/disabled and what 
its current shortcomings are.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.coordinator.ClientCompactionRunnerInfo;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link ClientCompactionRunnerInfo}
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = 
CompactionRunner.TYPE_PROPERTY)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "native", value = NativeCompactionRunner.class)
+})
+public interface CompactionRunner
+{
+  String TYPE_PROPERTY = "type";
+
+  TaskStatus runCompactionTasks(
+      CompactionTask compactionTask,
+      List<NonnullPair<Interval, DataSchema>> dataSchemas,

Review Comment:
   Why a list? Why not just a `Map` from `Interval` to `DataSchema`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.coordinator.ClientCompactionRunnerInfo;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link ClientCompactionRunnerInfo}
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = 
CompactionRunner.TYPE_PROPERTY)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "native", value = NativeCompactionRunner.class)
+})
+public interface CompactionRunner
+{
+  String TYPE_PROPERTY = "type";
+
+  TaskStatus runCompactionTasks(

Review Comment:
   A small javadoc here would be nice.



##########
server/src/main/java/org/apache/druid/server/coordinator/ClientCompactionRunnerInfo.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.QueryContext;
+
+import java.util.Objects;
+
+
+/**
+ * This class is just used to pass the strategy type via the "type" parameter 
for deserilization to appropriate
+ * {@link org.apache.druid.indexing.common.task.CompactionRunner} subtype at 
the overlod.
+ */
+public class ClientCompactionRunnerInfo
+{
+  private final CompactionEngine type;
+
+  @JsonCreator
+  public ClientCompactionRunnerInfo(@JsonProperty("type") CompactionEngine 
type)
+  {
+    this.type = type;
+  }
+
+  @JsonProperty
+  public CompactionEngine getType()
+  {
+    return type;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactionRunnerInfo{" +
+           "type=" + type +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClientCompactionRunnerInfo that = (ClientCompactionRunnerInfo) o;
+    return type == that.type;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(type);
+  }
+
+  /**
+   * Checks if the provided compaction config is supported by the runner
+   * @param newConfig The updated compaction config
+   * @param engineSource String indicating the source of compaction engine.
+   * @return Pair of support boolean and reason string. The reason string is 
empty if support boolean is True.
+   */
+  public static NonnullPair<Boolean, String> 
supportsCompactionConfig(DataSourceCompactionConfig newConfig, String 
engineSource)

Review Comment:
   1. Please put arguments on separate lines.
   2. Rename the method to `validateCompactionConfig`
   3. Return a `ValidationResult` that has a `boolean isValid` and a 
`List<String> errorMessages`. A pair is often difficult to interpret for the 
caller and makes sense only when there is a clearly defined LHS and RHS, e.g. 
in an equation or expression evaluation.
   4. The `engineSource` should not be passed here. It can either be left out 
altogether or be used in the exception constructed in 
`DataSourceCompactionConfig.from()`.



##########
server/src/main/java/org/apache/druid/server/coordinator/ClientCompactionRunnerInfo.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.QueryContext;
+
+import java.util.Objects;
+
+
+/**
+ * This class is just used to pass the strategy type via the "type" parameter 
for deserilization to appropriate
+ * {@link org.apache.druid.indexing.common.task.CompactionRunner} subtype at 
the overlod.
+ */
+public class ClientCompactionRunnerInfo
+{
+  private final CompactionEngine type;
+
+  @JsonCreator
+  public ClientCompactionRunnerInfo(@JsonProperty("type") CompactionEngine 
type)
+  {
+    this.type = type;
+  }
+
+  @JsonProperty
+  public CompactionEngine getType()
+  {
+    return type;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactionRunnerInfo{" +
+           "type=" + type +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClientCompactionRunnerInfo that = (ClientCompactionRunnerInfo) o;
+    return type == that.type;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(type);
+  }
+
+  /**
+   * Checks if the provided compaction config is supported by the runner
+   * @param newConfig The updated compaction config
+   * @param engineSource String indicating the source of compaction engine.
+   * @return Pair of support boolean and reason string. The reason string is 
empty if support boolean is True.
+   */
+  public static NonnullPair<Boolean, String> 
supportsCompactionConfig(DataSourceCompactionConfig newConfig, String 
engineSource)
+  {
+    CompactionEngine compactionEngine = newConfig.getEngine();
+    if (compactionEngine == CompactionEngine.MSQ) {
+      if (newConfig.getTuningConfig() != null) {

Review Comment:
   Please add a one-line comment before each type of validation.
   
   For example,
   ```java
   // Partitioning must either by 'dynamic' or 'range'
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.CompactionRunner;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.TimestampFloorExprMacro;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class MSQCompactionRunner implements CompactionRunner
+{
+  private static final Logger log = new Logger(MSQCompactionRunner.class);
+  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
+
+  private final ObjectMapper jsonMapper;
+  private final Injector injector;
+
+  public static final String TIME_VIRTUAL_COLUMN = "__vTime";
+  public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
+
+  @JsonIgnore
+  private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
+      (taskObject, config) -> {
+        final MSQControllerTask msqControllerTask = (MSQControllerTask) 
taskObject;
+        msqControllerTask.stopGracefully(config);
+      });
+
+
+  public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, 
@JacksonInject Injector injector)
+  {
+    this.jsonMapper = jsonMapper;
+    this.injector = injector;
+  }
+
+  @Override
+  public NonnullPair<Boolean, String> supportsCompactionSpec(

Review Comment:
   Please add a javadoc to this method listing out the cases which are 
currently not supported.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.CompactionRunner;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.TimestampFloorExprMacro;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class MSQCompactionRunner implements CompactionRunner
+{
+  private static final Logger log = new Logger(MSQCompactionRunner.class);
+  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
+
+  private final ObjectMapper jsonMapper;
+  private final Injector injector;
+
+  public static final String TIME_VIRTUAL_COLUMN = "__vTime";
+  public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
+
+  @JsonIgnore
+  private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
+      (taskObject, config) -> {
+        final MSQControllerTask msqControllerTask = (MSQControllerTask) 
taskObject;
+        msqControllerTask.stopGracefully(config);
+      });
+
+
+  public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, 
@JacksonInject Injector injector)
+  {
+    this.jsonMapper = jsonMapper;
+    this.injector = injector;
+  }
+
+  @Override
+  public NonnullPair<Boolean, String> supportsCompactionSpec(
+      CompactionTask compactionTask
+  )
+  {
+    if (compactionTask.getTuningConfig() != null) {
+      PartitionsSpec partitionsSpec = 
compactionTask.getTuningConfig().getPartitionsSpec();
+      if (!(partitionsSpec instanceof DynamicPartitionsSpec
+            || partitionsSpec instanceof DimensionRangePartitionsSpec)) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "Invalid partition spec type[%s] for MSQ compaction engine."
+            + " Type must be either DynamicPartitionsSpec or 
DynamicRangePartitionsSpec.",
+            partitionsSpec.getClass()
+        ));
+      }
+      if (partitionsSpec instanceof DynamicPartitionsSpec
+          && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != 
null) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ 
compaction engine.",
+            ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows()
+        ));
+      }
+    }
+    if (compactionTask.getMetricsSpec() != null
+        && compactionTask.getGranularitySpec() != null
+        && !compactionTask.getGranularitySpec()
+                          .isRollup()) {
+      return new NonnullPair<>(
+          false,
+          "rollup in granularitySpec must be set to true if metricsSpec is 
specifed "
+          + "for MSQ compaction engine."
+      );
+    }
+
+    QueryContext compactionTaskContext = new 
QueryContext(compactionTask.getContext());
+    if (!MultiStageQueryContext.isFinalizeAggregations(compactionTaskContext)) 
{
+      return new NonnullPair<>(false, StringUtils.format(
+          "Config[%s] cannot be set to false for auto-compaction with MSQ 
engine.",
+          MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS
+      ));
+    }
+    if (MultiStageQueryContext.getAssignmentStrategy(compactionTaskContext) == 
WorkerAssignmentStrategy.AUTO) {
+      return new NonnullPair<>(
+          false,
+          StringUtils.format(
+              "Config[%s] cannot be set to value[%s] for auto-compaction with 
MSQ engine.",
+              MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY,
+              WorkerAssignmentStrategy.AUTO
+          )
+      );
+    }
+    return new NonnullPair<>(true, "");
+  }
+
  /**
   * Returns the holder tracking the currently-running MSQ controller sub-task,
   * used to stop it gracefully when this compaction task is shut down.
   */
  @Override
  public CurrentSubTaskHolder getCurrentSubTaskHolder()
  {
    return currentSubTaskHolder;
  }
+
+  @Override
+  public TaskStatus runCompactionTasks(
+      CompactionTask compactionTask,
+      List<NonnullPair<Interval, DataSchema>> intervalDataSchemas,
+      TaskToolbox taskToolbox
+  ) throws Exception
+  {
+    List<MSQControllerTask> msqControllerTasks = 
compactionToMSQTasks(compactionTask, intervalDataSchemas);
+
+    if (msqControllerTasks.isEmpty()) {
+      String msg = StringUtils.format(
+          "Can't find segments from inputSpec[%s], nothing to do.",
+          compactionTask.getIoConfig().getInputSpec()
+      );
+      return TaskStatus.failure(compactionTask.getId(), msg);
+    }
+    return runSubtasks(
+        msqControllerTasks,
+        taskToolbox,
+        currentSubTaskHolder,
+        compactionTask.getId()
+    );
+  }
+
+  public List<MSQControllerTask> compactionToMSQTasks(

Review Comment:
   ```suggestion
     public List<MSQControllerTask> createMsqControllerTasks(
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java:
##########
@@ -307,9 +327,16 @@ private CompactionStatus metricsSpecIsUpToDate()
       }
 
       final List<Object> metricSpecList = lastCompactionState.getMetricsSpec();
-      final AggregatorFactory[] existingMetricsSpec
-          = CollectionUtils.isNullOrEmpty(metricSpecList)
-            ? null : objectMapper.convertValue(metricSpecList, 
AggregatorFactory[].class);
+      final AggregatorFactory[] existingMetricsSpec;
+      if (CollectionUtils.isNullOrEmpty(metricSpecList)) {

Review Comment:
   please add a comment here explaining what is being done.



##########
server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java:
##########
@@ -119,7 +119,8 @@ public Response addOrUpdateCompactionConfig(
           .getCompactionConfigs()
           .stream()
           .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, 
Function.identity()));
-      newConfigs.put(newConfig.getDataSource(), newConfig);
+      DataSourceCompactionConfig updatedConfig = 
DataSourceCompactionConfig.from(newConfig, current.getEngine());

Review Comment:
   I don't think we need this new method `DataSourceCompactionConfig.from()`.
   Simply call `ClientCompactionRunnerInfo.validateCompactionConfig()` from 
here and use the validation result to check if an exception needs to be thrown.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.CompactionRunner;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.TimestampFloorExprMacro;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class MSQCompactionRunner implements CompactionRunner
+{
+  private static final Logger log = new Logger(MSQCompactionRunner.class);
+  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
+
+  private final ObjectMapper jsonMapper;
+  private final Injector injector;
+
+  public static final String TIME_VIRTUAL_COLUMN = "__vTime";
+  public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
+
+  @JsonIgnore
+  private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
+      (taskObject, config) -> {
+        final MSQControllerTask msqControllerTask = (MSQControllerTask) 
taskObject;
+        msqControllerTask.stopGracefully(config);
+      });
+
+
+  public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, 
@JacksonInject Injector injector)
+  {
+    this.jsonMapper = jsonMapper;
+    this.injector = injector;
+  }
+
+  @Override
+  public NonnullPair<Boolean, String> supportsCompactionSpec(
+      CompactionTask compactionTask
+  )
+  {
+    if (compactionTask.getTuningConfig() != null) {
+      PartitionsSpec partitionsSpec = 
compactionTask.getTuningConfig().getPartitionsSpec();
+      if (!(partitionsSpec instanceof DynamicPartitionsSpec
+            || partitionsSpec instanceof DimensionRangePartitionsSpec)) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "Invalid partition spec type[%s] for MSQ compaction engine."
+            + " Type must be either DynamicPartitionsSpec or 
DynamicRangePartitionsSpec.",
+            partitionsSpec.getClass()
+        ));
+      }
+      if (partitionsSpec instanceof DynamicPartitionsSpec
+          && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != 
null) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ 
compaction engine.",
+            ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows()
+        ));
+      }
+    }
+    if (compactionTask.getMetricsSpec() != null
+        && compactionTask.getGranularitySpec() != null
+        && !compactionTask.getGranularitySpec()
+                          .isRollup()) {
+      return new NonnullPair<>(
+          false,
+          "rollup in granularitySpec must be set to true if metricsSpec is 
specifed "
+          + "for MSQ compaction engine."
+      );
+    }
+
+    QueryContext compactionTaskContext = new 
QueryContext(compactionTask.getContext());
+    if (!MultiStageQueryContext.isFinalizeAggregations(compactionTaskContext)) 
{
+      return new NonnullPair<>(false, StringUtils.format(
+          "Config[%s] cannot be set to false for auto-compaction with MSQ 
engine.",
+          MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS
+      ));
+    }
+    if (MultiStageQueryContext.getAssignmentStrategy(compactionTaskContext) == 
WorkerAssignmentStrategy.AUTO) {
+      return new NonnullPair<>(
+          false,
+          StringUtils.format(
+              "Config[%s] cannot be set to value[%s] for auto-compaction with 
MSQ engine.",
+              MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY,
+              WorkerAssignmentStrategy.AUTO
+          )
+      );
+    }
+    return new NonnullPair<>(true, "");
+  }
+
  /**
   * Returns the holder tracking the currently-running MSQ controller sub-task,
   * used to stop it gracefully when this compaction task is shut down.
   */
  @Override
  public CurrentSubTaskHolder getCurrentSubTaskHolder()
  {
    return currentSubTaskHolder;
  }
+
+  @Override
+  public TaskStatus runCompactionTasks(
+      CompactionTask compactionTask,
+      List<NonnullPair<Interval, DataSchema>> intervalDataSchemas,
+      TaskToolbox taskToolbox
+  ) throws Exception
+  {
+    List<MSQControllerTask> msqControllerTasks = 
compactionToMSQTasks(compactionTask, intervalDataSchemas);
+
+    if (msqControllerTasks.isEmpty()) {
+      String msg = StringUtils.format(
+          "Can't find segments from inputSpec[%s], nothing to do.",
+          compactionTask.getIoConfig().getInputSpec()
+      );
+      return TaskStatus.failure(compactionTask.getId(), msg);
+    }
+    return runSubtasks(
+        msqControllerTasks,
+        taskToolbox,
+        currentSubTaskHolder,
+        compactionTask.getId()
+    );
+  }
+
+  public List<MSQControllerTask> compactionToMSQTasks(
+      CompactionTask compactionTask,
+      List<NonnullPair<Interval, DataSchema>> intervalDataSchemas
+  ) throws JsonProcessingException
+  {
+    List<MSQControllerTask> msqControllerTasks = new ArrayList<>();
+
+    for (NonnullPair<Interval, DataSchema> intervalDataSchema : 
intervalDataSchemas) {
+      Query<?> query;
+      Interval interval = intervalDataSchema.lhs;
+      DataSchema dataSchema = intervalDataSchema.rhs;
+
+      if (!isGroupBy(dataSchema)) {
+        query = buildScanQuery(compactionTask, interval, dataSchema);
+      } else {
+        query = buildGroupByQuery(compactionTask, interval, dataSchema);
+      }

Review Comment:
   Style: a positive condition reads better than a negative condition.
   
   ```suggestion
         if (isGroupBy(dataSchema)) {
           query = buildGroupByQuery(compactionTask, interval, dataSchema);
         } else {
           query = buildScanQuery(compactionTask, interval, dataSchema);
         }
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java:
##########
@@ -34,11 +36,13 @@ public class CoordinatorCompactionConfig
   private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1;
   private static final int DEFAULT_MAX_COMPACTION_TASK_SLOTS = 
Integer.MAX_VALUE;
   private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false;
+  private static final CompactionEngine DEFAULT_COMPACTION_ENGINE = 
CompactionEngine.NATIVE;

Review Comment:
   Cluster-level default should be a config and could be exposed as a separate 
API.
   That API should validate all existing datasource compaction configs.
   
   (this change maybe done in a follow up).



##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -58,15 +61,19 @@ public class CompactionState
   // org.apache.druid.query.aggregation.AggregatorFactory cannot be used here 
because it's in the 'processing' module which
   // has a dependency on the 'core' module where this class is.
   private final List<Object> metricsSpec;
+  private final CompactionEngine engine;
+  private final Map<String, AggregatorFactory> 
dimensionToAggregatoryFactoryMap;

Review Comment:
   This field requires a javadoc explaining its purpose and usage.



##########
server/src/main/java/org/apache/druid/server/coordinator/ClientCompactionRunnerInfo.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.QueryContext;
+
+import java.util.Objects;
+
+
+/**
+ * This class is used only to pass the compaction engine type via the "type" parameter, so that it can be
+ * deserialized to the appropriate {@link org.apache.druid.indexing.common.task.CompactionRunner} subtype at the
+ * Overlord.
+ */
+public class ClientCompactionRunnerInfo
+{
+  private final CompactionEngine type;
+
  // Jackson creator: the "type" property selects the CompactionRunner subtype
  // during deserialization at the Overlord.
  @JsonCreator
  public ClientCompactionRunnerInfo(@JsonProperty("type") CompactionEngine type)
  {
    this.type = type;
  }
+
  /**
   * Returns the compaction engine type; serialized as the "type" property.
   */
  @JsonProperty
  public CompactionEngine getType()
  {
    return type;
  }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactionRunnerInfo{" +
+           "type=" + type +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClientCompactionRunnerInfo that = (ClientCompactionRunnerInfo) o;
+    return type == that.type;
+  }
+
  @Override
  public int hashCode()
  {
    // Consistent with equals(): hash depends only on the engine type.
    return Objects.hash(type);
  }
+
+  /**
+   * Checks if the provided compaction config is supported by the runner
+   * @param newConfig The updated compaction config
+   * @param engineSource String indicating the source of compaction engine.
+   * @return Pair of support boolean and reason string. The reason string is 
empty if support boolean is True.
+   */
+  public static NonnullPair<Boolean, String> 
supportsCompactionConfig(DataSourceCompactionConfig newConfig, String 
engineSource)
+  {
+    CompactionEngine compactionEngine = newConfig.getEngine();
+    if (compactionEngine == CompactionEngine.MSQ) {
+      if (newConfig.getTuningConfig() != null) {
+        PartitionsSpec partitionsSpec = 
newConfig.getTuningConfig().getPartitionsSpec();
+        if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
+              || partitionsSpec instanceof DynamicPartitionsSpec)) {
+          return new NonnullPair<>(false, StringUtils.format(
+              "Invalid partition spec type[%s] for MSQ compaction engine[%s]."
+              + " Type must be either DynamicPartitionsSpec or 
DynamicRangePartitionsSpec.",
+              partitionsSpec.getClass(),
+              engineSource
+          )
+          );
+        }
+        if (partitionsSpec instanceof DynamicPartitionsSpec
+            && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != 
null) {
+          return new NonnullPair<>(false, StringUtils.format(
+              "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ 
compaction engine[%s].",
+              ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows(), 
engineSource
+          ));
+        }
+      }
+
+      if (newConfig.getMetricsSpec() != null
+          && newConfig.getGranularitySpec() != null
+          && !newConfig.getGranularitySpec()
+                       .isRollup()) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "rollup in granularitySpec must be set to True if metricsSpec is 
specifed "
+            + "for MSQ compaction engine[%s].", engineSource));
+      }
+
+      QueryContext queryContext = QueryContext.of(newConfig.getTaskContext());
+
+      if (!queryContext.getBoolean(MSQContext.CTX_FINALIZE_AGGREGATIONS, 
true)) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "Config[%s] cannot be set to false for auto-compaction with MSQ 
engine[%s].",
+            MSQContext.CTX_FINALIZE_AGGREGATIONS,
+            engineSource
+        ));
+      }
+
+      if (queryContext.getString(MSQContext.CTX_TASK_ASSIGNMENT_STRATEGY, 
MSQContext.TASK_ASSIGNMENT_STRATEGY_MAX)
+                      .equals(MSQContext.TASK_ASSIGNMENT_STRATEGY_AUTO)) {
+        return new NonnullPair<>(false, StringUtils.format(
+            "Config[%s] cannot be set to value[%s] for auto-compaction with 
MSQ engine[%s].",
+            MSQContext.CTX_TASK_ASSIGNMENT_STRATEGY,
+            MSQContext.TASK_ASSIGNMENT_STRATEGY_AUTO,
+            engineSource
+        ));
+      }
+    }
+    return new NonnullPair<>(true, "");
+  }
+
+  /**
+   * This class copies over MSQ context parameters from the MSQ extension. 
This is required to validate the submitted
+   * compaction config at the coordinator. The values used here should be kept 
in sync with those in
+   * {@link org.apache.druid.msq.util.MultiStageQueryContext}

Review Comment:
   This link is broken since the `server` module does not have a dependency on 
`multi-stage-query`. Use a `{@code}` tag instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to