This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5333c53d710 Support non time order in MSQ compaction (#17318)
5333c53d710 is described below

commit 5333c53d710f0238b5314b7508d6a7877233a33b
Author: Vishesh Garg <[email protected]>
AuthorDate: Wed Nov 27 13:26:10 2024 +0530

    Support non time order in MSQ compaction (#17318)
    
    This patch supports sorting segments by non-time columns (added in #16849) 
to MSQ compaction.
    Specifically, if `forceSegmentSortByTime` is set in the data schema, either 
via the user-supplied
    compaction config or in the inferred schema, the following steps are taken:
    - Skip adding `__time` explicitly as the first column to the dimension 
schema since it already comes
    as part of the schema
    - Ensure column mappings propagate `__time` in the order specified by the 
schema
    - Set `forceSegmentSortByTime` in the MSQ context.
---
 .../druid/msq/indexing/MSQCompactionRunner.java    |  62 +++++--
 .../msq/indexing/MSQCompactionRunnerTest.java      | 205 ++++++++++++++-------
 2 files changed, 176 insertions(+), 91 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index d05ab12ea3f..2d8a510fd98 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -348,7 +348,10 @@ public class MSQCompactionRunner implements 
CompactionRunner
   private static RowSignature getRowSignature(DataSchema dataSchema)
   {
     RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
-    
rowSignatureBuilder.add(dataSchema.getTimestampSpec().getTimestampColumn(), 
ColumnType.LONG);
+    if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime() == true) {
+      // If sort not forced by time, __time appears as part of dimensions in 
DimensionsSpec
+      
rowSignatureBuilder.add(dataSchema.getTimestampSpec().getTimestampColumn(), 
ColumnType.LONG);
+    }
     if (!isQueryGranularityEmptyOrNone(dataSchema)) {
       // A virtual column for query granularity would have been added. Add 
corresponding column type.
       rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG);
@@ -398,25 +401,31 @@ public class MSQCompactionRunner implements 
CompactionRunner
 
   private static ColumnMappings getColumnMappings(DataSchema dataSchema)
   {
-    List<ColumnMapping> columnMappings = dataSchema.getDimensionsSpec()
-                                                   .getDimensions()
-                                                   .stream()
-                                                   .map(dim -> new 
ColumnMapping(
-                                                       dim.getName(), 
dim.getName()))
-                                                   
.collect(Collectors.toList());
+    List<ColumnMapping> columnMappings = new ArrayList<>();
+    // For scan queries, a virtual column is created from __time if a custom 
query granularity is provided. For
+    // group-by queries, as insert needs __time, it will always be one of the 
dimensions. Since dimensions in groupby
+    // aren't allowed to have time column as the output name, we map time 
dimension to TIME_VIRTUAL_COLUMN in
+    // dimensions, and map it back to the time column here.
+    String timeColumn = (isGroupBy(dataSchema) || 
!isQueryGranularityEmptyOrNone(dataSchema))
+                        ? TIME_VIRTUAL_COLUMN
+                        : ColumnHolder.TIME_COLUMN_NAME;
+    ColumnMapping timeColumnMapping = new ColumnMapping(timeColumn, 
ColumnHolder.TIME_COLUMN_NAME);
+    if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime()) {
+      // When not sorted by time, the __time column is missing from 
dimensionsSpec
+      columnMappings.add(timeColumnMapping);
+    }
+    columnMappings.addAll(
+        dataSchema.getDimensionsSpec()
+                  .getDimensions()
+                  .stream()
+                  .map(dim -> 
dim.getName().equals(ColumnHolder.TIME_COLUMN_NAME)
+                              ? timeColumnMapping
+                              : new ColumnMapping(dim.getName(), 
dim.getName()))
+                  .collect(Collectors.toList())
+    );
     columnMappings.addAll(Arrays.stream(dataSchema.getAggregators())
                                 .map(agg -> new ColumnMapping(agg.getName(), 
agg.getName()))
-                                .collect(
-                                    Collectors.toList()));
-    if (isGroupBy(dataSchema) || !isQueryGranularityEmptyOrNone(dataSchema)) {
-      // For scan queries, a virtual column is created from __time if a custom 
query granularity is provided. For
-      // group-by queries, as insert needs __time, it will always be one of 
the dimensions. Since dimensions in groupby
-      // aren't allowed to have time column as the output name, we map time 
dimension to TIME_VIRTUAL_COLUMN in
-      // dimensions, and map it back to the time column here.
-      columnMappings.add(new ColumnMapping(TIME_VIRTUAL_COLUMN, 
ColumnHolder.TIME_COLUMN_NAME));
-    } else {
-      columnMappings.add(new ColumnMapping(ColumnHolder.TIME_COLUMN_NAME, 
ColumnHolder.TIME_COLUMN_NAME));
-    }
+                                .collect(Collectors.toList()));
     return new ColumnMappings(columnMappings);
   }
 
@@ -431,6 +440,19 @@ public class MSQCompactionRunner implements 
CompactionRunner
     return Collections.emptyList();
   }
 
