kfaraz commented on code in PR #16291:
URL: https://github.com/apache/druid/pull/16291#discussion_r1583126810


##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java:
##########
@@ -93,9 +96,8 @@ public CoordinatorCompactionConfig(
     this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ?
                                   DEFAULT_MAX_COMPACTION_TASK_SLOTS :
                                   maxCompactionTaskSlots;
-    this.useAutoScaleSlots = useAutoScaleSlots == null ?
-                             DEFAULT_USE_AUTO_SCALE_SLOTS :
-                             useAutoScaleSlots;
+    this.useAutoScaleSlots = useAutoScaleSlots == null ? 
DEFAULT_USE_AUTO_SCALE_SLOTS : useAutoScaleSlots;
+    this.compactionEngine = DEFAULT_COMPACTION_ENGINE;

Review Comment:
   Shouldn't there be an argument corresponding to this field in the 
constructor?



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java:
##########
@@ -93,9 +96,8 @@ public CoordinatorCompactionConfig(
     this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ?
                                   DEFAULT_MAX_COMPACTION_TASK_SLOTS :
                                   maxCompactionTaskSlots;
-    this.useAutoScaleSlots = useAutoScaleSlots == null ?
-                             DEFAULT_USE_AUTO_SCALE_SLOTS :
-                             useAutoScaleSlots;
+    this.useAutoScaleSlots = useAutoScaleSlots == null ? 
DEFAULT_USE_AUTO_SCALE_SLOTS : useAutoScaleSlots;

Review Comment:
   ```suggestion
       this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, 
DEFAULT_USE_AUTO_SCALE_SLOTS);
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
+{
+  private final CompactionEngine type;
+
+  @JsonCreator
+  public UserCompactionStrategy(@JsonProperty("type") CompactionEngine type)
+  {
+    this.type = type;
+  }
+
+  @JsonProperty
+  public CompactionEngine getType()
+  {
+    return type;
+  }
+
+  @Override
+  public String toString()

Review Comment:
   Is this needed for logging anywhere?



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java:
##########
@@ -135,13 +143,16 @@ public boolean equals(Object o)
     return Double.compare(that.compactionTaskSlotRatio, 
compactionTaskSlotRatio) == 0 &&
            maxCompactionTaskSlots == that.maxCompactionTaskSlots &&
            useAutoScaleSlots == that.useAutoScaleSlots &&
+           compactionEngine == that.compactionEngine &&
            Objects.equals(compactionConfigs, that.compactionConfigs);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(compactionConfigs, compactionTaskSlotRatio, 
maxCompactionTaskSlots, useAutoScaleSlots);
+    return Objects.hash(compactionConfigs, compactionTaskSlotRatio, 
maxCompactionTaskSlots, useAutoScaleSlots,

Review Comment:
   Please fix formatting here by putting each argument on a separate line.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -464,144 +472,49 @@ void emitCompactIngestionModeMetrics(
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
-
     // emit metric for compact ingestion mode:
     emitCompactIngestionModeMetrics(toolbox.getEmitter(), 
ioConfig.isDropExisting());
 
-    final List<ParallelIndexIngestionSpec> ingestionSpecs = 
createIngestionSchema(
+    final List<NonnullPair<Interval, DataSchema>> intervalDataSchemas = 
createDataSchemasForIntervals(
         UTC_CLOCK,
         toolbox,
         getTaskLockHelper().getLockGranularityToUse(),
-        ioConfig,
         segmentProvider,
-        partitionConfigurationManager,
         dimensionsSpec,
         transformSpec,
         metricsSpec,
         granularitySpec,
-        toolbox.getCoordinatorClient(),
-        segmentCacheManagerFactory,
         getMetricBuilder()
     );
-    final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
-        .range(0, ingestionSpecs.size())
-        .mapToObj(i -> {
-          // The ID of SubtaskSpecs is used as the base sequenceName in 
segment allocation protocol.
-          // The indexing tasks generated by the compaction task should use 
different sequenceNames
-          // so that they can allocate valid segment IDs with no duplication.
-          ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
-          final String baseSequenceName = createIndexTaskSpecId(i);
-          return newTask(baseSequenceName, ingestionSpec);
-        })
-        .collect(Collectors.toList());
-
-    if (indexTaskSpecs.isEmpty()) {
-      String msg = StringUtils.format(
-          "Can't find segments from inputSpec[%s], nothing to do.",
-          ioConfig.getInputSpec()
-      );
-      log.warn(msg);
-      return TaskStatus.failure(getId(), msg);
-    } else {
-      registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
-      final int totalNumSpecs = indexTaskSpecs.size();
-      log.info("Generated [%d] compaction task specs", totalNumSpecs);
-
-      int failCnt = 0;
-      final TaskReport.ReportMap completionReports = new 
TaskReport.ReportMap();
-      for (int i = 0; i < indexTaskSpecs.size(); i++) {
-        ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
-        final String json = 
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
-        if (!currentSubTaskHolder.setTask(eachSpec)) {
-          String errMsg = "Task was asked to stop. Finish as failed.";
-          log.info(errMsg);
-          return TaskStatus.failure(getId(), errMsg);
-        }
-        try {
-          if (eachSpec.isReady(toolbox.getTaskActionClient())) {
-            log.info("Running indexSpec: " + json);
-            final TaskStatus eachResult = eachSpec.run(toolbox);
-            if (!eachResult.isSuccess()) {
-              failCnt++;
-              log.warn("Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
-            }
-
-            String reportKeySuffix = "_" + i;
-            Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
-                reports -> completionReports.putAll(
-                    CollectionUtils.mapKeys(reports, key -> key + 
reportKeySuffix)
-                )
-            );
-          } else {
-            failCnt++;
-            log.warn("indexSpec is not ready: [%s].\nTrying the next 
indexSpec.", json);
-          }
-        }
-        catch (Exception e) {
-          failCnt++;
-          log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
-        }
-      }
-
-      String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d] 
failed",
-                                      totalNumSpecs, totalNumSpecs - failCnt, 
failCnt
-      );
 
-      toolbox.getTaskReportFileWriter().write(getId(), completionReports);
-      log.info(msg);
-      return failCnt == 0 ? TaskStatus.success(getId()) : 
TaskStatus.failure(getId(), msg);
+    if (compactionStrategy == null) {
+      // Can only happen for MSQ engine, when the json subtype reqd for 
deserialization isn't available due to
+      // missing extn.
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build(
+                              "Extension[druid-multi-stage-query] required for 
running compaction on MSQ "
+                              + "not found on the Indexer.");
     }
-  }
-
-  @VisibleForTesting
-  ParallelIndexSupervisorTask newTask(String baseSequenceName, 
ParallelIndexIngestionSpec ingestionSpec)
-  {
-    return new ParallelIndexSupervisorTask(
-        getId(),
-        getGroupId(),
-        getTaskResource(),
-        ingestionSpec,
-        baseSequenceName,
-        createContextForSubtask(),
-        true
-    );
-  }
-
-  @VisibleForTesting
-  Map<String, Object> createContextForSubtask()
-  {
-    final Map<String, Object> newContext = new HashMap<>(getContext());
-    newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId());
-    newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, 
STORE_COMPACTION_STATE);
-    // Set the priority of the compaction task.
-    newContext.put(Tasks.PRIORITY_KEY, getPriority());
-    return newContext;
-  }
-
-  private String createIndexTaskSpecId(int i)
-  {
-    return StringUtils.format("%s_%d", getId(), i);
+    registerResourceCloserOnAbnormalExit(currentSubTaskHolder);

Review Comment:
   ```suggestion
       
registerResourceCloserOnAbnormalExit(compactionStrategy.getCurrentSubTaskHolder());
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
+{
+  private final CompactionEngine TYPE;
+
+  @JsonCreator
+  public UserCompactionStrategy(@JsonProperty("TYPE") CompactionEngine TYPE)

Review Comment:
   ```suggestion
     public UserCompactionStrategy(@JsonProperty("type") CompactionEngine type)
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy

Review Comment:
   Yes, I agree.
   
   This should have the prefix `Client` rather than `User`.
   So, a meaningful name could be `ClientCompactionRunnerInfo` since it is 
related to the runner but is not an actual runner itself.
   Please add a javadoc clarifying that this class is only needed for serde and 
would be deserialized on the Overlord side as a `CompactionRunner`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link 
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.EXISTING_PROPERTY,
+    property = CompactionStrategy.TYPE_PROPERTY,
+    visible = true)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = NativeCompactionStrategy.TYPE, value = 
NativeCompactionStrategy.class)
+})
+public interface CompactionStrategy

Review Comment:
   This name is confusing as it is difficult to distinguish from 
`CompactionPolicy`. This class is more of a `CompactionRunner`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link 
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.EXISTING_PROPERTY,
+    property = CompactionStrategy.TYPE_PROPERTY,
+    visible = true)

Review Comment:
   A simpler version of this should work too:
   ```suggestion
   @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
   ```
   
   For verification, see the usage in `Task` interface. `Task` also has a 
`getType()` method which doesn't seem to interfere with the 
serialization/deserialization.



##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
+{
+  private final CompactionEngine TYPE;

Review Comment:
   ```suggestion
     private final CompactionEngine type;
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link 
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.EXISTING_PROPERTY,
+    property = CompactionStrategy.TYPE_PROPERTY,
+    visible = true)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = NativeCompactionStrategy.TYPE, value = 
NativeCompactionStrategy.class)

Review Comment:
   Probably best to keep the type name constants in a single place, i.e. 
`CompactionEngine`.



##########
server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java:
##########
@@ -192,6 +207,7 @@ public boolean equals(Object o)
            Arrays.equals(metricsSpec, that.metricsSpec) &&
            Objects.equals(transformSpec, that.transformSpec) &&
            Objects.equals(ioConfig, that.ioConfig) &&
+           Objects.equals(compactionEngine, that.compactionEngine) &&

Review Comment:
   This should work too, as this is a plain enum field
   ```suggestion
              this.compactionEngine == that.compactionEngine &&
   ```



##########
processing/src/main/java/org/apache/druid/indexer/CompactionEngine.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Encapsulates the Engine to be used for a compaction task.
+ * Should be synchronized with the subtypes for {@link 
org.apache.druid.indexing.common.task.CompactionStrategy}.
+ */
+public enum CompactionEngine
+{
+  NATIVE,
+  MSQ;
+
+  @JsonCreator
+  public static CompactionEngine fromString(String name)
+  {
+    if (name == null) {
+      return null;
+    }
+    return valueOf(StringUtils.toUpperCase(name));

Review Comment:
   ```suggestion
       return name == null ? null : valueOf(StringUtils.toUpperCase(name));
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -464,144 +472,49 @@ void emitCompactIngestionModeMetrics(
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
-
     // emit metric for compact ingestion mode:
     emitCompactIngestionModeMetrics(toolbox.getEmitter(), 
ioConfig.isDropExisting());
 
-    final List<ParallelIndexIngestionSpec> ingestionSpecs = 
createIngestionSchema(
+    final List<NonnullPair<Interval, DataSchema>> intervalDataSchemas = 
createDataSchemasForIntervals(
         UTC_CLOCK,
         toolbox,
         getTaskLockHelper().getLockGranularityToUse(),
-        ioConfig,
         segmentProvider,
-        partitionConfigurationManager,
         dimensionsSpec,
         transformSpec,
         metricsSpec,
         granularitySpec,
-        toolbox.getCoordinatorClient(),
-        segmentCacheManagerFactory,
         getMetricBuilder()
     );
-    final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
-        .range(0, ingestionSpecs.size())
-        .mapToObj(i -> {
-          // The ID of SubtaskSpecs is used as the base sequenceName in 
segment allocation protocol.
-          // The indexing tasks generated by the compaction task should use 
different sequenceNames
-          // so that they can allocate valid segment IDs with no duplication.
-          ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
-          final String baseSequenceName = createIndexTaskSpecId(i);
-          return newTask(baseSequenceName, ingestionSpec);
-        })
-        .collect(Collectors.toList());
-
-    if (indexTaskSpecs.isEmpty()) {
-      String msg = StringUtils.format(
-          "Can't find segments from inputSpec[%s], nothing to do.",
-          ioConfig.getInputSpec()
-      );
-      log.warn(msg);
-      return TaskStatus.failure(getId(), msg);
-    } else {
-      registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
-      final int totalNumSpecs = indexTaskSpecs.size();
-      log.info("Generated [%d] compaction task specs", totalNumSpecs);
-
-      int failCnt = 0;
-      final TaskReport.ReportMap completionReports = new 
TaskReport.ReportMap();
-      for (int i = 0; i < indexTaskSpecs.size(); i++) {
-        ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
-        final String json = 
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
-        if (!currentSubTaskHolder.setTask(eachSpec)) {
-          String errMsg = "Task was asked to stop. Finish as failed.";
-          log.info(errMsg);
-          return TaskStatus.failure(getId(), errMsg);
-        }
-        try {
-          if (eachSpec.isReady(toolbox.getTaskActionClient())) {
-            log.info("Running indexSpec: " + json);
-            final TaskStatus eachResult = eachSpec.run(toolbox);
-            if (!eachResult.isSuccess()) {
-              failCnt++;
-              log.warn("Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
-            }
-
-            String reportKeySuffix = "_" + i;
-            Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
-                reports -> completionReports.putAll(
-                    CollectionUtils.mapKeys(reports, key -> key + 
reportKeySuffix)
-                )
-            );
-          } else {
-            failCnt++;
-            log.warn("indexSpec is not ready: [%s].\nTrying the next 
indexSpec.", json);
-          }
-        }
-        catch (Exception e) {
-          failCnt++;
-          log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
-        }
-      }
-
-      String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d] 
failed",
-                                      totalNumSpecs, totalNumSpecs - failCnt, 
failCnt
-      );
 
-      toolbox.getTaskReportFileWriter().write(getId(), completionReports);
-      log.info(msg);
-      return failCnt == 0 ? TaskStatus.success(getId()) : 
TaskStatus.failure(getId(), msg);
+    if (compactionStrategy == null) {
+      // Can only happen for MSQ engine, when the json subtype reqd for 
deserialization isn't available due to

Review Comment:
   I don't think this is true. If the MSQ extension is not present, then a 
compaction task with engine as "msq" will throw a JsonException while being 
submitted to the Overlord itself.



##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
+{
+  private final CompactionEngine TYPE;
+
+  @JsonCreator
+  public UserCompactionStrategy(@JsonProperty("TYPE") CompactionEngine TYPE)
+  {
+    this.TYPE = TYPE;
+  }
+
+  @JsonProperty("TYPE")

Review Comment:
   ```suggestion
     @JsonProperty("type")
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -164,21 +157,17 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
   private final ClientCompactionTaskGranularitySpec granularitySpec;
   @Nullable
   private final CompactionTuningConfig tuningConfig;
+  @Nullable
+  private final CompactionStrategy compactionStrategy;
   @JsonIgnore
   private final SegmentProvider segmentProvider;
   @JsonIgnore
   private final PartitionConfigurationManager partitionConfigurationManager;
-
   @JsonIgnore
   private final SegmentCacheManagerFactory segmentCacheManagerFactory;
 
   @JsonIgnore
-  private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
-      (taskObject, config) -> {
-        final ParallelIndexSupervisorTask indexTask = 
(ParallelIndexSupervisorTask) taskObject;
-        indexTask.stopGracefully(config);
-      }
-  );
+  private final CurrentSubTaskHolder currentSubTaskHolder;

Review Comment:
   This probably doesn't need to be a field anymore.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -464,144 +472,49 @@ void emitCompactIngestionModeMetrics(
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
-
     // emit metric for compact ingestion mode:
     emitCompactIngestionModeMetrics(toolbox.getEmitter(), 
ioConfig.isDropExisting());
 
-    final List<ParallelIndexIngestionSpec> ingestionSpecs = 
createIngestionSchema(
+    final List<NonnullPair<Interval, DataSchema>> intervalDataSchemas = 
createDataSchemasForIntervals(
         UTC_CLOCK,
         toolbox,
         getTaskLockHelper().getLockGranularityToUse(),
-        ioConfig,
         segmentProvider,
-        partitionConfigurationManager,
         dimensionsSpec,
         transformSpec,
         metricsSpec,
         granularitySpec,
-        toolbox.getCoordinatorClient(),
-        segmentCacheManagerFactory,
         getMetricBuilder()
     );
-    final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
-        .range(0, ingestionSpecs.size())
-        .mapToObj(i -> {
-          // The ID of SubtaskSpecs is used as the base sequenceName in 
segment allocation protocol.
-          // The indexing tasks generated by the compaction task should use 
different sequenceNames
-          // so that they can allocate valid segment IDs with no duplication.
-          ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
-          final String baseSequenceName = createIndexTaskSpecId(i);
-          return newTask(baseSequenceName, ingestionSpec);
-        })
-        .collect(Collectors.toList());
-
-    if (indexTaskSpecs.isEmpty()) {
-      String msg = StringUtils.format(
-          "Can't find segments from inputSpec[%s], nothing to do.",
-          ioConfig.getInputSpec()
-      );
-      log.warn(msg);
-      return TaskStatus.failure(getId(), msg);
-    } else {
-      registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
-      final int totalNumSpecs = indexTaskSpecs.size();
-      log.info("Generated [%d] compaction task specs", totalNumSpecs);
-
-      int failCnt = 0;
-      final TaskReport.ReportMap completionReports = new 
TaskReport.ReportMap();
-      for (int i = 0; i < indexTaskSpecs.size(); i++) {
-        ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
-        final String json = 
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
-        if (!currentSubTaskHolder.setTask(eachSpec)) {
-          String errMsg = "Task was asked to stop. Finish as failed.";
-          log.info(errMsg);
-          return TaskStatus.failure(getId(), errMsg);
-        }
-        try {
-          if (eachSpec.isReady(toolbox.getTaskActionClient())) {
-            log.info("Running indexSpec: " + json);
-            final TaskStatus eachResult = eachSpec.run(toolbox);
-            if (!eachResult.isSuccess()) {
-              failCnt++;
-              log.warn("Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
-            }
-
-            String reportKeySuffix = "_" + i;
-            Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
-                reports -> completionReports.putAll(
-                    CollectionUtils.mapKeys(reports, key -> key + 
reportKeySuffix)
-                )
-            );
-          } else {
-            failCnt++;
-            log.warn("indexSpec is not ready: [%s].\nTrying the next 
indexSpec.", json);
-          }
-        }
-        catch (Exception e) {
-          failCnt++;
-          log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
-        }
-      }
-
-      String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d] 
failed",
-                                      totalNumSpecs, totalNumSpecs - failCnt, 
failCnt
-      );
 
-      toolbox.getTaskReportFileWriter().write(getId(), completionReports);
-      log.info(msg);
-      return failCnt == 0 ? TaskStatus.success(getId()) : 
TaskStatus.failure(getId(), msg);
+    if (compactionStrategy == null) {
+      // Can only happen for MSQ engine, when the json subtype reqd for 
deserialization isn't available due to
+      // missing extn.
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)

Review Comment:
   Can be an `InvalidInput.exception` instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -357,6 +349,16 @@ public DimensionsSpec getDimensionsSpec()
     return dimensionsSpec;
   }
 
+  public PartitionConfigurationManager getPartitionConfigurationManager()
+  {
+    return partitionConfigurationManager;
+  }
+
+  public SegmentCacheManagerFactory getSegmentCacheManagerFactory()
+  {
+    return segmentCacheManagerFactory;
+  }

Review Comment:
   These are not needed here anymore. They should only be present in the 
`NativeCompactionStrategy`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link 
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.EXISTING_PROPERTY,
+    property = CompactionStrategy.TYPE_PROPERTY,
+    visible = true)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = NativeCompactionStrategy.TYPE, value = 
NativeCompactionStrategy.class)
+})
+public interface CompactionStrategy
+{
+  String TYPE_PROPERTY = "TYPE";
+
+  TaskStatus runCompactionTasks(
+      CompactionTask compactionTask,
+      TaskToolbox taskToolbox,
+      List<NonnullPair<Interval, DataSchema>> dataSchemas
+  ) throws JsonProcessingException;

Review Comment:
   ```suggestion
     ) throws Exception;
   ```
   
   Since this is a task run method, it can throw all exceptions that 
`Task.run()` can.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionStrategy.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskToolbox;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.utils.CollectionUtils;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.joda.time.Interval;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class NativeCompactionStrategy implements CompactionStrategy
+{
+  private static final Logger log = new Logger(NativeCompactionStrategy.class);
+  public static final String TYPE = "NATIVE";
+  private static final boolean STORE_COMPACTION_STATE = true;
+
+  @JsonCreator
+  public NativeCompactionStrategy()
+  {
+  }
+
+  @Override
+  public CurrentSubTaskHolder getCurrentSubTaskHolder()
+  {
+    return new CurrentSubTaskHolder(
+        (taskObject, config) -> {
+          final ParallelIndexSupervisorTask indexTask = 
(ParallelIndexSupervisorTask) taskObject;
+          indexTask.stopGracefully(config);
+        }
+    );
+  }
+
+  @Override
+  @JsonProperty("TYPE")

Review Comment:
   `@JsonProperty` is probably not needed. See the other comments and the 
examples of `Task` implementations.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link 
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.EXISTING_PROPERTY,
+    property = CompactionStrategy.TYPE_PROPERTY,
+    visible = true)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = NativeCompactionStrategy.TYPE, value = 
NativeCompactionStrategy.class)
+})
+public interface CompactionStrategy
+{
+  String TYPE_PROPERTY = "TYPE";
+
+  TaskStatus runCompactionTasks(
+      CompactionTask compactionTask,
+      TaskToolbox taskToolbox,
+      List<NonnullPair<Interval, DataSchema>> dataSchemas
+  ) throws JsonProcessingException;
+
+  CurrentSubTaskHolder getCurrentSubTaskHolder();

Review Comment:
   Yes, that's a good point. But I think the validation should also happen on 
the coordinator side when submitting a new compaction config.
   
   I guess one way to do that would be to have `ClientCompactionRunnerInfo` 
expose a `supportsCompactionConfig(DatasourceCompactionConfig config)` method. 
@gargvishesh , are we doing any compatibility validation in the current set of 
changes?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionStrategy.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskToolbox;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.utils.CollectionUtils;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.joda.time.Interval;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class NativeCompactionStrategy implements CompactionStrategy
+{
+  private static final Logger log = new Logger(NativeCompactionStrategy.class);
+  public static final String TYPE = "NATIVE";
+  private static final boolean STORE_COMPACTION_STATE = true;
+
+  @JsonCreator
+  public NativeCompactionStrategy()
+  {
+  }
+
+  @Override
+  public CurrentSubTaskHolder getCurrentSubTaskHolder()
+  {
+    return new CurrentSubTaskHolder(
+        (taskObject, config) -> {
+          final ParallelIndexSupervisorTask indexTask = 
(ParallelIndexSupervisorTask) taskObject;
+          indexTask.stopGracefully(config);
+        }
+    );
+  }
+
+  @Override
+  @JsonProperty("TYPE")
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  /**
+   * Generate {@link ParallelIndexIngestionSpec} from input dataschemas.
+   *
+   * @return an empty list if input segments don't exist. Otherwise, a 
generated ingestionSpec.
+   */
+  @VisibleForTesting

Review Comment:
   Try to write the tests in a way that doesn't require exposing internal 
methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to