kfaraz commented on code in PR #16778:
URL: https://github.com/apache/druid/pull/16778#discussion_r1689857162
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -610,7 +612,7 @@ private static DataSchema createDataSchema(
       // Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
       final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
           segments,
-          granularitySpec.isRollup() == null,
+          true, // Always need rollup to check if there are some rollup segments already present.
Review Comment:
Shouldn't it always be true for the MSQ engine?
```suggestion
granularitySpec.isRollup() || engine == CompactionEngine.MSQ,
```
If it must always be true for native engine too, then let's just remove this
argument from the constructor as the constructor is not used anywhere else.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1624,33 +1629,49 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
       MSQSpec querySpec,
       ObjectMapper jsonMapper,
       DataSchema dataSchema,
-      ShardSpec shardSpec,
+      @Nullable ShardSpec shardSpec,
+      @Nullable ClusterBy clusterBy,
       String queryId
   )
   {
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
     PartitionsSpec partitionSpec;
-    if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
-      List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+    // shardSpec is absent in the absence of segments, which happens when only tombstones are generated by an
+    // MSQControllerTask.
+    if (shardSpec != null) {
+      if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
+        List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+        partitionSpec = new DimensionRangePartitionsSpec(
+            tuningConfig.getRowsPerSegment(),
+            null,
+            partitionDimensions,
+            false
+        );
+      } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
+        // MSQ tasks don't use maxTotalRows. Hence using Long.MAX_VALUE.
+        partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
+      } else {
+        // SingleDimensionShardSpec and other shard specs are never created in MSQ.
+        throw new MSQException(
+            UnknownFault.forMessage(
+                StringUtils.format(
+                    "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
+                    queryId,
+                    shardSpec.getType()
+                )));
+      }
+    } else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {
Review Comment:
Nit: Can `clusterBy.getColumns()` ever return `null`?
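For context, the branching in this hunk can be sketched stand-alone. This is only a sketch of the decision logic: `ShardType` and the returned strings are hypothetical stand-ins for Druid's `ShardSpec.Type`, `DimensionRangePartitionsSpec`, and `DynamicPartitionsSpec`.

```java
import java.util.List;

public class PartitionsSpecMapping
{
  enum ShardType { RANGE, NUMBERED, SINGLE_DIMENSION }

  // Returns the kind of PartitionsSpec the hunk builds for each case.
  static String partitionsSpecFor(ShardType shardType, List<String> clusterByColumns)
  {
    if (shardType != null) {
      if (shardType == ShardType.RANGE) {
        return "DimensionRangePartitionsSpec";   // carries the range partition dimensions
      } else if (shardType == ShardType.NUMBERED) {
        return "DynamicPartitionsSpec";          // maxTotalRows pinned to Long.MAX_VALUE
      } else {
        // Mirrors the MSQException branch for shard spec types MSQ never creates.
        throw new IllegalStateException("unsupported shard spec type: " + shardType);
      }
    } else if (clusterByColumns != null && !clusterByColumns.isEmpty()) {
      // Tombstone-only run: no shardSpec, so fall back to the CLUSTERED BY columns.
      return "DimensionRangePartitionsSpec (from clusterBy)";
    }
    return "DynamicPartitionsSpec";
  }
}
```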
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -139,13 +144,58 @@ public CompactionConfigValidationResult validateCompactionTask(
       ));
     }
     validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
-
     validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
+    validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
     return validationResults.stream()
                             .filter(result -> !result.isValid())
                             .findFirst()
                             .orElse(new CompactionConfigValidationResult(true, null));
   }
+  /**
+   * Validates that there are no rolled-up segments where either:
+   * <ul>
+   * <li>aggregator factory differs from its combining factory</li>
+   * <li>input col name is different from the output name (non-idempotent)</li>
+   * </ul>
+   */
+  private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
+  {
+    for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
+      if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
+        CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
+        if (Boolean.TRUE.equals(combinedDataSchema.hasRolledUpSegments())) {
Review Comment:
If we make the change to use a primitive boolean in `CombinedDataSchema`, we
should also do this:
```suggestion
if (combinedDataSchema.hasRolledUpSegments()) {
```
##########
server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class CombinedDataSchema extends DataSchema
+{
+ private final Boolean hasRolledUpSegments;
+
+ public CombinedDataSchema(
+ String dataSource,
+ @Nullable TimestampSpec timestampSpec,
+ @Nullable DimensionsSpec dimensionsSpec,
+ AggregatorFactory[] aggregators,
+ GranularitySpec granularitySpec,
+ TransformSpec transformSpec,
+ @Nullable Map<String, Object> parserMap,
+ ObjectMapper objectMapper,
+ @Nullable Boolean hasRolledUpSegments
Review Comment:
I think this can be a primitive `boolean` as it is always non-null.
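The nullability confusion a primitive avoids is the usual boxed-`Boolean` unboxing trap, sketched here with hypothetical helper names:

```java
public class BoxedBooleanTrap
{
  // Auto-unboxing a null Boolean throws a NullPointerException.
  static boolean unbox(Boolean flag)
  {
    return flag;
  }

  // The null-safe idiom the PR currently uses at call sites;
  // a primitive boolean parameter makes it unnecessary.
  static boolean nullSafe(Boolean flag)
  {
    return Boolean.TRUE.equals(flag);
  }
}
```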
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1671,13 +1692,15 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
         : new ClientCompactionTaskTransformSpec(
             dataSchema.getTransformSpec().getFilter()
         ).asMap(jsonMapper);
-    List<Object> metricsSpec = dataSchema.getAggregators() == null
-                               ? null
-                               : jsonMapper.convertValue(
-                                   dataSchema.getAggregators(),
-                                   new TypeReference<List<Object>>() {}
-                               );
+    List<Object> metricsSpec = Collections.emptyList();
+    if (querySpec.getQuery() instanceof GroupByQuery) {
+      // For group-by queries, the aggregators are transformed to their combining factories in the dataschema, resulting
+      // in a mismatch between schema in compaction spec and the one in compaction state. Sourcing the metricsSpec
+      // therefore directly from the querySpec.
+      GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery();
+      metricsSpec = jsonMapper.convertValue(groupByQuery.getAggregatorSpecs(), new TypeReference<List<Object>>() {});
Review Comment:
Why convert? `groupByQuery.getAggregatorSpecs()` already returns a `List`.
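For what it's worth, the `convertValue` is not a no-op even though the input is already a `List`: it rewrites each typed `AggregatorFactory` into a plain JSON-style map, which is the shape persisted in compaction state. A stand-alone sketch of that distinction, with a hypothetical `CountAggregator` in place of a real factory and a hand-rolled map in place of Jackson's `ObjectMapper.convertValue`:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MetricsSpecConversion
{
  // Hypothetical stand-in for a Druid AggregatorFactory.
  record CountAggregator(String name) {}

  // Without conversion: the list still holds typed factory objects.
  static List<Object> asIs(List<CountAggregator> aggs)
  {
    return List.<Object>copyOf(aggs);
  }

  // With conversion: each factory becomes a generic map, the shape that
  // ObjectMapper.convertValue would produce for serialization.
  static List<Object> converted(List<CountAggregator> aggs)
  {
    return aggs.stream()
               .map(a -> (Object) Map.of("type", "count", "name", a.name()))
               .collect(Collectors.toList());
  }
}
```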
##########
server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class CombinedDataSchema extends DataSchema
Review Comment:
Please add a short javadoc here describing the purpose and usage of this
class.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1553,7 +1557,7 @@ private void handleQueryResults(
         Tasks.DEFAULT_STORE_COMPACTION_STATE
     );
-    if (!segments.isEmpty() && storeCompactionState) {
Review Comment:
Nit: Why don't we need to check for empty anymore?
##########
server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class CombinedDataSchema extends DataSchema
+{
+ private final Boolean hasRolledUpSegments;
+
+ public CombinedDataSchema(
+ String dataSource,
+ @Nullable TimestampSpec timestampSpec,
+ @Nullable DimensionsSpec dimensionsSpec,
+ AggregatorFactory[] aggregators,
+ GranularitySpec granularitySpec,
+ TransformSpec transformSpec,
+ @Nullable Map<String, Object> parserMap,
+ ObjectMapper objectMapper,
Review Comment:
These 2 args are always passed as null, please remove them from the
constructor.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -815,6 +821,11 @@ public Boolean getRollup()
     return rollup;
   }
+  public Boolean hasRolledUpSegments()
Review Comment:
Please return a primitive `boolean` here to avoid confusion of nullability.
```suggestion
public boolean hasRolledUpSegments()
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1671,13 +1692,15 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
         : new ClientCompactionTaskTransformSpec(
             dataSchema.getTransformSpec().getFilter()
         ).asMap(jsonMapper);
-    List<Object> metricsSpec = dataSchema.getAggregators() == null
-                               ? null
-                               : jsonMapper.convertValue(
-                                   dataSchema.getAggregators(),
-                                   new TypeReference<List<Object>>() {}
-                               );
+    List<Object> metricsSpec = Collections.emptyList();
+    if (querySpec.getQuery() instanceof GroupByQuery) {
+      // For group-by queries, the aggregators are transformed to their combining factories in the dataschema, resulting
Review Comment:
Thanks for the comment!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]