+  private static Map<String, Object> buildQueryContext(
+      Map<String, Object> taskContext,
+      DataSchema dataSchema
+  )
+  {
+    if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime()) {
+      return taskContext;
+    }
+    Map<String, Object> queryContext = new HashMap<>(taskContext);
+    queryContext.put(MultiStageQueryContext.CTX_FORCE_TIME_SORT, false);
+    return queryContext;
+  }
+
   private static Query<?> buildScanQuery(
       CompactionTask compactionTask,
       Interval interval,
@@ -447,7 +469,7 @@ public class MSQCompactionRunner implements CompactionRunner
         .columnTypes(rowSignature.getColumnTypes())
         .intervals(new 
MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
         .filters(dataSchema.getTransformSpec().getFilter())
-        .context(compactionTask.getContext());
+        .context(buildQueryContext(compactionTask.getContext(), dataSchema));
 
     if (compactionTask.getTuningConfig() != null && 
compactionTask.getTuningConfig().getPartitionsSpec() != null) {
       List<OrderByColumnSpec> orderByColumnSpecs = 
getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
@@ -599,7 +621,7 @@ public class MSQCompactionRunner implements CompactionRunner
         .setDimensions(getAggregateDimensions(dataSchema, 
inputColToVirtualCol))
         .setAggregatorSpecs(Arrays.asList(dataSchema.getAggregators()))
         .setPostAggregatorSpecs(postAggregators)
-        .setContext(compactionTask.getContext())
+        .setContext(buildQueryContext(compactionTask.getContext(), dataSchema))
         .setInterval(interval);
 
     if (compactionTask.getTuningConfig() != null && 
compactionTask.getTuningConfig().getPartitionsSpec() != null) {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 0a54f8550a9..a05ccd499d0 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -60,7 +60,9 @@ import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.AutoTypeColumnSchema;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.NestedDataColumnSchema;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.CompressionFactory;
@@ -72,14 +74,13 @@ import 
org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.hamcrest.MatcherAssert;
-import org.hamcrest.Matchers;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -103,10 +104,14 @@ public class MSQCompactionRunnerTest
   private static final StringDimensionSchema STRING_DIMENSION = new 
StringDimensionSchema("string_dim", null, false);
   private static final StringDimensionSchema MV_STRING_DIMENSION = new 
StringDimensionSchema("mv_string_dim", null, null);
   private static final LongDimensionSchema LONG_DIMENSION = new 
LongDimensionSchema("long_dim");
+  private static final NestedDataColumnSchema NESTED_DIMENSION = new 
NestedDataColumnSchema("nested_dim", 4);
+  private static final AutoTypeColumnSchema AUTO_DIMENSION = new 
AutoTypeColumnSchema("auto_dim", null);
   private static final List<DimensionSchema> DIMENSIONS = ImmutableList.of(
       STRING_DIMENSION,
       LONG_DIMENSION,
-      MV_STRING_DIMENSION
+      MV_STRING_DIMENSION,
+      NESTED_DIMENSION,
+      AUTO_DIMENSION
   );
   private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = 
ImmutableMap.of(
       COMPACTION_INTERVAL,
@@ -336,7 +341,7 @@ public class MSQCompactionRunnerTest
   }
 
   @Test
-  public void testMSQControllerTaskSpecWithScanIsValid() throws 
JsonProcessingException
+  public void testCompactionConfigWithoutMetricsSpecProducesCorrectSpec() 
throws JsonProcessingException
   {
     DimFilter dimFilter = new SelectorDimFilter("dim1", "foo", null);
 
@@ -357,7 +362,7 @@ public class MSQCompactionRunnerTest
                   .withGranularity(
                       new UniformGranularitySpec(
                           SEGMENT_GRANULARITY.getDefaultGranularity(),
-                          null,
+                          QUERY_GRANULARITY.getDefaultGranularity(),
                           false,
                           Collections.singletonList(COMPACTION_INTERVAL)
                       )
@@ -375,37 +380,37 @@ public class MSQCompactionRunnerTest
 
     MSQSpec actualMSQSpec = msqControllerTask.getQuerySpec();
 
-    Assert.assertEquals(
-        new MSQTuningConfig(
-            1,
-            MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
-            MAX_ROWS_PER_SEGMENT,
-            null,
-            createIndexSpec()
-        ),
-        actualMSQSpec.getTuningConfig()
-    );
-    Assert.assertEquals(
-        new DataSourceMSQDestination(
-            DATA_SOURCE,
-            SEGMENT_GRANULARITY.getDefaultGranularity(),
-            null,
-            Collections.singletonList(COMPACTION_INTERVAL),
-            
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, 
Function.identity())),
-            null
-        ),
-        actualMSQSpec.getDestination()
-    );
+    Assert.assertEquals(getExpectedTuningConfig(), 
actualMSQSpec.getTuningConfig());
+    Assert.assertEquals(getExpectedDestination(), 
actualMSQSpec.getDestination());
 
     Assert.assertTrue(actualMSQSpec.getQuery() instanceof ScanQuery);
     ScanQuery scanQuery = (ScanQuery) actualMSQSpec.getQuery();
 
+    List<String> expectedColumns = new ArrayList<>();
+    List<ColumnType> expectedColumnTypes = new ArrayList<>();
+    // Add __time since this is a time-ordered query which doesn't have __time 
explicitly defined in dimensionsSpec
+    expectedColumns.add(ColumnHolder.TIME_COLUMN_NAME);
+    expectedColumnTypes.add(ColumnType.LONG);
+
+    // Add TIME_VIRTUAL_COLUMN since a query granularity is specified
+    expectedColumns.add(MSQCompactionRunner.TIME_VIRTUAL_COLUMN);
+    expectedColumnTypes.add(ColumnType.LONG);
+
+    
expectedColumns.addAll(DIMENSIONS.stream().map(DimensionSchema::getName).collect(Collectors.toList()));
+    
expectedColumnTypes.addAll(DIMENSIONS.stream().map(DimensionSchema::getColumnType).collect(Collectors.toList()));
+
+    Assert.assertEquals(expectedColumns, scanQuery.getColumns());
+    Assert.assertEquals(expectedColumnTypes, scanQuery.getColumnTypes());
+
     Assert.assertEquals(dimFilter, scanQuery.getFilter());
     Assert.assertEquals(
         JSON_MAPPER.writeValueAsString(SEGMENT_GRANULARITY.toString()),
         
msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
     );
-    
Assert.assertNull(msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY));
+    Assert.assertEquals(
+        JSON_MAPPER.writeValueAsString(QUERY_GRANULARITY.toString()),
+        
msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY)
+    );
     Assert.assertEquals(WorkerAssignmentStrategy.MAX, 
actualMSQSpec.getAssignmentStrategy());
     Assert.assertEquals(
         
PARTITION_DIMENSIONS.stream().map(OrderBy::ascending).collect(Collectors.toList()),
@@ -414,7 +419,60 @@ public class MSQCompactionRunnerTest
   }
 
   @Test
-  public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws 
JsonProcessingException
+  public void 
testCompactionConfigWithSortOnNonTimeDimensionsProducesCorrectSpec() throws 
JsonProcessingException
+  {
+    List<DimensionSchema> nonTimeSortedDimensions = ImmutableList.of(
+        STRING_DIMENSION,
+        new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME),
+        LONG_DIMENSION
+    );
+    CompactionTask taskCreatedWithTransformSpec = createCompactionTask(
+        new DynamicPartitionsSpec(TARGET_ROWS_PER_SEGMENT, null),
+        null,
+        Collections.emptyMap(),
+        null,
+        null
+    );
+
+    // Set forceSegmentSortByTime=false to enable non-time order
+    DimensionsSpec dimensionsSpec = DimensionsSpec.builder()
+                                                  
.setDimensions(nonTimeSortedDimensions)
+                                                  
.setForceSegmentSortByTime(false)
+                                                  .build();
+    DataSchema dataSchema =
+        DataSchema.builder()
+                  .withDataSource(DATA_SOURCE)
+                  .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, 
null))
+                  .withDimensions(dimensionsSpec)
+                  .withGranularity(
+                      new UniformGranularitySpec(
+                          SEGMENT_GRANULARITY.getDefaultGranularity(),
+                          null,
+                          false,
+                          Collections.singletonList(COMPACTION_INTERVAL)
+                      )
+                  )
+                  .build();
+
+    List<MSQControllerTask> msqControllerTasks = 
MSQ_COMPACTION_RUNNER.createMsqControllerTasks(
+        taskCreatedWithTransformSpec,
+        Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
+    );
+
+    MSQSpec actualMSQSpec = 
Iterables.getOnlyElement(msqControllerTasks).getQuerySpec();
+
+    Assert.assertTrue(actualMSQSpec.getQuery() instanceof ScanQuery);
+    ScanQuery scanQuery = (ScanQuery) actualMSQSpec.getQuery();
+
+    // Dimensions should already list __time and the order should remain intact
+    Assert.assertEquals(
+        
nonTimeSortedDimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList()),
+        scanQuery.getColumns()
+    );
+  }
+
+  @Test
+  public void testCompactionConfigWithMetricsSpecProducesCorrectSpec() throws 
JsonProcessingException
   {
     DimFilter dimFilter = new SelectorDimFilter("dim1", "foo", null);
 
@@ -444,7 +502,6 @@ public class MSQCompactionRunnerTest
         multiValuedDimensions
     );
 
