somu-imply commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1531504063
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java:
##########
@@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition(
partitionBoost
);
- queryDefBuilder.add(
- StageDefinition.builder(firstStageNumber + 1)
- .inputs(new StageInputSpec(firstStageNumber))
- .signature(resultSignature)
- .maxWorkerCount(maxWorkerCount)
- .shuffleSpec(
- shuffleSpecFactoryPostAggregation != null
- ?
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
- : null
- )
- .processorFactory(new
GroupByPostShuffleFrameProcessorFactory(queryToRun))
- );
+ // the result signature might change
+ // if window shufle spec is added
+ // say the output signature was d0, d1
+ // But shuffle spec for window was d1
+ // create the shufflespec from the column in the context
+ // and sort after wards to ensure prefix of shuffle is in row signature
+ final ShuffleSpec nextShuffleWindowSpec;
+ if
(originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL))
{
+ final ClusterBy windowClusterBy = (ClusterBy)
originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
+ nextShuffleWindowSpec = new HashShuffleSpec(
+ windowClusterBy,
+ maxWorkerCount
+ );
+ } else {
+ nextShuffleWindowSpec = null;
Review Comment:
Yes refactored in that way
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]