Akshat-Jain commented on code in PR #16729:
URL: https://github.com/apache/druid/pull/16729#discussion_r1675932377
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -125,28 +139,53 @@ public QueryDefinition makeQueryDefinition(
queryToRun.getOperators(),
rowSignature,
true,
- maxRowsMaterialized
+ maxRowsMaterialized,
+ new ArrayList<>()
))
);
} else {
- // there are multiple windows present in the query
- // Create stages for each window in the query
- // These stages will be serialized
- // the partition by clause of the next window will be the shuffle key for the previous window
+ // There are multiple windows present in the query.
+ // Create stages for each window in the query.
+ // These stages will be serialized.
+ // The partition by clause of the next window will be the shuffle key for the previous window.
RowSignature.Builder bob = RowSignature.builder();
- final int numberOfWindows = operatorList.size();
- final int baseSize = rowSignature.size() - numberOfWindows;
- for (int i = 0; i < baseSize; i++) {
- bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
+ RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
+ log.info("Row signature received from last stage is [%s].", signatureFromInput);
+
+ for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
+ bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
}
- for (int i = 0; i < numberOfWindows; i++) {
- bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
+ List<String> partitionColumnNames = new ArrayList<>();
+
+ /*
+ operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
+ to be used for a different window stage.
+
+ We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
+ */
+ for (int i = 0; i < operatorList.size(); i++) {
+ for (OperatorFactory operatorFactory : operatorList.get(i)) {
+ if (operatorFactory instanceof WindowOperatorFactory) {
+ List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
+
+ // Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
+ // since they need to be present in the row signature for this window stage.
+ for (String columnName : outputColumnNames) {
+ int indexInRowSignature = rowSignature.indexOf(columnName);
+ if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
Review Comment:
I remember stumbling upon such a case, but I made a bunch of logic changes
after that, so maybe it's no longer the case. 🤔
I'll tinker around and see if I can reproduce it. I'll try to get back to
you on this by Monday.
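To make the condition under discussion concrete, here is a minimal, simplified sketch of how the per-stage signature accumulation in the diff could behave. It is not Druid code: signatures are modeled as ordered name-to-type maps, each window stage is represented only by the output column names of its window processor, and `stageSignatures` and `signature` are hypothetical helpers. It assumes, as the diff suggests by declaring `bob` outside the loop over `operatorList`, that one builder is shared across stages, so a column already added for an earlier stage is skipped later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: NOT Druid's RowSignature/OperatorFactory API.
public class WindowStageSignatures {

  // Builds an ordered name -> type map from alternating name/type arguments.
  static Map<String, String> signature(String... nameTypePairs) {
    Map<String, String> sig = new LinkedHashMap<>();
    for (int i = 0; i + 1 < nameTypePairs.length; i += 2) {
      sig.put(nameTypePairs[i], nameTypePairs[i + 1]);
    }
    return sig;
  }

  // Returns one signature per window stage. A single accumulator (like `bob`
  // in the diff) is shared across stages: it starts from the input signature,
  // and each stage appends the output columns that exist in the final
  // signature but have not been added yet.
  static List<Map<String, String>> stageSignatures(
      Map<String, String> inputSignature,
      Map<String, String> finalSignature,
      List<List<String>> outputColumnsPerStage) {
    Map<String, String> accumulated = new LinkedHashMap<>(inputSignature);
    List<Map<String, String>> result = new ArrayList<>();
    for (List<String> outputColumnNames : outputColumnsPerStage) {
      for (String columnName : outputColumnNames) {
        // Same shape as the check under review:
        // present in the final signature, absent from the accumulator.
        if (finalSignature.containsKey(columnName) && !accumulated.containsKey(columnName)) {
          accumulated.put(columnName, finalSignature.get(columnName));
        }
      }
      // Copy, so later stages do not change signatures already returned.
      result.add(new LinkedHashMap<>(accumulated));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> input = signature("dim1", "STRING", "m1", "DOUBLE");
    Map<String, String> finalSig = signature(
        "dim1", "STRING", "m1", "DOUBLE", "w0", "LONG", "w1", "LONG");
    // Stage 1 re-emits w0 and also an intermediate column "tmp" that the
    // final signature does not contain; both are skipped.
    List<Map<String, String>> sigs = stageSignatures(
        input, finalSig, List.of(List.of("w0"), List.of("w0", "w1", "tmp")));
    for (int i = 0; i < sigs.size(); i++) {
      System.out.println("stage " + i + ": " + sigs.get(i).keySet());
    }
    // stage 0: [dim1, m1, w0]
    // stage 1: [dim1, m1, w0, w1]
  }
}
```

In this toy setup the check only matters when a later window's output repeats a column an earlier stage already produced (`w0` here) or names a column missing from the final signature (`tmp`); whether the real query planner can produce such outputs is exactly the open question in the thread.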