-
     List<MSQControllerTask> msqControllerTasks = 
MSQ_COMPACTION_RUNNER.createMsqControllerTasks(
         taskCreatedWithTransformSpec,
         Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
@@ -454,27 +511,8 @@ public class MSQCompactionRunnerTest
 
     MSQSpec actualMSQSpec = msqControllerTask.getQuerySpec();
 
-    Assert.assertEquals(
-        new MSQTuningConfig(
-            1,
-            MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
-            MAX_ROWS_PER_SEGMENT,
-            null,
-            createIndexSpec()
-        ),
-        actualMSQSpec.getTuningConfig()
-    );
-    Assert.assertEquals(
-        new DataSourceMSQDestination(
-            DATA_SOURCE,
-            SEGMENT_GRANULARITY.getDefaultGranularity(),
-            null,
-            Collections.singletonList(COMPACTION_INTERVAL),
-            
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, 
Function.identity())),
-            null
-        ),
-        actualMSQSpec.getDestination()
-    );
+    Assert.assertEquals(getExpectedTuningConfig(), 
actualMSQSpec.getTuningConfig());
+    Assert.assertEquals(getExpectedDestination(), 
actualMSQSpec.getDestination());
 
     Assert.assertTrue(actualMSQSpec.getQuery() instanceof GroupByQuery);
     GroupByQuery groupByQuery = (GroupByQuery) actualMSQSpec.getQuery();
