This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 87fbe422187 "Partition boost" the group by queries in MSQ for better
splits (#15474)
87fbe422187 is described below
commit 87fbe422187e6e7fa02983fbc70af95bc371e3f1
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Jan 11 12:46:27 2024 +0530
"Partition boost" the group by queries in MSQ for better splits (#15474)
"Partition boost" the group by queries in MSQ for better splits
---
.../apache/druid/msq/querykit/QueryKitUtils.java | 5 +-
.../groupby/GroupByPostShuffleFrameProcessor.java | 31 +++++--
.../msq/querykit/groupby/GroupByQueryKit.java | 99 ++++++++++++++++++----
3 files changed, 112 insertions(+), 23 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
index 4791c33de6b..e5f2a0152fd 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
@@ -60,8 +60,9 @@ import java.util.stream.Collectors;
public class QueryKitUtils
{
/**
- * Field in frames that stores the partition "boosting" value. Typically
used as the last element of a partitioning
- * key when generating segments. This is an incrementing number that helps
split up otherwise too-large partitions.
+ * Field in frames that stores the partition "boosting" value. Typically, it is used as the last element of a
+ * partitioning key when generating segments. This is an incrementing number that helps split up otherwise too-large
+ * partitions.
*/
public static final String PARTITION_BOOST_COLUMN = "__boost";
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
index ed14dd52739..fb39118a59f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
@@ -30,6 +30,7 @@ import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
@@ -52,6 +53,7 @@ import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -75,6 +77,8 @@ public class GroupByPostShuffleFrameProcessor implements
FrameProcessor<Object>
@Nullable
private final HavingSpec havingSpec;
+ private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
private ResultRow outputRow = null;
@@ -99,8 +103,9 @@ public class GroupByPostShuffleFrameProcessor implements
FrameProcessor<Object>
this.mergeFn = groupingEngine.createMergeFn(query);
this.finalizeFn = makeFinalizeFn(query);
this.havingSpec = cloneHavingSpec(query);
+ this.partitionBoostVirtualColumn = new
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
this.columnSelectorFactoryForFrameWriter =
- makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap(
+ makeVirtualColumnsForFrameWriter(partitionBoostVirtualColumn,
jsonMapper, query).wrap(
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
query,
() -> outputRow,
@@ -233,6 +238,7 @@ public class GroupByPostShuffleFrameProcessor implements
FrameProcessor<Object>
finalizeFn.accept(outputRow);
if (frameWriter.addSelection()) {
+ incrementBoostColumn();
outputRow = null;
return false;
} else if (frameWriter.getNumRows() > 0) {
@@ -240,6 +246,7 @@ public class GroupByPostShuffleFrameProcessor implements
FrameProcessor<Object>
setUpFrameWriterIfNeeded();
if (frameWriter.addSelection()) {
+ incrementBoostColumn();
outputRow = null;
return true;
} else {
@@ -306,17 +313,29 @@ public class GroupByPostShuffleFrameProcessor implements
FrameProcessor<Object>
* this processor. Kept in sync with the signature generated by {@link
GroupByQueryKit}.
*/
private static VirtualColumns makeVirtualColumnsForFrameWriter(
+ @Nullable final VirtualColumn partitionBoostVirtualColumn,
final ObjectMapper jsonMapper,
final GroupByQuery query
)
{
+ List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+ virtualColumns.add(partitionBoostVirtualColumn);
final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
-
- if (segmentGranularityVirtualColumn == null) {
- return VirtualColumns.EMPTY;
- } else {
- return
VirtualColumns.create(Collections.singletonList(segmentGranularityVirtualColumn));
+ if (segmentGranularityVirtualColumn != null) {
+ virtualColumns.add(segmentGranularityVirtualColumn);
}
+
+ return VirtualColumns.create(virtualColumns);
+ }
+
+ /**
+ * Increments the value of the partition boost column. It should be called after each row has been written
+ * to the frame.
+ */
+ private void incrementBoostColumn()
+ {
+
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() +
1);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 469a8a8aa46..dca388b0338 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -99,15 +99,10 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper,
queryToRun.getContext());
final RowSignature intermediateSignature =
computeIntermediateSignature(queryToRun);
final ClusterBy resultClusterByWithoutGranularity =
computeClusterByForResults(queryToRun);
- final ClusterBy resultClusterBy =
+ final ClusterBy resultClusterByWithoutPartitionBoost =
QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity,
segmentGranularity);
- final RowSignature resultSignature =
- QueryKitUtils.sortableSignature(
-
QueryKitUtils.signatureWithSegmentGranularity(computeResultSignature(queryToRun),
segmentGranularity),
- resultClusterBy.getColumns()
- );
final ClusterBy intermediateClusterBy =
computeIntermediateClusterBy(queryToRun);
- final boolean doOrderBy = !resultClusterBy.equals(intermediateClusterBy);
+ final boolean doOrderBy =
!resultClusterByWithoutPartitionBoost.equals(intermediateClusterBy);
final boolean doLimitOrOffset =
queryToRun.getLimitSpec() instanceof DefaultLimitSpec
&& (((DefaultLimitSpec) queryToRun.getLimitSpec()).isLimited()
@@ -115,23 +110,30 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
final ShuffleSpecFactory shuffleSpecFactoryPreAggregation;
final ShuffleSpecFactory shuffleSpecFactoryPostAggregation;
+ boolean partitionBoost;
- // There can be a situation where intermediateClusterBy is empty, while
the result is non-empty
- // if we have PARTITIONED BY on anything except ALL, however we don't have
a grouping dimension
- // (i.e. no GROUP BY clause)
- // __time in such queries is generated using either an aggregator (e.g.
sum(metric) as __time) or using a
- // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time)
- if (intermediateClusterBy.isEmpty() && resultClusterBy.isEmpty()) {
+ if (intermediateClusterBy.isEmpty() &&
resultClusterByWithoutPartitionBoost.isEmpty()) {
// Ignore shuffleSpecFactory, since we know only a single partition will
come out, and we can save some effort.
+ // This condition is triggered when there is no grouping dimension, no partitioning granularity
+ // (PARTITIONED BY ALL), and no ordering/clustering dimensions.
+ // For example: INSERT INTO foo SELECT COUNT(*) FROM bar PARTITIONED BY ALL
shuffleSpecFactoryPreAggregation =
ShuffleSpecFactories.singlePartition();
shuffleSpecFactoryPostAggregation =
ShuffleSpecFactories.singlePartition();
+ partitionBoost = false;
} else if (doOrderBy) {
+ // intermediateClusterBy can be empty while resultClusterBy is non-empty. This happens when the query is
+ // PARTITIONED BY anything other than ALL but has no grouping dimension (i.e. no GROUP BY clause).
+ // __time in such queries is generated using either an aggregator (e.g. sum(metric) AS __time) or a
+ // post-aggregator (e.g. TIMESTAMP '2000-01-01' AS __time).
+ // For example: INSERT INTO foo SELECT COUNT(*), TIMESTAMP '2000-01-01' AS __time FROM bar PARTITIONED BY DAY
shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty()
?
ShuffleSpecFactories.singlePartition()
:
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
shuffleSpecFactoryPostAggregation = doLimitOrOffset
?
ShuffleSpecFactories.singlePartition()
: resultShuffleSpecFactory;
+ partitionBoost = true;
} else {
shuffleSpecFactoryPreAggregation = doLimitOrOffset
?
ShuffleSpecFactories.singlePartition()
@@ -139,6 +141,7 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
// null: retain partitions from input (i.e. from preAggregation).
shuffleSpecFactoryPostAggregation = null;
+ partitionBoost = false;
}
queryDefBuilder.add(
@@ -151,6 +154,18 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
.processorFactory(new
GroupByPreShuffleFrameProcessorFactory(queryToRun))
);
+ ClusterBy resultClusterBy = computeResultClusterBy(
+ queryToRun,
+ segmentGranularity,
+ partitionBoost
+ );
+ RowSignature resultSignature = computeResultSignature(
+ queryToRun,
+ segmentGranularity,
+ resultClusterBy,
+ partitionBoost
+ );
+
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
@@ -188,7 +203,7 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
* Intermediate signature of a particular {@link GroupByQuery}. Does not
include post-aggregators, and all
* aggregations are nonfinalized.
*/
- static RowSignature computeIntermediateSignature(final GroupByQuery query)
+ private static RowSignature computeIntermediateSignature(final GroupByQuery
query)
{
final RowSignature postAggregationSignature =
query.getResultRowSignature(RowSignature.Finalization.NO);
final RowSignature.Builder builder = RowSignature.builder();
@@ -207,13 +222,67 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
* Result signature of a particular {@link GroupByQuery}. Includes
post-aggregators, and aggregations are
* finalized by default. (But may be nonfinalized, depending on {@link
#isFinalize}.
*/
- static RowSignature computeResultSignature(final GroupByQuery query)
+ private static RowSignature computeResultSignature(final GroupByQuery query)
{
final RowSignature.Finalization finalization =
isFinalize(query) ? RowSignature.Finalization.YES :
RowSignature.Finalization.NO;
return query.getResultRowSignature(finalization);
}
+ /**
+ * Computes the result clusterBy. The partition boost column is appended as the last (ascending) key column
+ * only if {@code partitionBoost} is true.
+ */
+ private static ClusterBy computeResultClusterBy(
+ final GroupByQuery query,
+ final Granularity segmentGranularity,
+ final boolean partitionBoost
+ )
+ {
+ final ClusterBy resultClusterByWithoutGranularity =
computeClusterByForResults(query);
+ final ClusterBy resultClusterByWithoutPartitionBoost =
+
QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity,
segmentGranularity);
+ if (!partitionBoost) {
+ return resultClusterByWithoutPartitionBoost;
+ }
+ List<KeyColumn> resultClusterByWithPartitionBoostColumns = new
ArrayList<>(resultClusterByWithoutPartitionBoost.getColumns());
+ resultClusterByWithPartitionBoostColumns.add(new KeyColumn(
+ QueryKitUtils.PARTITION_BOOST_COLUMN,
+ KeyOrder.ASCENDING
+ ));
+ return new ClusterBy(
+ resultClusterByWithPartitionBoostColumns,
+ resultClusterByWithoutPartitionBoost.getBucketByCount()
+ );
+ }
+
+ /**
+ * Computes the result signature. The partition boost column (of type LONG) is included only if
+ * {@code partitionBoost} is true; in that case, {@code resultClusterBy} is expected to already contain
+ * the partition boost column.
+ */
+ private static RowSignature computeResultSignature(
+ final GroupByQuery query,
+ final Granularity segmentGranularity,
+ final ClusterBy resultClusterBy,
+ final boolean partitionBoost
+ )
+ {
+ final RowSignature resultSignatureWithoutPartitionBoost =
+
QueryKitUtils.signatureWithSegmentGranularity(computeResultSignature(query),
segmentGranularity);
+
+ if (!partitionBoost) {
+ return
QueryKitUtils.sortableSignature(resultSignatureWithoutPartitionBoost,
resultClusterBy.getColumns());
+ }
+
+ final RowSignature resultSignatureWithPartitionBoost =
+ RowSignature.builder().addAll(resultSignatureWithoutPartitionBoost)
+ .add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG)
+ .build();
+
+ return QueryKitUtils.sortableSignature(resultSignatureWithPartitionBoost,
resultClusterBy.getColumns());
+ }
+
/**
* Whether aggregations appearing in the result of a query must be finalized.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]