This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 87fbe422187 "Partition boost" the group by queries in MSQ for better 
splits (#15474)
87fbe422187 is described below

commit 87fbe422187e6e7fa02983fbc70af95bc371e3f1
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Jan 11 12:46:27 2024 +0530

    "Partition boost" the group by queries in MSQ for better splits (#15474)
    
    "Partition boost" the group by queries in MSQ for better splits
---
 .../apache/druid/msq/querykit/QueryKitUtils.java   |  5 +-
 .../groupby/GroupByPostShuffleFrameProcessor.java  | 31 +++++--
 .../msq/querykit/groupby/GroupByQueryKit.java      | 99 ++++++++++++++++++----
 3 files changed, 112 insertions(+), 23 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
index 4791c33de6b..e5f2a0152fd 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
@@ -60,8 +60,9 @@ import java.util.stream.Collectors;
 public class QueryKitUtils
 {
   /**
-   * Field in frames that stores the partition "boosting" value. Typically 
used as the last element of a partitioning
-   * key when generating segments. This is an incrementing number that helps 
split up otherwise too-large partitions.
+   * Field in frames that stores the partition "boosting" value. Typically, it 
is used as the last element of a
+   * partitioning key when generating segments. This is an incrementing number 
that helps split up otherwise too-large
+   * partitions.
    */
   public static final String PARTITION_BOOST_COLUMN = "__boost";
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
index ed14dd52739..fb39118a59f 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
@@ -30,6 +30,7 @@ import org.apache.druid.frame.processor.FrameProcessors;
 import org.apache.druid.frame.processor.FrameRowTooLargeException;
 import org.apache.druid.frame.processor.ReturnOrAwait;
 import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
 import org.apache.druid.frame.write.FrameWriter;
 import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.java.util.common.Unit;
@@ -52,6 +53,7 @@ import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -75,6 +77,8 @@ public class GroupByPostShuffleFrameProcessor implements 
FrameProcessor<Object>
   @Nullable
   private final HavingSpec havingSpec;
 
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+
   private Cursor frameCursor = null;
   private Supplier<ResultRow> rowSupplierFromFrameCursor;
   private ResultRow outputRow = null;
@@ -99,8 +103,9 @@ public class GroupByPostShuffleFrameProcessor implements 
FrameProcessor<Object>
     this.mergeFn = groupingEngine.createMergeFn(query);
     this.finalizeFn = makeFinalizeFn(query);
     this.havingSpec = cloneHavingSpec(query);
+    this.partitionBoostVirtualColumn = new 
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
     this.columnSelectorFactoryForFrameWriter =
-        makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap(
+        makeVirtualColumnsForFrameWriter(partitionBoostVirtualColumn, 
jsonMapper, query).wrap(
             RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
                 query,
                 () -> outputRow,
@@ -233,6 +238,7 @@ public class GroupByPostShuffleFrameProcessor implements 
FrameProcessor<Object>
     finalizeFn.accept(outputRow);
 
     if (frameWriter.addSelection()) {
+      incrementBoostColumn();
       outputRow = null;
       return false;
     } else if (frameWriter.getNumRows() > 0) {
@@ -240,6 +246,7 @@ public class GroupByPostShuffleFrameProcessor implements 
FrameProcessor<Object>
       setUpFrameWriterIfNeeded();
 
       if (frameWriter.addSelection()) {
+        incrementBoostColumn();
         outputRow = null;
         return true;
       } else {
@@ -306,17 +313,29 @@ public class GroupByPostShuffleFrameProcessor implements 
FrameProcessor<Object>
    * this processor. Kept in sync with the signature generated by {@link 
GroupByQueryKit}.
    */
   private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
       final ObjectMapper jsonMapper,
       final GroupByQuery query
   )
   {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    virtualColumns.add(partitionBoostVirtualColumn);
     final VirtualColumn segmentGranularityVirtualColumn =
         QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
-
-    if (segmentGranularityVirtualColumn == null) {
-      return VirtualColumns.EMPTY;
-    } else {
-      return 
VirtualColumns.create(Collections.singletonList(segmentGranularityVirtualColumn));
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
     }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  /**
+   * Increments the value of the partition boosting column. It should be 
called once the row value has been written
+   * to the frame
+   */
+  private void incrementBoostColumn()
+  {
+    
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 
1);
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 469a8a8aa46..dca388b0338 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -99,15 +99,10 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
         QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, 
queryToRun.getContext());
     final RowSignature intermediateSignature = 
computeIntermediateSignature(queryToRun);
     final ClusterBy resultClusterByWithoutGranularity = 
computeClusterByForResults(queryToRun);
-    final ClusterBy resultClusterBy =
+    final ClusterBy resultClusterByWithoutPartitionBoost =
         
QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity,
 segmentGranularity);
-    final RowSignature resultSignature =
-        QueryKitUtils.sortableSignature(
-            
QueryKitUtils.signatureWithSegmentGranularity(computeResultSignature(queryToRun),
 segmentGranularity),
-            resultClusterBy.getColumns()
-        );
     final ClusterBy intermediateClusterBy = 
computeIntermediateClusterBy(queryToRun);
-    final boolean doOrderBy = !resultClusterBy.equals(intermediateClusterBy);
+    final boolean doOrderBy = 
!resultClusterByWithoutPartitionBoost.equals(intermediateClusterBy);
     final boolean doLimitOrOffset =
         queryToRun.getLimitSpec() instanceof DefaultLimitSpec
         && (((DefaultLimitSpec) queryToRun.getLimitSpec()).isLimited()
@@ -115,23 +110,30 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
 
     final ShuffleSpecFactory shuffleSpecFactoryPreAggregation;
     final ShuffleSpecFactory shuffleSpecFactoryPostAggregation;
+    boolean partitionBoost;
 
-    // There can be a situation where intermediateClusterBy is empty, while 
the result is non-empty
-    // if we have PARTITIONED BY on anything except ALL, however we don't have 
a grouping dimension
-    // (i.e. no GROUP BY clause)
-    // __time in such queries is generated using either an aggregator (e.g. 
sum(metric) as __time) or using a
-    // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time)
-    if (intermediateClusterBy.isEmpty() && resultClusterBy.isEmpty()) {
+    if (intermediateClusterBy.isEmpty() && 
resultClusterByWithoutPartitionBoost.isEmpty()) {
       // Ignore shuffleSpecFactory, since we know only a single partition will 
come out, and we can save some effort.
+      // This condition will be triggered when we don't have a grouping 
dimension, no partitioning granularity
+      // (PARTITIONED BY ALL) and no ordering/clustering dimensions
+      // For example: INSERT INTO foo SELECT COUNT(*) FROM bar PARTITIONED BY 
ALL
       shuffleSpecFactoryPreAggregation = 
ShuffleSpecFactories.singlePartition();
       shuffleSpecFactoryPostAggregation = 
ShuffleSpecFactories.singlePartition();
+      partitionBoost = false;
     } else if (doOrderBy) {
+      // There can be a situation where intermediateClusterBy is empty, while 
the resultClusterBy is non-empty
+      // if we have PARTITIONED BY on anything except ALL, however we don't 
have a grouping dimension
+      // (i.e. no GROUP BY clause)
+      // __time in such queries is generated using either an aggregator (e.g. 
sum(metric) as __time) or using a
+      // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time)
+      // For example: INSERT INTO foo SELECT COUNT(*), TIMESTAMP '2000-01-01' 
AS __time FROM bar PARTITIONED BY DAY
       shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty()
                                          ? 
ShuffleSpecFactories.singlePartition()
                                          : 
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
       shuffleSpecFactoryPostAggregation = doLimitOrOffset
                                           ? 
ShuffleSpecFactories.singlePartition()
                                           : resultShuffleSpecFactory;
+      partitionBoost = true;
     } else {
       shuffleSpecFactoryPreAggregation = doLimitOrOffset
                                          ? 
ShuffleSpecFactories.singlePartition()
@@ -139,6 +141,7 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
 
       // null: retain partitions from input (i.e. from preAggregation).
       shuffleSpecFactoryPostAggregation = null;
+      partitionBoost = false;
     }
 
     queryDefBuilder.add(
@@ -151,6 +154,18 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
                        .processorFactory(new 
GroupByPreShuffleFrameProcessorFactory(queryToRun))
     );
 
+    ClusterBy resultClusterBy = computeResultClusterBy(
+        queryToRun,
+        segmentGranularity,
+        partitionBoost
+    );
+    RowSignature resultSignature = computeResultSignature(
+        queryToRun,
+        segmentGranularity,
+        resultClusterBy,
+        partitionBoost
+    );
+
     queryDefBuilder.add(
         StageDefinition.builder(firstStageNumber + 1)
                        .inputs(new StageInputSpec(firstStageNumber))
@@ -188,7 +203,7 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
    * Intermediate signature of a particular {@link GroupByQuery}. Does not 
include post-aggregators, and all
    * aggregations are nonfinalized.
    */
-  static RowSignature computeIntermediateSignature(final GroupByQuery query)
+  private static RowSignature computeIntermediateSignature(final GroupByQuery 
query)
   {
     final RowSignature postAggregationSignature = 
query.getResultRowSignature(RowSignature.Finalization.NO);
     final RowSignature.Builder builder = RowSignature.builder();
@@ -207,13 +222,67 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
    * Result signature of a particular {@link GroupByQuery}. Includes 
post-aggregators, and aggregations are
    * finalized by default. (But may be nonfinalized, depending on {@link 
#isFinalize}.
    */
-  static RowSignature computeResultSignature(final GroupByQuery query)
+  private static RowSignature computeResultSignature(final GroupByQuery query)
   {
     final RowSignature.Finalization finalization =
         isFinalize(query) ? RowSignature.Finalization.YES : 
RowSignature.Finalization.NO;
     return query.getResultRowSignature(finalization);
   }
 
+  /**
+   * Computes the result clusterBy which may or may not have the partition 
boosted column, depending on the
+   * {@code partitionBoost} parameter passed
+   */
+  private static ClusterBy computeResultClusterBy(
+      final GroupByQuery query,
+      final Granularity segmentGranularity,
+      final boolean partitionBoost
+  )
+  {
+    final ClusterBy resultClusterByWithoutGranularity = 
computeClusterByForResults(query);
+    final ClusterBy resultClusterByWithoutPartitionBoost =
+        
QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity,
 segmentGranularity);
+    if (!partitionBoost) {
+      return resultClusterByWithoutPartitionBoost;
+    }
+    List<KeyColumn> resultClusterByWithPartitionBoostColumns = new 
ArrayList<>(resultClusterByWithoutPartitionBoost.getColumns());
+    resultClusterByWithPartitionBoostColumns.add(new KeyColumn(
+        QueryKitUtils.PARTITION_BOOST_COLUMN,
+        KeyOrder.ASCENDING
+    ));
+    return new ClusterBy(
+        resultClusterByWithPartitionBoostColumns,
+        resultClusterByWithoutPartitionBoost.getBucketByCount()
+    );
+  }
+
+  /**
+   * Computes the result signature which may or may not have the partition 
boosted column depending on the
+   * {@code partitionBoost} passed. It expects that the clusterBy already has 
the partition boost column
+   * if the parameter {@code partitionBoost} is set as true.
+   */
+  private static RowSignature computeResultSignature(
+      final GroupByQuery query,
+      final Granularity segmentGranularity,
+      final ClusterBy resultClusterBy,
+      final boolean partitionBoost
+  )
+  {
+    final RowSignature resultSignatureWithoutPartitionBoost =
+        
QueryKitUtils.signatureWithSegmentGranularity(computeResultSignature(query), 
segmentGranularity);
+
+    if (!partitionBoost) {
+      return 
QueryKitUtils.sortableSignature(resultSignatureWithoutPartitionBoost, 
resultClusterBy.getColumns());
+    }
+
+    final RowSignature resultSignatureWithPartitionBoost =
+        RowSignature.builder().addAll(resultSignatureWithoutPartitionBoost)
+                    .add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG)
+                    .build();
+
+    return QueryKitUtils.sortableSignature(resultSignatureWithPartitionBoost, 
resultClusterBy.getColumns());
+  }
+
   /**
    * Whether aggregations appearing in the result of a query must be finalized.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to