kfaraz commented on code in PR #16291:
URL: https://github.com/apache/druid/pull/16291#discussion_r1583126810
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java:
##########
@@ -93,9 +96,8 @@ public CoordinatorCompactionConfig(
this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ?
DEFAULT_MAX_COMPACTION_TASK_SLOTS :
maxCompactionTaskSlots;
- this.useAutoScaleSlots = useAutoScaleSlots == null ?
- DEFAULT_USE_AUTO_SCALE_SLOTS :
- useAutoScaleSlots;
+ this.useAutoScaleSlots = useAutoScaleSlots == null ?
DEFAULT_USE_AUTO_SCALE_SLOTS : useAutoScaleSlots;
+ this.compactionEngine = DEFAULT_COMPACTION_ENGINE;
Review Comment:
Shouldn't there be an argument corresponding to this field in the
constructor?
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java:
##########
@@ -93,9 +96,8 @@ public CoordinatorCompactionConfig(
this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ?
DEFAULT_MAX_COMPACTION_TASK_SLOTS :
maxCompactionTaskSlots;
- this.useAutoScaleSlots = useAutoScaleSlots == null ?
- DEFAULT_USE_AUTO_SCALE_SLOTS :
- useAutoScaleSlots;
+ this.useAutoScaleSlots = useAutoScaleSlots == null ?
DEFAULT_USE_AUTO_SCALE_SLOTS : useAutoScaleSlots;
Review Comment:
```suggestion
this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots,
DEFAULT_USE_AUTO_SCALE_SLOTS);
```
##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
+{
+ private final CompactionEngine type;
+
+ @JsonCreator
+ public UserCompactionStrategy(@JsonProperty("type") CompactionEngine type)
+ {
+ this.type = type;
+ }
+
+ @JsonProperty
+ public CompactionEngine getType()
+ {
+ return type;
+ }
+
+ @Override
+ public String toString()
Review Comment:
Is this needed for logging anywhere?
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java:
##########
@@ -135,13 +143,16 @@ public boolean equals(Object o)
return Double.compare(that.compactionTaskSlotRatio,
compactionTaskSlotRatio) == 0 &&
maxCompactionTaskSlots == that.maxCompactionTaskSlots &&
useAutoScaleSlots == that.useAutoScaleSlots &&
+ compactionEngine == that.compactionEngine &&
Objects.equals(compactionConfigs, that.compactionConfigs);
}
@Override
public int hashCode()
{
- return Objects.hash(compactionConfigs, compactionTaskSlotRatio,
maxCompactionTaskSlots, useAutoScaleSlots);
+ return Objects.hash(compactionConfigs, compactionTaskSlotRatio,
maxCompactionTaskSlots, useAutoScaleSlots,
Review Comment:
Please fix formatting here by putting each argument on a separate line.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -464,144 +472,49 @@ void emitCompactIngestionModeMetrics(
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
-
// emit metric for compact ingestion mode:
emitCompactIngestionModeMetrics(toolbox.getEmitter(),
ioConfig.isDropExisting());
- final List<ParallelIndexIngestionSpec> ingestionSpecs =
createIngestionSchema(
+ final List<NonnullPair<Interval, DataSchema>> intervalDataSchemas =
createDataSchemasForIntervals(
UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
- ioConfig,
segmentProvider,
- partitionConfigurationManager,
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec,
- toolbox.getCoordinatorClient(),
- segmentCacheManagerFactory,
getMetricBuilder()
);
- final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
- .range(0, ingestionSpecs.size())
- .mapToObj(i -> {
- // The ID of SubtaskSpecs is used as the base sequenceName in
segment allocation protocol.
- // The indexing tasks generated by the compaction task should use
different sequenceNames
- // so that they can allocate valid segment IDs with no duplication.
- ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
- final String baseSequenceName = createIndexTaskSpecId(i);
- return newTask(baseSequenceName, ingestionSpec);
- })
- .collect(Collectors.toList());
-
- if (indexTaskSpecs.isEmpty()) {
- String msg = StringUtils.format(
- "Can't find segments from inputSpec[%s], nothing to do.",
- ioConfig.getInputSpec()
- );
- log.warn(msg);
- return TaskStatus.failure(getId(), msg);
- } else {
- registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
- final int totalNumSpecs = indexTaskSpecs.size();
- log.info("Generated [%d] compaction task specs", totalNumSpecs);
-
- int failCnt = 0;
- final TaskReport.ReportMap completionReports = new
TaskReport.ReportMap();
- for (int i = 0; i < indexTaskSpecs.size(); i++) {
- ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
- final String json =
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
- if (!currentSubTaskHolder.setTask(eachSpec)) {
- String errMsg = "Task was asked to stop. Finish as failed.";
- log.info(errMsg);
- return TaskStatus.failure(getId(), errMsg);
- }
- try {
- if (eachSpec.isReady(toolbox.getTaskActionClient())) {
- log.info("Running indexSpec: " + json);
- final TaskStatus eachResult = eachSpec.run(toolbox);
- if (!eachResult.isSuccess()) {
- failCnt++;
- log.warn("Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
- }
-
- String reportKeySuffix = "_" + i;
- Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
- reports -> completionReports.putAll(
- CollectionUtils.mapKeys(reports, key -> key +
reportKeySuffix)
- )
- );
- } else {
- failCnt++;
- log.warn("indexSpec is not ready: [%s].\nTrying the next
indexSpec.", json);
- }
- }
- catch (Exception e) {
- failCnt++;
- log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
- }
- }
-
- String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d]
failed",
- totalNumSpecs, totalNumSpecs - failCnt,
failCnt
- );
- toolbox.getTaskReportFileWriter().write(getId(), completionReports);
- log.info(msg);
- return failCnt == 0 ? TaskStatus.success(getId()) :
TaskStatus.failure(getId(), msg);
+ if (compactionStrategy == null) {
+ // Can only happen for MSQ engine, when the json subtype reqd for
deserialization isn't available due to
+ // missing extn.
+ throw DruidException.forPersona(DruidException.Persona.ADMIN)
+ .ofCategory(DruidException.Category.NOT_FOUND)
+ .build(
+ "Extension[druid-multi-stage-query] required for
running compaction on MSQ "
+ + "not found on the Indexer.");
}
- }
-
- @VisibleForTesting
- ParallelIndexSupervisorTask newTask(String baseSequenceName,
ParallelIndexIngestionSpec ingestionSpec)
- {
- return new ParallelIndexSupervisorTask(
- getId(),
- getGroupId(),
- getTaskResource(),
- ingestionSpec,
- baseSequenceName,
- createContextForSubtask(),
- true
- );
- }
-
- @VisibleForTesting
- Map<String, Object> createContextForSubtask()
- {
- final Map<String, Object> newContext = new HashMap<>(getContext());
- newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId());
- newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY,
STORE_COMPACTION_STATE);
- // Set the priority of the compaction task.
- newContext.put(Tasks.PRIORITY_KEY, getPriority());
- return newContext;
- }
-
- private String createIndexTaskSpecId(int i)
- {
- return StringUtils.format("%s_%d", getId(), i);
+ registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
Review Comment:
```suggestion
registerResourceCloserOnAbnormalExit(compactionStrategy.getCurrentSubTaskHolder());
```
##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
+{
+ private final CompactionEngine TYPE;
+
+ @JsonCreator
+ public UserCompactionStrategy(@JsonProperty("TYPE") CompactionEngine TYPE)
Review Comment:
```suggestion
public UserCompactionStrategy(@JsonProperty("type") CompactionEngine type)
```
##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
Review Comment:
Yes, I agree.
This should have the prefix `Client` rather than `User`.
So, a more meaningful name could be `ClientCompactionRunnerInfo`, since it is
related to the runner but is not an actual runner itself.
Please add a javadoc clarifying that this class is only needed for serde and
would be deserialized on the Overlord side as a `CompactionRunner`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.EXISTING_PROPERTY,
+ property = CompactionStrategy.TYPE_PROPERTY,
+ visible = true)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = NativeCompactionStrategy.TYPE, value =
NativeCompactionStrategy.class)
+})
+public interface CompactionStrategy
Review Comment:
This name is confusing, as it is difficult to distinguish from
`CompactionPolicy`. This class is more of a `CompactionRunner`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.EXISTING_PROPERTY,
+ property = CompactionStrategy.TYPE_PROPERTY,
+ visible = true)
Review Comment:
A simpler version of this should work too:
```suggestion
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
```
For verification, see the usage in the `Task` interface. `Task` also has a
`getType()` method which doesn't seem to interfere with the
serialization/deserialization.
##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
+{
+ private final CompactionEngine TYPE;
Review Comment:
```suggestion
private final CompactionEngine type;
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.EXISTING_PROPERTY,
+ property = CompactionStrategy.TYPE_PROPERTY,
+ visible = true)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = NativeCompactionStrategy.TYPE, value =
NativeCompactionStrategy.class)
Review Comment:
It's probably best to keep the type name constants in a single place, i.e.
`CompactionEngine`.
##########
server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java:
##########
@@ -192,6 +207,7 @@ public boolean equals(Object o)
Arrays.equals(metricsSpec, that.metricsSpec) &&
Objects.equals(transformSpec, that.transformSpec) &&
Objects.equals(ioConfig, that.ioConfig) &&
+ Objects.equals(compactionEngine, that.compactionEngine) &&
Review Comment:
This should work too, since this is a plain enum field:
```suggestion
this.compactionEngine == that.compactionEngine &&
```
##########
processing/src/main/java/org/apache/druid/indexer/CompactionEngine.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Encapsulates the Engine to be used for a compaction task.
+ * Should be synchronized with the subtypes for {@link
org.apache.druid.indexing.common.task.CompactionStrategy}.
+ */
+public enum CompactionEngine
+{
+ NATIVE,
+ MSQ;
+
+ @JsonCreator
+ public static CompactionEngine fromString(String name)
+ {
+ if (name == null) {
+ return null;
+ }
+ return valueOf(StringUtils.toUpperCase(name));
Review Comment:
```suggestion
return name == null ? null : valueOf(StringUtils.toUpperCase(name));
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -464,144 +472,49 @@ void emitCompactIngestionModeMetrics(
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
-
// emit metric for compact ingestion mode:
emitCompactIngestionModeMetrics(toolbox.getEmitter(),
ioConfig.isDropExisting());
- final List<ParallelIndexIngestionSpec> ingestionSpecs =
createIngestionSchema(
+ final List<NonnullPair<Interval, DataSchema>> intervalDataSchemas =
createDataSchemasForIntervals(
UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
- ioConfig,
segmentProvider,
- partitionConfigurationManager,
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec,
- toolbox.getCoordinatorClient(),
- segmentCacheManagerFactory,
getMetricBuilder()
);
- final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
- .range(0, ingestionSpecs.size())
- .mapToObj(i -> {
- // The ID of SubtaskSpecs is used as the base sequenceName in
segment allocation protocol.
- // The indexing tasks generated by the compaction task should use
different sequenceNames
- // so that they can allocate valid segment IDs with no duplication.
- ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
- final String baseSequenceName = createIndexTaskSpecId(i);
- return newTask(baseSequenceName, ingestionSpec);
- })
- .collect(Collectors.toList());
-
- if (indexTaskSpecs.isEmpty()) {
- String msg = StringUtils.format(
- "Can't find segments from inputSpec[%s], nothing to do.",
- ioConfig.getInputSpec()
- );
- log.warn(msg);
- return TaskStatus.failure(getId(), msg);
- } else {
- registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
- final int totalNumSpecs = indexTaskSpecs.size();
- log.info("Generated [%d] compaction task specs", totalNumSpecs);
-
- int failCnt = 0;
- final TaskReport.ReportMap completionReports = new
TaskReport.ReportMap();
- for (int i = 0; i < indexTaskSpecs.size(); i++) {
- ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
- final String json =
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
- if (!currentSubTaskHolder.setTask(eachSpec)) {
- String errMsg = "Task was asked to stop. Finish as failed.";
- log.info(errMsg);
- return TaskStatus.failure(getId(), errMsg);
- }
- try {
- if (eachSpec.isReady(toolbox.getTaskActionClient())) {
- log.info("Running indexSpec: " + json);
- final TaskStatus eachResult = eachSpec.run(toolbox);
- if (!eachResult.isSuccess()) {
- failCnt++;
- log.warn("Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
- }
-
- String reportKeySuffix = "_" + i;
- Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
- reports -> completionReports.putAll(
- CollectionUtils.mapKeys(reports, key -> key +
reportKeySuffix)
- )
- );
- } else {
- failCnt++;
- log.warn("indexSpec is not ready: [%s].\nTrying the next
indexSpec.", json);
- }
- }
- catch (Exception e) {
- failCnt++;
- log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
- }
- }
-
- String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d]
failed",
- totalNumSpecs, totalNumSpecs - failCnt,
failCnt
- );
- toolbox.getTaskReportFileWriter().write(getId(), completionReports);
- log.info(msg);
- return failCnt == 0 ? TaskStatus.success(getId()) :
TaskStatus.failure(getId(), msg);
+ if (compactionStrategy == null) {
+ // Can only happen for MSQ engine, when the json subtype reqd for
deserialization isn't available due to
Review Comment:
I don't think this is true. If the MSQ extension is not present, then a
compaction task with engine as "msq" will throw a JsonException while being
submitted to the Overlord itself.
##########
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import java.util.Objects;
+
+
+public class UserCompactionStrategy
+{
+ private final CompactionEngine TYPE;
+
+ @JsonCreator
+ public UserCompactionStrategy(@JsonProperty("TYPE") CompactionEngine TYPE)
+ {
+ this.TYPE = TYPE;
+ }
+
+ @JsonProperty("TYPE")
Review Comment:
```suggestion
@JsonProperty("type")
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -164,21 +157,17 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
private final ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable
private final CompactionTuningConfig tuningConfig;
+ @Nullable
+ private final CompactionStrategy compactionStrategy;
@JsonIgnore
private final SegmentProvider segmentProvider;
@JsonIgnore
private final PartitionConfigurationManager partitionConfigurationManager;
-
@JsonIgnore
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
@JsonIgnore
- private final CurrentSubTaskHolder currentSubTaskHolder = new
CurrentSubTaskHolder(
- (taskObject, config) -> {
- final ParallelIndexSupervisorTask indexTask =
(ParallelIndexSupervisorTask) taskObject;
- indexTask.stopGracefully(config);
- }
- );
+ private final CurrentSubTaskHolder currentSubTaskHolder;
Review Comment:
This probably doesn't need to be a field anymore.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -464,144 +472,49 @@ void emitCompactIngestionModeMetrics(
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
-
// emit metric for compact ingestion mode:
emitCompactIngestionModeMetrics(toolbox.getEmitter(),
ioConfig.isDropExisting());
- final List<ParallelIndexIngestionSpec> ingestionSpecs =
createIngestionSchema(
+ final List<NonnullPair<Interval, DataSchema>> intervalDataSchemas =
createDataSchemasForIntervals(
UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
- ioConfig,
segmentProvider,
- partitionConfigurationManager,
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec,
- toolbox.getCoordinatorClient(),
- segmentCacheManagerFactory,
getMetricBuilder()
);
- final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
- .range(0, ingestionSpecs.size())
- .mapToObj(i -> {
- // The ID of SubtaskSpecs is used as the base sequenceName in
segment allocation protocol.
- // The indexing tasks generated by the compaction task should use
different sequenceNames
- // so that they can allocate valid segment IDs with no duplication.
- ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
- final String baseSequenceName = createIndexTaskSpecId(i);
- return newTask(baseSequenceName, ingestionSpec);
- })
- .collect(Collectors.toList());
-
- if (indexTaskSpecs.isEmpty()) {
- String msg = StringUtils.format(
- "Can't find segments from inputSpec[%s], nothing to do.",
- ioConfig.getInputSpec()
- );
- log.warn(msg);
- return TaskStatus.failure(getId(), msg);
- } else {
- registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
- final int totalNumSpecs = indexTaskSpecs.size();
- log.info("Generated [%d] compaction task specs", totalNumSpecs);
-
- int failCnt = 0;
- final TaskReport.ReportMap completionReports = new
TaskReport.ReportMap();
- for (int i = 0; i < indexTaskSpecs.size(); i++) {
- ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
- final String json =
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
- if (!currentSubTaskHolder.setTask(eachSpec)) {
- String errMsg = "Task was asked to stop. Finish as failed.";
- log.info(errMsg);
- return TaskStatus.failure(getId(), errMsg);
- }
- try {
- if (eachSpec.isReady(toolbox.getTaskActionClient())) {
- log.info("Running indexSpec: " + json);
- final TaskStatus eachResult = eachSpec.run(toolbox);
- if (!eachResult.isSuccess()) {
- failCnt++;
- log.warn("Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
- }
-
- String reportKeySuffix = "_" + i;
- Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
- reports -> completionReports.putAll(
- CollectionUtils.mapKeys(reports, key -> key +
reportKeySuffix)
- )
- );
- } else {
- failCnt++;
- log.warn("indexSpec is not ready: [%s].\nTrying the next
indexSpec.", json);
- }
- }
- catch (Exception e) {
- failCnt++;
- log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
- }
- }
-
- String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d]
failed",
- totalNumSpecs, totalNumSpecs - failCnt,
failCnt
- );
- toolbox.getTaskReportFileWriter().write(getId(), completionReports);
- log.info(msg);
- return failCnt == 0 ? TaskStatus.success(getId()) :
TaskStatus.failure(getId(), msg);
+ if (compactionStrategy == null) {
+ // Can only happen for MSQ engine, when the json subtype reqd for
deserialization isn't available due to
+ // missing extn.
+ throw DruidException.forPersona(DruidException.Persona.ADMIN)
Review Comment:
Can be an `InvalidInput.exception` instead.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -357,6 +349,16 @@ public DimensionsSpec getDimensionsSpec()
return dimensionsSpec;
}
+ public PartitionConfigurationManager getPartitionConfigurationManager()
+ {
+ return partitionConfigurationManager;
+ }
+
+ public SegmentCacheManagerFactory getSegmentCacheManagerFactory()
+ {
+ return segmentCacheManagerFactory;
+ }
Review Comment:
These are not needed here anymore. They should only be present in the
`NativeCompactionStrategy`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.EXISTING_PROPERTY,
+ property = CompactionStrategy.TYPE_PROPERTY,
+ visible = true)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = NativeCompactionStrategy.TYPE, value =
NativeCompactionStrategy.class)
+})
+public interface CompactionStrategy
+{
+ String TYPE_PROPERTY = "TYPE";
+
+ TaskStatus runCompactionTasks(
+ CompactionTask compactionTask,
+ TaskToolbox taskToolbox,
+ List<NonnullPair<Interval, DataSchema>> dataSchemas
+ ) throws JsonProcessingException;
Review Comment:
```suggestion
) throws Exception;
```
Since this method effectively runs the task, it should be allowed to throw
any exception that `Task.run()` can.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionStrategy.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskToolbox;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.utils.CollectionUtils;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.joda.time.Interval;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class NativeCompactionStrategy implements CompactionStrategy
+{
+ private static final Logger log = new Logger(NativeCompactionStrategy.class);
+ public static final String TYPE = "NATIVE";
+ private static final boolean STORE_COMPACTION_STATE = true;
+
+ @JsonCreator
+ public NativeCompactionStrategy()
+ {
+ }
+
+ @Override
+ public CurrentSubTaskHolder getCurrentSubTaskHolder()
+ {
+ return new CurrentSubTaskHolder(
+ (taskObject, config) -> {
+ final ParallelIndexSupervisorTask indexTask =
(ParallelIndexSupervisorTask) taskObject;
+ indexTask.stopGracefully(config);
+ }
+ );
+ }
+
+ @Override
+ @JsonProperty("TYPE")
Review Comment:
`@JsonProperty` is probably not needed here. See my other comments on this
interface and how existing `Task` implementations expose their type.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Strategy to be used for executing a compaction task.
+ * Should be synchronized with {@link
org.apache.druid.server.coordinator.UserCompactionStrategy}
+ */
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.EXISTING_PROPERTY,
+ property = CompactionStrategy.TYPE_PROPERTY,
+ visible = true)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = NativeCompactionStrategy.TYPE, value =
NativeCompactionStrategy.class)
+})
+public interface CompactionStrategy
+{
+ String TYPE_PROPERTY = "TYPE";
+
+ TaskStatus runCompactionTasks(
+ CompactionTask compactionTask,
+ TaskToolbox taskToolbox,
+ List<NonnullPair<Interval, DataSchema>> dataSchemas
+ ) throws JsonProcessingException;
+
+ CurrentSubTaskHolder getCurrentSubTaskHolder();
Review Comment:
Yes, that's a good point. But I think the validation should also happen on
the coordinator side when submitting a new compaction config.
I guess one way to do that would be to have `ClientCompactionRunnerInfo`
expose a `supportsCompactionConfig(DatasourceCompactionConfig config)` method.
@gargvishesh, are we doing any compatibility validation in the current set of
changes?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionStrategy.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskToolbox;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.utils.CollectionUtils;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.joda.time.Interval;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class NativeCompactionStrategy implements CompactionStrategy
+{
+ private static final Logger log = new Logger(NativeCompactionStrategy.class);
+ public static final String TYPE = "NATIVE";
+ private static final boolean STORE_COMPACTION_STATE = true;
+
+ @JsonCreator
+ public NativeCompactionStrategy()
+ {
+ }
+
+ @Override
+ public CurrentSubTaskHolder getCurrentSubTaskHolder()
+ {
+ return new CurrentSubTaskHolder(
+ (taskObject, config) -> {
+ final ParallelIndexSupervisorTask indexTask =
(ParallelIndexSupervisorTask) taskObject;
+ indexTask.stopGracefully(config);
+ }
+ );
+ }
+
+ @Override
+ @JsonProperty("TYPE")
+ public String getType()
+ {
+ return TYPE;
+ }
+
+ /**
+ * Generate {@link ParallelIndexIngestionSpec} from input dataschemas.
+ *
+ * @return an empty list if input segments don't exist. Otherwise, a
generated ingestionSpec.
+ */
+ @VisibleForTesting
Review Comment:
Try to write the tests in a way that doesn't require exposing internal
methods (e.g. by testing through the public API instead).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]