somu-imply commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1531502304
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java:
##########
@@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition(
partitionBoost
);
- queryDefBuilder.add(
- StageDefinition.builder(firstStageNumber + 1)
- .inputs(new StageInputSpec(firstStageNumber))
- .signature(resultSignature)
- .maxWorkerCount(maxWorkerCount)
- .shuffleSpec(
- shuffleSpecFactoryPostAggregation != null
- ?
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
- : null
- )
- .processorFactory(new
GroupByPostShuffleFrameProcessorFactory(queryToRun))
- );
+ // the result signature might change
+ // if window shufle spec is added
+ // say the output signature was d0, d1
+ // But shuffle spec for window was d1
+ // create the shufflespec from the column in the context
+ // and sort after wards to ensure prefix of shuffle is in row signature
+ final ShuffleSpec nextShuffleWindowSpec;
+ if
(originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL))
{
+ final ClusterBy windowClusterBy = (ClusterBy)
originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
+ nextShuffleWindowSpec = new HashShuffleSpec(
+ windowClusterBy,
+ maxWorkerCount
+ );
+ } else {
+ nextShuffleWindowSpec = null;
+ }
+ final ShuffleSpec stageShuffleSpec;
+ if (shuffleSpecFactoryPostAggregation != null) {
+ List<KeyColumn> columns = resultClusterBy.getColumns();
+ if (nextShuffleWindowSpec != null) {
+ columns.addAll(nextShuffleWindowSpec.clusterBy().getColumns());
+ // Creating a new cluster by with the columns from existing
+ // plus the columns from the next window partition column
+ final ClusterBy tmp = new ClusterBy(columns,
resultClusterBy.getBucketByCount());
+ stageShuffleSpec = shuffleSpecFactoryPostAggregation.build(tmp, false);
+ } else {
+ stageShuffleSpec =
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false);
+ }
+ } else {
+ stageShuffleSpec = nextShuffleWindowSpec;
+ }
+ final RowSignature stageSignature;
+ if (stageShuffleSpec == null) {
+ stageSignature = resultSignature;
+ } else {
+ // sort the signature to make sure the prefix is aligned
+ stageSignature = QueryKitUtils.sortableSignature(
+ resultSignature,
+ stageShuffleSpec.clusterBy().getColumns()
+ );
+ }
if (doLimitOrOffset) {
+ queryDefBuilder.add(
+ StageDefinition.builder(firstStageNumber + 1)
+ .inputs(new StageInputSpec(firstStageNumber))
+ .signature(resultSignature)
+ .maxWorkerCount(maxWorkerCount)
+ .shuffleSpec(
+ shuffleSpecFactoryPostAggregation != null
+ ?
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
+ : null
+ )
+ .processorFactory(new
GroupByPostShuffleFrameProcessorFactory(queryToRun))
+ );
final DefaultLimitSpec limitSpec = (DefaultLimitSpec)
queryToRun.getLimitSpec();
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(1)
- .shuffleSpec(null) // no shuffling should be required
after a limit processor.
+ // no shuffling should be required after a limit
processor.
+ // but need one if the next stage is a window with a
partition by
+ .shuffleSpec(nextShuffleWindowSpec)
Review Comment:
Refactored the query kit to make sure the old code runs when the next window
shuffle spec is null; otherwise, it goes through the new code.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java:
##########
@@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition(
partitionBoost
);
- queryDefBuilder.add(
- StageDefinition.builder(firstStageNumber + 1)
- .inputs(new StageInputSpec(firstStageNumber))
- .signature(resultSignature)
- .maxWorkerCount(maxWorkerCount)
- .shuffleSpec(
- shuffleSpecFactoryPostAggregation != null
- ?
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
- : null
- )
- .processorFactory(new
GroupByPostShuffleFrameProcessorFactory(queryToRun))
- );
+ // the result signature might change
+ // if window shufle spec is added
+ // say the output signature was d0, d1
+ // But shuffle spec for window was d1
+ // create the shufflespec from the column in the context
+ // and sort after wards to ensure prefix of shuffle is in row signature
+ final ShuffleSpec nextShuffleWindowSpec;
+ if
(originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL))
{
+ final ClusterBy windowClusterBy = (ClusterBy)
originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
+ nextShuffleWindowSpec = new HashShuffleSpec(
+ windowClusterBy,
+ maxWorkerCount
+ );
+ } else {
+ nextShuffleWindowSpec = null;
+ }
+ final ShuffleSpec stageShuffleSpec;
+ if (shuffleSpecFactoryPostAggregation != null) {
+ List<KeyColumn> columns = resultClusterBy.getColumns();
+ if (nextShuffleWindowSpec != null) {
+ columns.addAll(nextShuffleWindowSpec.clusterBy().getColumns());
+ // Creating a new cluster by with the columns from existing
+ // plus the columns from the next window partition column
+ final ClusterBy tmp = new ClusterBy(columns,
resultClusterBy.getBucketByCount());
+ stageShuffleSpec = shuffleSpecFactoryPostAggregation.build(tmp, false);
+ } else {
+ stageShuffleSpec =
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false);
+ }
+ } else {
+ stageShuffleSpec = nextShuffleWindowSpec;
+ }
+ final RowSignature stageSignature;
+ if (stageShuffleSpec == null) {
+ stageSignature = resultSignature;
+ } else {
+ // sort the signature to make sure the prefix is aligned
+ stageSignature = QueryKitUtils.sortableSignature(
+ resultSignature,
+ stageShuffleSpec.clusterBy().getColumns()
+ );
+ }
if (doLimitOrOffset) {
+ queryDefBuilder.add(
+ StageDefinition.builder(firstStageNumber + 1)
+ .inputs(new StageInputSpec(firstStageNumber))
+ .signature(resultSignature)
+ .maxWorkerCount(maxWorkerCount)
+ .shuffleSpec(
+ shuffleSpecFactoryPostAggregation != null
+ ?
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
+ : null
+ )
+ .processorFactory(new
GroupByPostShuffleFrameProcessorFactory(queryToRun))
+ );
final DefaultLimitSpec limitSpec = (DefaultLimitSpec)
queryToRun.getLimitSpec();
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(1)
- .shuffleSpec(null) // no shuffling should be required
after a limit processor.
+ // no shuffling should be required after a limit
processor.
+ // but need one if the next stage is a window with a
partition by
+ .shuffleSpec(nextShuffleWindowSpec)
Review Comment:
Refactored the query kit to make sure the old code runs when the next window
shuffle spec is null; otherwise, it goes through the new code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]