@@ -490,30 +528,32 @@ public class MSQCompactionRunnerTest
     );
     Assert.assertEquals(WorkerAssignmentStrategy.MAX, 
actualMSQSpec.getAssignmentStrategy());
 
-
-    // Since only MV_STRING_DIMENSION is indicated to be MVD by the 
CombinedSchema, conversion to array should happen
-    // only for that column.
-    List<DimensionSpec> expectedDimensionSpec = DIMENSIONS.stream()
-                                                          .filter(dim -> 
!MV_STRING_DIMENSION.getName()
-                                                                               
             .equals(dim.getName()))
-                                                          .map(dim -> new 
DefaultDimensionSpec(
-                                                              dim.getName(),
-                                                              dim.getName(),
-                                                              
dim.getColumnType()
-                                                          ))
-                                                          .collect(
-                                                              
Collectors.toList());
+    List<DimensionSpec> expectedDimensionSpec = new ArrayList<>();
     expectedDimensionSpec.add(
-        new DefaultDimensionSpec(MSQCompactionRunner.TIME_VIRTUAL_COLUMN,
-                                                       
MSQCompactionRunner.TIME_VIRTUAL_COLUMN,
-                                                       ColumnType.LONG)
+        new DefaultDimensionSpec(
+            MSQCompactionRunner.TIME_VIRTUAL_COLUMN,
+            MSQCompactionRunner.TIME_VIRTUAL_COLUMN,
+            ColumnType.LONG
+        )
     );
     String mvToArrayStringDim = 
