kfaraz commented on code in PR #16291: URL: https://github.com/apache/druid/pull/16291#discussion_r1630655234
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.CompactionRunner; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; +import 
org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TimestampFloorExprMacro; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MSQCompactionRunner implements CompactionRunner +{ + private static final Logger log = new Logger(MSQCompactionRunner.class); + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; + + private final ObjectMapper jsonMapper; + private final Injector injector; + + public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME; + + @JsonIgnore + private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( + (taskObject, config) -> { + final MSQControllerTask msqControllerTask = (MSQControllerTask) taskObject; + msqControllerTask.stopGracefully(config); + }); + + + public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) Review Comment: Annotate this with `@JsonCreator`. It would also be nice to put the args on separate lines. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.CompactionRunner; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import 
org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TimestampFloorExprMacro; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MSQCompactionRunner implements CompactionRunner +{ + private static final Logger log = new Logger(MSQCompactionRunner.class); + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; + + private final ObjectMapper jsonMapper; + private final Injector injector; + + public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME; + + @JsonIgnore Review Comment: Why do we need `@JsonIgnore` on this? IIUC, `MSQCompactionRunner` is never serialized, only deserialized. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java: ########## @@ -190,6 +192,8 @@ public List<? 
extends Module> getJacksonModules() NilInputSource.class ); + module.registerSubtypes(new NamedType(MSQCompactionRunner.class, "msq")); Review Comment: The runner type should be a constant in `MSQCompactionRunner`. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.CompactionRunner; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import 
org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TimestampFloorExprMacro; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MSQCompactionRunner implements CompactionRunner +{ + private static final Logger log = new Logger(MSQCompactionRunner.class); + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; + + private final ObjectMapper jsonMapper; + private final Injector injector; + + public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME; + + @JsonIgnore + private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( + (taskObject, config) -> { + final MSQControllerTask msqControllerTask = (MSQControllerTask) taskObject; + msqControllerTask.stopGracefully(config); + }); + + + public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) + { + this.jsonMapper = 
jsonMapper; + this.injector = injector; + } + + @Override + public NonnullPair<Boolean, String> supportsCompactionSpec( + CompactionTask compactionTask + ) + { + if (compactionTask.getTuningConfig() != null) { Review Comment: Please add a brief comment for each of the code blocks mentioning what is being validated. Example: ```java // Verify that partitioning type is either 'dynamic' or 'range' ``` ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.coordinator.ClientCompactionRunnerInfo; +import org.joda.time.Interval; + +import java.util.List; + +/** + * Strategy to be used for executing a compaction task. 
+ * Should be synchronized with {@link ClientCompactionRunnerInfo} + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = CompactionRunner.TYPE_PROPERTY) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "native", value = NativeCompactionRunner.class) +}) +public interface CompactionRunner +{ + String TYPE_PROPERTY = "type"; + + TaskStatus runCompactionTasks( + CompactionTask compactionTask, + List<NonnullPair<Interval, DataSchema>> dataSchemas, + TaskToolbox taskToolbox + ) throws Exception; + + CurrentSubTaskHolder getCurrentSubTaskHolder(); + + /** + * Checks if the provided compaction config is supported by the runner. + * + * @param compactionTask + * @return Pair of (supported) boolean and a reason string. Reason string is empty if supported is true. + */ + NonnullPair<Boolean, String> supportsCompactionSpec(CompactionTask compactionTask); Review Comment: - Rename this method to `validateCompactionTask()` since the arguments have changed since the original set of changes in the PR. - Instead of a pair, return a `ValidationResult`, same as the one recommended for `ClientCompactionRunnerInfo`. - The javadoc should also link to `ClientCompactionRunnerInfo.validateCompactionConfig()` mentioning that the set of validations performed is the same. ########## processing/src/main/java/org/apache/druid/indexer/CompactionEngine.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; + +/** + * Encapsulates the Engine to be used for a compaction task. + * Should be kept in sync with the subtypes for {@link org.apache.druid.indexing.common.task.CompactionRunner}. + */ +public enum CompactionEngine +{ + // Using uppercase constants since "native" is a reserved keyword. Review Comment: unnecessary comment since enums are recommended to be upper-case anyway. ########## server/src/main/java/org/apache/druid/server/coordinator/ClientCompactionRunnerInfo.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.QueryContext; + +import java.util.Objects; + + +/** + * This class is just used to pass the strategy type via the "type" parameter for deserialization to the appropriate + * {@link org.apache.druid.indexing.common.task.CompactionRunner} subtype at the Overlord. + */ +public class ClientCompactionRunnerInfo +{ + private final CompactionEngine type; + + @JsonCreator + public ClientCompactionRunnerInfo(@JsonProperty("type") CompactionEngine type) + { + this.type = type; + } + + @JsonProperty + public CompactionEngine getType() + { + return type; + } + + @Override + public String toString() + { + return "ClientCompactionRunnerInfo{" + + "type=" + type + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactionRunnerInfo that = (ClientCompactionRunnerInfo) o; + return type == that.type; + } + + @Override + public int hashCode() + { + return Objects.hash(type); + } + + /** + * Checks if the provided compaction config is supported by the runner + * @param newConfig The updated compaction config + * @param engineSource String indicating the source of compaction engine. + * @return Pair of support boolean and reason string. The reason string is empty if support boolean is true. 
+ */ + public static NonnullPair<Boolean, String> supportsCompactionConfig(DataSourceCompactionConfig newConfig, String engineSource) + { + CompactionEngine compactionEngine = newConfig.getEngine(); + if (compactionEngine == CompactionEngine.MSQ) { Review Comment: It would be easier to read this method if the content specific to MSQ was moved to a separate method. ```java if (compactionEngine == CompactionEngine.NATIVE) { return true; } else { return msqEngineSupportsCompactionConfig(); } ``` The new method should also list out the cases which are not supported in the javadoc. ########## processing/src/main/java/org/apache/druid/timeline/CompactionState.java: ########## @@ -58,15 +61,19 @@ public class CompactionState // org.apache.druid.query.aggregation.AggregatorFactory cannot be used here because it's in the 'processing' module which // has a dependency on the 'core' module where this class is. private final List<Object> metricsSpec; + private final CompactionEngine engine; + private final Map<String, AggregatorFactory> dimensionToAggregatoryFactoryMap; @JsonCreator public CompactionState( @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, + @JsonProperty("dimensionToAggregatoryFactoryMap") Map<String, AggregatorFactory> dimensionToAggregatoryFactoryMap, Review Comment: Please annotate the new fields in the constructor and their getters as `@Nullable`. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.CompactionRunner; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import 
org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TimestampFloorExprMacro; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MSQCompactionRunner implements CompactionRunner +{ + private static final Logger log = new Logger(MSQCompactionRunner.class); + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; + + private final ObjectMapper jsonMapper; + private final Injector injector; + + public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String 
TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME; + + @JsonIgnore + private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( + (taskObject, config) -> { + final MSQControllerTask msqControllerTask = (MSQControllerTask) taskObject; + msqControllerTask.stopGracefully(config); + }); + + + public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) + { + this.jsonMapper = jsonMapper; + this.injector = injector; + } + + @Override + public NonnullPair<Boolean, String> supportsCompactionSpec( + CompactionTask compactionTask + ) + { + if (compactionTask.getTuningConfig() != null) { + PartitionsSpec partitionsSpec = compactionTask.getTuningConfig().getPartitionsSpec(); Review Comment: It should be possible to re-use the code in `ClientCompactionRunnerInfo` since the MSQ extension depends on the `server` module. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.CompactionRunner; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import 
org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TimestampFloorExprMacro; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MSQCompactionRunner implements CompactionRunner +{ + private static final Logger log = new Logger(MSQCompactionRunner.class); + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; + + private final ObjectMapper jsonMapper; + private final Injector injector; + + public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME; + + @JsonIgnore + private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( + (taskObject, config) -> { + final MSQControllerTask msqControllerTask = (MSQControllerTask) taskObject; + msqControllerTask.stopGracefully(config); + }); + + + public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) + { + this.jsonMapper = 
jsonMapper; + this.injector = injector; + } + + @Override + public NonnullPair<Boolean, String> supportsCompactionSpec( + CompactionTask compactionTask + ) + { + if (compactionTask.getTuningConfig() != null) { + PartitionsSpec partitionsSpec = compactionTask.getTuningConfig().getPartitionsSpec(); + if (!(partitionsSpec instanceof DynamicPartitionsSpec + || partitionsSpec instanceof DimensionRangePartitionsSpec)) { + return new NonnullPair<>(false, StringUtils.format( + "Invalid partition spec type[%s] for MSQ compaction engine." + + " Type must be either DynamicPartitionsSpec or DynamicRangePartitionsSpec.", + partitionsSpec.getClass() + )); + } + if (partitionsSpec instanceof DynamicPartitionsSpec + && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != null) { + return new NonnullPair<>(false, StringUtils.format( + "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ compaction engine.", + ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() + )); + } + } + if (compactionTask.getMetricsSpec() != null + && compactionTask.getGranularitySpec() != null + && !compactionTask.getGranularitySpec() + .isRollup()) { + return new NonnullPair<>( + false, + "rollup in granularitySpec must be set to true if metricsSpec is specifed " + + "for MSQ compaction engine." 
+ ); + } + + QueryContext compactionTaskContext = new QueryContext(compactionTask.getContext()); + if (!MultiStageQueryContext.isFinalizeAggregations(compactionTaskContext)) { + return new NonnullPair<>(false, StringUtils.format( + "Config[%s] cannot be set to false for auto-compaction with MSQ engine.", + MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS + )); + } + if (MultiStageQueryContext.getAssignmentStrategy(compactionTaskContext) == WorkerAssignmentStrategy.AUTO) { + return new NonnullPair<>( + false, + StringUtils.format( + "Config[%s] cannot be set to value[%s] for auto-compaction with MSQ engine.", + MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, + WorkerAssignmentStrategy.AUTO + ) + ); + } + return new NonnullPair<>(true, ""); + } + + @Override + public CurrentSubTaskHolder getCurrentSubTaskHolder() + { + return currentSubTaskHolder; + } + + @Override + public TaskStatus runCompactionTasks( + CompactionTask compactionTask, + List<NonnullPair<Interval, DataSchema>> intervalDataSchemas, + TaskToolbox taskToolbox + ) throws Exception + { + List<MSQControllerTask> msqControllerTasks = compactionToMSQTasks(compactionTask, intervalDataSchemas); + + if (msqControllerTasks.isEmpty()) { + String msg = StringUtils.format( + "Can't find segments from inputSpec[%s], nothing to do.", + compactionTask.getIoConfig().getInputSpec() + ); + return TaskStatus.failure(compactionTask.getId(), msg); + } + return runSubtasks( + msqControllerTasks, + taskToolbox, + currentSubTaskHolder, + compactionTask.getId() + ); + } + + public List<MSQControllerTask> compactionToMSQTasks( + CompactionTask compactionTask, + List<NonnullPair<Interval, DataSchema>> intervalDataSchemas + ) throws JsonProcessingException + { + List<MSQControllerTask> msqControllerTasks = new ArrayList<>(); Review Comment: Nit: ```suggestion final List<MSQControllerTask> msqControllerTasks = new ArrayList<>(); ``` ########## 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.coordinator.ClientCompactionRunnerInfo; +import org.joda.time.Interval; + +import java.util.List; + +/** + * Strategy to be used for executing a compaction task. 
+ * Should be synchronized with {@link ClientCompactionRunnerInfo} + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = CompactionRunner.TYPE_PROPERTY) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "native", value = NativeCompactionRunner.class) +}) +public interface CompactionRunner +{ + String TYPE_PROPERTY = "type"; + + TaskStatus runCompactionTasks( + CompactionTask compactionTask, + List<NonnullPair<Interval, DataSchema>> dataSchemas, + TaskToolbox taskToolbox + ) throws Exception; + + CurrentSubTaskHolder getCurrentSubTaskHolder(); + + /** + * Checks if the provided compaction config is supported by the runner. + * + * @param compactionTask Review Comment: Not needed. ```suggestion ``` ########## server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java: ########## @@ -119,7 +119,8 @@ public Response addOrUpdateCompactionConfig( .getCompactionConfigs() .stream() .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); - newConfigs.put(newConfig.getDataSource(), newConfig); + DataSourceCompactionConfig updatedConfig = DataSourceCompactionConfig.from(newConfig, current.getEngine()); + newConfigs.put(updatedConfig.getDataSource(), updatedConfig); Review Comment: Use the `newConfig` instead of the `updatedConfig` to persist. If the `newConfig` (i.e. the API payload posted by the user) has no engine set, we would have set the engine to the default value in `updatedConfig`. When we want to switch the engine at the cluster level, we would have to update each config individually. ########## docs/multi-stage-query/known-issues.md: ########## @@ -68,3 +68,8 @@ properties, and the `indexSpec` [`tuningConfig`](../ingestion/ingestion-spec.md# - The maximum number of elements in a window cannot exceed a value of 100,000. - To avoid `leafOperators` in MSQ engine, window functions have an extra scan stage after the window stage for cases where native engine has a non-empty `leafOperator`. 
+ +## Compaction + +- Compaction configured with MSQ engine currently doesn't support context flags `finalizeAggregations=false` and + `taskAssignment=auto`. Review Comment: This should not be here as this is only an experimental feature right now. There should be a heading in the compaction page giving a summary of Auto-compaction using MSQ, how it can be enabled/disabled and what are its current shortcomings. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.coordinator.ClientCompactionRunnerInfo; +import org.joda.time.Interval; + +import java.util.List; + +/** + * Strategy to be used for executing a compaction task. 
+ * Should be synchronized with {@link ClientCompactionRunnerInfo} + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = CompactionRunner.TYPE_PROPERTY) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "native", value = NativeCompactionRunner.class) +}) +public interface CompactionRunner +{ + String TYPE_PROPERTY = "type"; + + TaskStatus runCompactionTasks( + CompactionTask compactionTask, + List<NonnullPair<Interval, DataSchema>> dataSchemas, Review Comment: Why a list, why not just a Map from Interval to DataSchema? ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.coordinator.ClientCompactionRunnerInfo; +import org.joda.time.Interval; + +import java.util.List; + +/** + * Strategy to be used for executing a compaction task. + * Should be synchronized with {@link ClientCompactionRunnerInfo} + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = CompactionRunner.TYPE_PROPERTY) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "native", value = NativeCompactionRunner.class) +}) +public interface CompactionRunner +{ + String TYPE_PROPERTY = "type"; + + TaskStatus runCompactionTasks( Review Comment: A small javadoc here would be nice. ########## server/src/main/java/org/apache/druid/server/coordinator/ClientCompactionRunnerInfo.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.QueryContext; + +import java.util.Objects; + + +/** + * This class is just used to pass the strategy type via the "type" parameter for deserilization to appropriate + * {@link org.apache.druid.indexing.common.task.CompactionRunner} subtype at the overlod. + */ +public class ClientCompactionRunnerInfo +{ + private final CompactionEngine type; + + @JsonCreator + public ClientCompactionRunnerInfo(@JsonProperty("type") CompactionEngine type) + { + this.type = type; + } + + @JsonProperty + public CompactionEngine getType() + { + return type; + } + + @Override + public String toString() + { + return "ClientCompactionRunnerInfo{" + + "type=" + type + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactionRunnerInfo that = (ClientCompactionRunnerInfo) o; + return type == that.type; + } + + @Override + public int hashCode() + { + return Objects.hash(type); + } + + /** + * Checks if the provided compaction config is supported by the runner + * @param newConfig The updated compaction config + * @param engineSource String indicating the source of compaction engine. + * @return Pair of support boolean and reason string. The reason string is empty if support boolean is True. 
+ */ + public static NonnullPair<Boolean, String> supportsCompactionConfig(DataSourceCompactionConfig newConfig, String engineSource) Review Comment: 1. Please put arguments on separate lines. 2. Rename the method to `validateCompactionConfig` 3. Return a `ValidationResult` that has a `boolean isValid` and a `List<String> errorMessages`. A pair is often difficult to interpret for the caller and makes sense only when there is a clearly defined LHS and RHS, e.g. in an equation or expression evaluation. 4. The `engineSource` should not be passed here. It can either be left out altogether or be used in the exception constructed in `DataSourceCompactionConfig.from()`. ########## server/src/main/java/org/apache/druid/server/coordinator/ClientCompactionRunnerInfo.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.QueryContext; + +import java.util.Objects; + + +/** + * This class is just used to pass the strategy type via the "type" parameter for deserilization to appropriate + * {@link org.apache.druid.indexing.common.task.CompactionRunner} subtype at the overlod. + */ +public class ClientCompactionRunnerInfo +{ + private final CompactionEngine type; + + @JsonCreator + public ClientCompactionRunnerInfo(@JsonProperty("type") CompactionEngine type) + { + this.type = type; + } + + @JsonProperty + public CompactionEngine getType() + { + return type; + } + + @Override + public String toString() + { + return "ClientCompactionRunnerInfo{" + + "type=" + type + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactionRunnerInfo that = (ClientCompactionRunnerInfo) o; + return type == that.type; + } + + @Override + public int hashCode() + { + return Objects.hash(type); + } + + /** + * Checks if the provided compaction config is supported by the runner + * @param newConfig The updated compaction config + * @param engineSource String indicating the source of compaction engine. + * @return Pair of support boolean and reason string. The reason string is empty if support boolean is True. 
+ */ + public static NonnullPair<Boolean, String> supportsCompactionConfig(DataSourceCompactionConfig newConfig, String engineSource) + { + CompactionEngine compactionEngine = newConfig.getEngine(); + if (compactionEngine == CompactionEngine.MSQ) { + if (newConfig.getTuningConfig() != null) { Review Comment: Please add a one-line comment before each type of validation. For example, ```java // Partitioning must either by 'dynamic' or 'range' ``` ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.CompactionRunner; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import 
org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TimestampFloorExprMacro; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MSQCompactionRunner implements CompactionRunner +{ + private static final Logger log = new Logger(MSQCompactionRunner.class); + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; + + private final ObjectMapper jsonMapper; + private final Injector injector; + + public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME; + + @JsonIgnore + private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( + (taskObject, config) -> { + final MSQControllerTask msqControllerTask = (MSQControllerTask) taskObject; + msqControllerTask.stopGracefully(config); + }); + + + public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) + { + this.jsonMapper = 
jsonMapper; + this.injector = injector; + } + + @Override + public NonnullPair<Boolean, String> supportsCompactionSpec( Review Comment: Please add a javadoc to this method listing out the cases which are currently not supported. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.CompactionRunner; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import 
org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TimestampFloorExprMacro; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MSQCompactionRunner implements CompactionRunner +{ + private static final Logger log = new Logger(MSQCompactionRunner.class); + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; + + private final ObjectMapper jsonMapper; + private final Injector injector; + + public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME; + + @JsonIgnore + private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( + (taskObject, config) -> { + final MSQControllerTask msqControllerTask = (MSQControllerTask) taskObject; + msqControllerTask.stopGracefully(config); + }); + + + public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) + { + this.jsonMapper = 
jsonMapper; + this.injector = injector; + } + + @Override + public NonnullPair<Boolean, String> supportsCompactionSpec( + CompactionTask compactionTask + ) + { + if (compactionTask.getTuningConfig() != null) { + PartitionsSpec partitionsSpec = compactionTask.getTuningConfig().getPartitionsSpec(); + if (!(partitionsSpec instanceof DynamicPartitionsSpec + || partitionsSpec instanceof DimensionRangePartitionsSpec)) { + return new NonnullPair<>(false, StringUtils.format( + "Invalid partition spec type[%s] for MSQ compaction engine." + + " Type must be either DynamicPartitionsSpec or DynamicRangePartitionsSpec.", + partitionsSpec.getClass() + )); + } + if (partitionsSpec instanceof DynamicPartitionsSpec + && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != null) { + return new NonnullPair<>(false, StringUtils.format( + "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ compaction engine.", + ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() + )); + } + } + if (compactionTask.getMetricsSpec() != null + && compactionTask.getGranularitySpec() != null + && !compactionTask.getGranularitySpec() + .isRollup()) { + return new NonnullPair<>( + false, + "rollup in granularitySpec must be set to true if metricsSpec is specifed " + + "for MSQ compaction engine." 
+ ); + } + + QueryContext compactionTaskContext = new QueryContext(compactionTask.getContext()); + if (!MultiStageQueryContext.isFinalizeAggregations(compactionTaskContext)) { + return new NonnullPair<>(false, StringUtils.format( + "Config[%s] cannot be set to false for auto-compaction with MSQ engine.", + MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS + )); + } + if (MultiStageQueryContext.getAssignmentStrategy(compactionTaskContext) == WorkerAssignmentStrategy.AUTO) { + return new NonnullPair<>( + false, + StringUtils.format( + "Config[%s] cannot be set to value[%s] for auto-compaction with MSQ engine.", + MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, + WorkerAssignmentStrategy.AUTO + ) + ); + } + return new NonnullPair<>(true, ""); + } + + @Override + public CurrentSubTaskHolder getCurrentSubTaskHolder() + { + return currentSubTaskHolder; + } + + @Override + public TaskStatus runCompactionTasks( + CompactionTask compactionTask, + List<NonnullPair<Interval, DataSchema>> intervalDataSchemas, + TaskToolbox taskToolbox + ) throws Exception + { + List<MSQControllerTask> msqControllerTasks = compactionToMSQTasks(compactionTask, intervalDataSchemas); + + if (msqControllerTasks.isEmpty()) { + String msg = StringUtils.format( + "Can't find segments from inputSpec[%s], nothing to do.", + compactionTask.getIoConfig().getInputSpec() + ); + return TaskStatus.failure(compactionTask.getId(), msg); + } + return runSubtasks( + msqControllerTasks, + taskToolbox, + currentSubTaskHolder, + compactionTask.getId() + ); + } + + public List<MSQControllerTask> compactionToMSQTasks( Review Comment: ```suggestion public List<MSQControllerTask> createMsqControllerTasks( ``` ########## server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java: ########## @@ -307,9 +327,16 @@ private CompactionStatus metricsSpecIsUpToDate() } final List<Object> metricSpecList = lastCompactionState.getMetricsSpec(); - final AggregatorFactory[] existingMetricsSpec - = 
CollectionUtils.isNullOrEmpty(metricSpecList) - ? null : objectMapper.convertValue(metricSpecList, AggregatorFactory[].class); + final AggregatorFactory[] existingMetricsSpec; + if (CollectionUtils.isNullOrEmpty(metricSpecList)) { Review Comment: please add a comment here explaining what is being done. ########## server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java: ########## @@ -119,7 +119,8 @@ public Response addOrUpdateCompactionConfig( .getCompactionConfigs() .stream() .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); - newConfigs.put(newConfig.getDataSource(), newConfig); + DataSourceCompactionConfig updatedConfig = DataSourceCompactionConfig.from(newConfig, current.getEngine()); Review Comment: I don't think we need this new method `DataSourceCompactionConfig.from()`. Simply call `ClientCompactionRunnerInfo.validateCompactionConfig()` from here and use the validation result to check if an exception needs to be thrown. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.CompactionRunner; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import 
org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TimestampFloorExprMacro; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MSQCompactionRunner implements CompactionRunner +{ + private static final Logger log = new Logger(MSQCompactionRunner.class); + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; + + private final ObjectMapper jsonMapper; + private final Injector injector; + + public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String TIME_COLUMN = ColumnHolder.TIME_COLUMN_NAME; + + @JsonIgnore + private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( + (taskObject, config) -> { + final MSQControllerTask msqControllerTask = (MSQControllerTask) taskObject; + msqControllerTask.stopGracefully(config); + }); + + + public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) + { + this.jsonMapper = 
jsonMapper; + this.injector = injector; + } + + @Override + public NonnullPair<Boolean, String> supportsCompactionSpec( + CompactionTask compactionTask + ) + { + if (compactionTask.getTuningConfig() != null) { + PartitionsSpec partitionsSpec = compactionTask.getTuningConfig().getPartitionsSpec(); + if (!(partitionsSpec instanceof DynamicPartitionsSpec + || partitionsSpec instanceof DimensionRangePartitionsSpec)) { + return new NonnullPair<>(false, StringUtils.format( + "Invalid partition spec type[%s] for MSQ compaction engine." + + " Type must be either DynamicPartitionsSpec or DynamicRangePartitionsSpec.", + partitionsSpec.getClass() + )); + } + if (partitionsSpec instanceof DynamicPartitionsSpec + && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != null) { + return new NonnullPair<>(false, StringUtils.format( + "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ compaction engine.", + ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() + )); + } + } + if (compactionTask.getMetricsSpec() != null + && compactionTask.getGranularitySpec() != null + && !compactionTask.getGranularitySpec() + .isRollup()) { + return new NonnullPair<>( + false, + "rollup in granularitySpec must be set to true if metricsSpec is specifed " + + "for MSQ compaction engine." 
+ ); + } + + QueryContext compactionTaskContext = new QueryContext(compactionTask.getContext()); + if (!MultiStageQueryContext.isFinalizeAggregations(compactionTaskContext)) { + return new NonnullPair<>(false, StringUtils.format( + "Config[%s] cannot be set to false for auto-compaction with MSQ engine.", + MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS + )); + } + if (MultiStageQueryContext.getAssignmentStrategy(compactionTaskContext) == WorkerAssignmentStrategy.AUTO) { + return new NonnullPair<>( + false, + StringUtils.format( + "Config[%s] cannot be set to value[%s] for auto-compaction with MSQ engine.", + MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, + WorkerAssignmentStrategy.AUTO + ) + ); + } + return new NonnullPair<>(true, ""); + } + + @Override + public CurrentSubTaskHolder getCurrentSubTaskHolder() + { + return currentSubTaskHolder; + } + + @Override + public TaskStatus runCompactionTasks( + CompactionTask compactionTask, + List<NonnullPair<Interval, DataSchema>> intervalDataSchemas, + TaskToolbox taskToolbox + ) throws Exception + { + List<MSQControllerTask> msqControllerTasks = compactionToMSQTasks(compactionTask, intervalDataSchemas); + + if (msqControllerTasks.isEmpty()) { + String msg = StringUtils.format( + "Can't find segments from inputSpec[%s], nothing to do.", + compactionTask.getIoConfig().getInputSpec() + ); + return TaskStatus.failure(compactionTask.getId(), msg); + } + return runSubtasks( + msqControllerTasks, + taskToolbox, + currentSubTaskHolder, + compactionTask.getId() + ); + } + + public List<MSQControllerTask> compactionToMSQTasks( + CompactionTask compactionTask, + List<NonnullPair<Interval, DataSchema>> intervalDataSchemas + ) throws JsonProcessingException + { + List<MSQControllerTask> msqControllerTasks = new ArrayList<>(); + + for (NonnullPair<Interval, DataSchema> intervalDataSchema : intervalDataSchemas) { + Query<?> query; + Interval interval = intervalDataSchema.lhs; + DataSchema dataSchema = intervalDataSchema.rhs; + 
+ if (!isGroupBy(dataSchema)) { + query = buildScanQuery(compactionTask, interval, dataSchema); + } else { + query = buildGroupByQuery(compactionTask, interval, dataSchema); + } Review Comment: Style: a positive condition reads better than a negative condition. ```suggestion if (isGroupBy(dataSchema)) { query = buildGroupByQuery(compactionTask, interval, dataSchema); } else { query = buildScanQuery(compactionTask, interval, dataSchema); } ``` ########## server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java: ########## @@ -34,11 +36,13 @@ public class CoordinatorCompactionConfig private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1; private static final int DEFAULT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE; private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false; + private static final CompactionEngine DEFAULT_COMPACTION_ENGINE = CompactionEngine.NATIVE; Review Comment: Cluster-level default should be a config and could be exposed as a separate API. That API should validate all existing datasource compaction configs. (this change maybe done in a follow up). ########## processing/src/main/java/org/apache/druid/timeline/CompactionState.java: ########## @@ -58,15 +61,19 @@ public class CompactionState // org.apache.druid.query.aggregation.AggregatorFactory cannot be used here because it's in the 'processing' module which // has a dependency on the 'core' module where this class is. private final List<Object> metricsSpec; + private final CompactionEngine engine; + private final Map<String, AggregatorFactory> dimensionToAggregatoryFactoryMap; Review Comment: This field requires a javadoc explaining its purpose and usage. ########## server/src/main/java/org/apache/druid/server/coordinator/ClientCompactionRunnerInfo.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.QueryContext; + +import java.util.Objects; + + +/** + * This class is just used to pass the strategy type via the "type" parameter for deserilization to appropriate + * {@link org.apache.druid.indexing.common.task.CompactionRunner} subtype at the overlod. 
+ */ +public class ClientCompactionRunnerInfo +{ + private final CompactionEngine type; + + @JsonCreator + public ClientCompactionRunnerInfo(@JsonProperty("type") CompactionEngine type) + { + this.type = type; + } + + @JsonProperty + public CompactionEngine getType() + { + return type; + } + + @Override + public String toString() + { + return "ClientCompactionRunnerInfo{" + + "type=" + type + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactionRunnerInfo that = (ClientCompactionRunnerInfo) o; + return type == that.type; + } + + @Override + public int hashCode() + { + return Objects.hash(type); + } + + /** + * Checks if the provided compaction config is supported by the runner + * @param newConfig The updated compaction config + * @param engineSource String indicating the source of compaction engine. + * @return Pair of support boolean and reason string. The reason string is empty if support boolean is True. + */ + public static NonnullPair<Boolean, String> supportsCompactionConfig(DataSourceCompactionConfig newConfig, String engineSource) + { + CompactionEngine compactionEngine = newConfig.getEngine(); + if (compactionEngine == CompactionEngine.MSQ) { + if (newConfig.getTuningConfig() != null) { + PartitionsSpec partitionsSpec = newConfig.getTuningConfig().getPartitionsSpec(); + if (!(partitionsSpec instanceof DimensionRangePartitionsSpec + || partitionsSpec instanceof DynamicPartitionsSpec)) { + return new NonnullPair<>(false, StringUtils.format( + "Invalid partition spec type[%s] for MSQ compaction engine[%s]." 
+ + " Type must be either DynamicPartitionsSpec or DynamicRangePartitionsSpec.", + partitionsSpec.getClass(), + engineSource + ) + ); + } + if (partitionsSpec instanceof DynamicPartitionsSpec + && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != null) { + return new NonnullPair<>(false, StringUtils.format( + "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ compaction engine[%s].", + ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows(), engineSource + )); + } + } + + if (newConfig.getMetricsSpec() != null + && newConfig.getGranularitySpec() != null + && !newConfig.getGranularitySpec() + .isRollup()) { + return new NonnullPair<>(false, StringUtils.format( + "rollup in granularitySpec must be set to True if metricsSpec is specifed " + + "for MSQ compaction engine[%s].", engineSource)); + } + + QueryContext queryContext = QueryContext.of(newConfig.getTaskContext()); + + if (!queryContext.getBoolean(MSQContext.CTX_FINALIZE_AGGREGATIONS, true)) { + return new NonnullPair<>(false, StringUtils.format( + "Config[%s] cannot be set to false for auto-compaction with MSQ engine[%s].", + MSQContext.CTX_FINALIZE_AGGREGATIONS, + engineSource + )); + } + + if (queryContext.getString(MSQContext.CTX_TASK_ASSIGNMENT_STRATEGY, MSQContext.TASK_ASSIGNMENT_STRATEGY_MAX) + .equals(MSQContext.TASK_ASSIGNMENT_STRATEGY_AUTO)) { + return new NonnullPair<>(false, StringUtils.format( + "Config[%s] cannot be set to value[%s] for auto-compaction with MSQ engine[%s].", + MSQContext.CTX_TASK_ASSIGNMENT_STRATEGY, + MSQContext.TASK_ASSIGNMENT_STRATEGY_AUTO, + engineSource + )); + } + } + return new NonnullPair<>(true, ""); + } + + /** + * This class copies over MSQ context parameters from the MSQ extension. This is required to validate the submitted + * compaction config at the coordinator. 
The values used here should be kept in sync with those in + * {@link org.apache.druid.msq.util.MultiStageQueryContext} Review Comment: This link is broken since the `server` module does not have a dependency on `multi-stage-query`. Use `{@code}` tag instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
