kfaraz commented on code in PR #16778:
URL: https://github.com/apache/druid/pull/16778#discussion_r1689857162


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -610,7 +612,7 @@ private static DataSchema createDataSchema(
     // Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
     final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
         segments,
-        granularitySpec.isRollup() == null,
+        true, // Always need rollup to check if there are some rollup segments already present.

Review Comment:
   Shouldn't it be always true for MSQ engine?
   ```suggestion
           granularitySpec.isRollup() || engine == CompactionEngine.MSQ,
   ```
   
   If it must always be true for the native engine too, then let's just remove this argument from the constructor, as the constructor is not used anywhere else.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1624,33 +1629,49 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
       MSQSpec querySpec,
       ObjectMapper jsonMapper,
       DataSchema dataSchema,
-      ShardSpec shardSpec,
+      @Nullable ShardSpec shardSpec,
+      @Nullable ClusterBy clusterBy,
       String queryId
   )
   {
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
     PartitionsSpec partitionSpec;
 
-    if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
-      List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+    // shardSpec is absent in the absence of segments, which happens when only tombstones are generated by an
+    // MSQControllerTask.
+    if (shardSpec != null) {
+      if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
+        List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+        partitionSpec = new DimensionRangePartitionsSpec(
+            tuningConfig.getRowsPerSegment(),
+            null,
+            partitionDimensions,
+            false
+        );
+      } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
+        // MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE.
+        partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
+      } else {
+        // SingleDimenionShardSpec and other shard specs are never created in MSQ.
+        throw new MSQException(
+            UnknownFault.forMessage(
+                StringUtils.format(
+                    "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
+                    queryId,
+                    shardSpec.getType()
+                )));
+      }
+    } else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {

Review Comment:
   Nit: Can `clusterBy.getColumns()` ever return `null`?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -139,13 +144,58 @@ public CompactionConfigValidationResult validateCompactionTask(
       ));
     }
     validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
-    validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
+    validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
     return validationResults.stream()
                             .filter(result -> !result.isValid())
                             .findFirst()
                             .orElse(new CompactionConfigValidationResult(true, null));
   }
 
+  /**
+   * Validates that there are no rolled-up segments where either:
+   * <ul>
+   * <li>aggregator factory differs from its combining factory </li>
+   * <li>input col name is different from the output name (non-idempotent)</li>
+   * </ul>
+   */
+  private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
+  {
+    for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
+      if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
+        CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
+        if (Boolean.TRUE.equals(combinedDataSchema.hasRolledUpSegments())) {

Review Comment:
   If we make the change to use a primitive boolean in `CombinedDataSchema`, we should also do this:
   ```suggestion
           if (combinedDataSchema.hasRolledUpSegments()) {
   ```



##########
server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class CombinedDataSchema extends DataSchema
+{
+  private final Boolean hasRolledUpSegments;
+
+  public CombinedDataSchema(
+      String dataSource,
+      @Nullable TimestampSpec timestampSpec,
+      @Nullable DimensionsSpec dimensionsSpec,
+      AggregatorFactory[] aggregators,
+      GranularitySpec granularitySpec,
+      TransformSpec transformSpec,
+      @Nullable Map<String, Object> parserMap,
+      ObjectMapper objectMapper,
+      @Nullable Boolean hasRolledUpSegments

Review Comment:
   I think this can be a primitive `boolean` as it is always non-null.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1671,13 +1692,15 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
                                         : new ClientCompactionTaskTransformSpec(
                                             dataSchema.getTransformSpec().getFilter()
                                         ).asMap(jsonMapper);
-    List<Object> metricsSpec = dataSchema.getAggregators() == null
-                               ? null
-                               : jsonMapper.convertValue(
-                                   dataSchema.getAggregators(),
-                                   new TypeReference<List<Object>>() {}
-                               );
+    List<Object> metricsSpec = Collections.emptyList();
 
+    if (querySpec.getQuery() instanceof GroupByQuery) {
+      // For group-by queries, the aggregators are transformed to their combining factories in the dataschema, resulting
+      // in a mismatch between schema in compaction spec and the one in compaction state. Sourcing the metricsSpec
+      // therefore directly from the querySpec.
+      GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery();
+      metricsSpec = jsonMapper.convertValue(groupByQuery.getAggregatorSpecs(), new TypeReference<List<Object>>() {});

Review Comment:
   Why convert? `groupByQuery.getAggregatorSpecs()` already returns a `List`.
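   For illustration only, a simplified standalone sketch (plain strings stand in for the Druid `AggregatorFactory` instances) of widening a typed list to `List<Object>` with a copy instead of a Jackson round-trip:
   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class ListWideningSketch
   {
     public static void main(String[] args)
     {
       // Strings stand in for AggregatorFactory instances in this sketch.
       List<String> aggregatorSpecs = List.of("count", "longSum");

       // A List<T> can be assigned into a List<Object> via a plain copy;
       // no jsonMapper.convertValue round-trip is needed when the elements
       // themselves are what the caller wants to keep.
       List<Object> metricsSpec = new ArrayList<>(aggregatorSpecs);

       assert metricsSpec.size() == 2;
       assert metricsSpec.get(0).equals("count");
     }
   }
   ```
   (If the intent here is specifically to serialize the factories into JSON-friendly maps for the compaction state, the `convertValue` call may still be deliberate.)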



##########
server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class CombinedDataSchema extends DataSchema

Review Comment:
   Please add a short javadoc here describing the purpose and usage of this class.
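   For instance, something along these lines (the wording is only a sketch and should be confirmed against how the class is actually used):
   ```java
   /**
    * Variant of {@link DataSchema} used only while preparing a compaction
    * task, to carry the results of analyzing the existing segments (e.g.
    * whether any of them are rolled up) alongside the combined schema
    * derived from those segments. Not intended to be serialized as part
    * of an ingestion spec.
    */
   public class CombinedDataSchema extends DataSchema
   ```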



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1553,7 +1557,7 @@ private void handleQueryResults(
                                                      Tasks.DEFAULT_STORE_COMPACTION_STATE
                                                  );
 
-      if (!segments.isEmpty() && storeCompactionState) {

Review Comment:
   Nit: Why don't we need to check for empty anymore?



##########
server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class CombinedDataSchema extends DataSchema
+{
+  private final Boolean hasRolledUpSegments;
+
+  public CombinedDataSchema(
+      String dataSource,
+      @Nullable TimestampSpec timestampSpec,
+      @Nullable DimensionsSpec dimensionsSpec,
+      AggregatorFactory[] aggregators,
+      GranularitySpec granularitySpec,
+      TransformSpec transformSpec,
+      @Nullable Map<String, Object> parserMap,
+      ObjectMapper objectMapper,

Review Comment:
   These two args are always passed as null; please remove them from the constructor.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -815,6 +821,11 @@ public Boolean getRollup()
       return rollup;
     }
 
+    public Boolean hasRolledUpSegments()

Review Comment:
   Please return a primitive `boolean` here to avoid nullability confusion.
   ```suggestion
       public boolean hasRolledUpSegments()
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1671,13 +1692,15 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
                                         : new ClientCompactionTaskTransformSpec(
                                             dataSchema.getTransformSpec().getFilter()
                                         ).asMap(jsonMapper);
-    List<Object> metricsSpec = dataSchema.getAggregators() == null
-                               ? null
-                               : jsonMapper.convertValue(
-                                   dataSchema.getAggregators(),
-                                   new TypeReference<List<Object>>() {}
-                               );
+    List<Object> metricsSpec = Collections.emptyList();
 
+    if (querySpec.getQuery() instanceof GroupByQuery) {
+      // For group-by queries, the aggregators are transformed to their combining factories in the dataschema, resulting

Review Comment:
   Thanks for the comment!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