MSQCompactionRunner.ARRAY_VIRTUAL_COLUMN_PREFIX + MV_STRING_DIMENSION.getName();
-    expectedDimensionSpec.add(new DefaultDimensionSpec(mvToArrayStringDim, 
mvToArrayStringDim, ColumnType.STRING_ARRAY));
-    MatcherAssert.assertThat(
-        expectedDimensionSpec,
-        Matchers.containsInAnyOrder(groupByQuery.getDimensions().toArray(new 
DimensionSpec[0]))
-    );
+    // Since only MV_STRING_DIMENSION is indicated to be MVD by the 
CombinedSchema, conversion to array should happen
+    // only for that column.
+    expectedDimensionSpec.addAll(DIMENSIONS.stream()
+                                           .map(dim ->
+                                                    
MV_STRING_DIMENSION.getName().equals(dim.getName())
+                                                    ? new DefaultDimensionSpec(
+                                                        mvToArrayStringDim,
+                                                        mvToArrayStringDim,
+                                                        ColumnType.STRING_ARRAY
+                                                    )
+                                                    : new DefaultDimensionSpec(
+                                                        dim.getName(),
+                                                        dim.getName(),
+                                                        dim.getColumnType()
+                                                    ))
+                                           .collect(Collectors.toList()));
+    Assert.assertEquals(expectedDimensionSpec, groupByQuery.getDimensions());
   }
 
   private CompactionTask createCompactionTask(
@@ -580,4 +620,27 @@ public class MSQCompactionRunnerTest
                     
.withLongEncoding(CompressionFactory.LongEncodingStrategy.LONGS)
                     .build();
   }
+
+  private static DataSourceMSQDestination getExpectedDestination()
+  {
+    return new DataSourceMSQDestination(
+        DATA_SOURCE,
+        SEGMENT_GRANULARITY.getDefaultGranularity(),
+        null,
+        Collections.singletonList(COMPACTION_INTERVAL),
+        DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, 
Function.identity())),
+        null
+    );
+  }
+
+  private static MSQTuningConfig getExpectedTuningConfig()
+  {
+    return new MSQTuningConfig(
+        1,
+        MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
+        MAX_ROWS_PER_SEGMENT,
+        null,
+        createIndexSpec()
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to