kgyrtkirk commented on code in PR #16804:
URL: https://github.com/apache/druid/pull/16804#discussion_r1698245372
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -328,4 +321,29 @@ private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorF
     return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount);
   }
+
+  /**
+   * Override the shuffle spec of the last stage based on the shuffling required by the first window stage.
+   * @param queryId
+   * @param dataSourcePlan
+   * @param shuffleSpec
+   * @return
+   */
+  private QueryDefinitionBuilder makeQueryDefinitionBuilder(String queryId, DataSourcePlan dataSourcePlan, ShuffleSpec shuffleSpec)
+  {
+    final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId);
+    int previousStageNumber = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getStageNumber();
+    for (final StageDefinition stageDef : dataSourcePlan.getSubQueryDefBuilder().get().build().getStageDefinitions()) {
+      if (stageDef.getStageNumber() == previousStageNumber) {
+        RowSignature rowSignature = QueryKitUtils.sortableSignature(
+            stageDef.getSignature(),
+            shuffleSpec.clusterBy().getColumns()
+        );
+        queryDefBuilder.add(StageDefinition.builder(stageDef).shuffleSpec(shuffleSpec).signature(rowSignature));
Review Comment:
To be 100% sure that we are not causing correctness issues: can we validate
that it is safe to override the old `shuffleSpec`? If it's not null (or not
something we could allow), we should preferably throw an exception; if that's
not really possible, the safest option would be to add a new dummy stage that
just re-shuffles.
I guess the cases where it's not safe to do so may need further
investigation, as those shuffles could possibly be moved "after" the window
query.
I see `clusterBy` as something that should probably wrap around the fully
built query, regardless of whether it's `Scan` / `GroupBy` / `Window` /
"anything"; but that could be a refactor of its own, which may close that gap
for window queries as well.
Note: I think that if we don't handle `clusterBy` in this class, then writing
a windowed query to files might lead to unexpected results; but I guess that's
not really a problem right now :)
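A rough, self-contained sketch of the two options mentioned above (throw when the existing shuffle would be silently replaced, or append a dummy stage that only re-shuffles) could look like the following. Everything here is hypothetical: `Stage`, `overrideShuffle` and `appendReshuffleStage` are simplified stand-ins invented for illustration, not Druid's `StageDefinition` / `ShuffleSpec` API, and a shuffle is modelled as just a list of cluster-by column names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShuffleOverrideSketch {

  /** Hypothetical stand-in for a stage; shuffleColumns == null means "no shuffle spec". */
  static final class Stage {
    final int stageNumber;
    final List<String> shuffleColumns;

    Stage(int stageNumber, List<String> shuffleColumns) {
      this.stageNumber = stageNumber;
      this.shuffleColumns = shuffleColumns;
    }
  }

  /** Option 1: override only when nothing is lost; otherwise fail loudly. */
  static Stage overrideShuffle(Stage stage, List<String> required) {
    if (stage.shuffleColumns != null && !stage.shuffleColumns.equals(required)) {
      throw new IllegalStateException(
          "Stage " + stage.stageNumber + " already shuffles by " + stage.shuffleColumns
              + "; refusing to override with " + required);
    }
    return new Stage(stage.stageNumber, required);
  }

  /** Option 2: keep existing stages untouched and append a stage that only re-shuffles. */
  static List<Stage> appendReshuffleStage(List<Stage> stages, List<String> required) {
    List<Stage> result = new ArrayList<>(stages);
    int nextNumber = stages.get(stages.size() - 1).stageNumber + 1;
    result.add(new Stage(nextNumber, required));
    return result;
  }

  public static void main(String[] args) {
    List<String> windowKeys = Arrays.asList("country");

    // No previous shuffle: overriding is safe.
    Stage unshuffled = new Stage(0, null);
    System.out.println(overrideShuffle(unshuffled, windowKeys).shuffleColumns);

    // Conflicting previous shuffle: fall back to an extra re-shuffle stage.
    Stage shuffled = new Stage(0, Arrays.asList("city"));
    List<Stage> plan = appendReshuffleStage(Arrays.asList(shuffled), windowKeys);
    System.out.println(plan.size());
  }
}
```

Whether an equal, non-null shuffle spec should count as safe to override is itself an assumption of this sketch; the real check may need to be stricter.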
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]