Akshat-Jain commented on code in PR #16729:
URL: https://github.com/apache/druid/pull/16729#discussion_r1677272488


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -125,28 +139,53 @@ public QueryDefinition makeQueryDefinition(
                              queryToRun.getOperators(),
                              rowSignature,
                              true,
-                             maxRowsMaterialized
+                             maxRowsMaterialized,
+                             new ArrayList<>()
                          ))
       );
     } else {
-      // there are multiple windows present in the query
-      // Create stages for each window in the query
-      // These stages will be serialized
-      // the partition by clause of the next window will be the shuffle key for the previous window
+      // There are multiple windows present in the query.
+      // Create stages for each window in the query.
+      // These stages will be serialized.
+      // The partition by clause of the next window will be the shuffle key for the previous window.
       RowSignature.Builder bob = RowSignature.builder();
-      final int numberOfWindows = operatorList.size();
-      final int baseSize = rowSignature.size() - numberOfWindows;
-      for (int i = 0; i < baseSize; i++) {
-        bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
+      RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
+      log.info("Row signature received from last stage is [%s].", signatureFromInput);
+
+      for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
+        bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
       }
 
-      for (int i = 0; i < numberOfWindows; i++) {
-        bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
+      List<String> partitionColumnNames = new ArrayList<>();
+
+      /*
+      operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
+       to be used for a different window stage.
+
+       We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
+       */
+      for (int i = 0; i < operatorList.size(); i++) {
+        for (OperatorFactory operatorFactory : operatorList.get(i)) {
+          if (operatorFactory instanceof WindowOperatorFactory) {
+            List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
+
+            // Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
+            // since they need to be present in the row signature for this window stage.
+            for (String columnName : outputColumnNames) {
+              int indexInRowSignature = rowSignature.indexOf(columnName);
+              if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {

Review Comment:
   @sreemanamala I wasn't able to reproduce such a case, but thanks for pointing it out!
   I've added a `throw` in the `else` branch of this check, so it will be easier to debug if we ever run into it. Hope that works!
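
For reference, the guard described above could look roughly like the sketch below. This is not the code from the PR: plain `Map`s stand in for `RowSignature` and its builder, the helper name `addWindowOutputColumns` is hypothetical, and `IllegalStateException` is only an assumed placeholder for whatever exception the PR actually throws.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WindowSignatureSketch {
  /**
   * Hypothetical stand-in for the loop under review.
   * fullSignature plays the role of rowSignature (column name -> type name),
   * stageSignature plays the role of the builder "bob".
   */
  static Map<String, String> addWindowOutputColumns(
      Map<String, String> fullSignature,
      Map<String, String> stageSignature,
      List<String> outputColumnNames
  ) {
    // Copy so the caller's map is left untouched; insertion order is preserved.
    Map<String, String> result = new LinkedHashMap<>(stageSignature);
    for (String columnName : outputColumnNames) {
      boolean inFullSignature = fullSignature.containsKey(columnName);
      boolean alreadyInStage = result.containsKey(columnName);
      if (inFullSignature && !alreadyInStage) {
        // Expected case: the window output column is known and not yet added.
        result.put(columnName, fullSignature.get(columnName));
      } else {
        // Unexpected case: fail loudly instead of silently skipping the column.
        // The exception type here is an assumption, not taken from the PR.
        throw new IllegalStateException(
            "Unexpected window output column [" + columnName + "]: inFullSignature="
            + inFullSignature + ", alreadyInStage=" + alreadyInStage
        );
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> full = new LinkedHashMap<>();
    full.put("d1", "STRING");
    full.put("w0", "LONG");

    Map<String, String> stage = new LinkedHashMap<>();
    stage.put("d1", "STRING");

    List<String> outputs = new ArrayList<>();
    outputs.add("w0");

    System.out.println(addWindowOutputColumns(full, stage, outputs).keySet());
  }
}
```

Throwing in the `else` branch means a column that is missing from the full signature, or that was already added to the stage signature, surfaces as an immediate error with the column name attached, rather than as a mismatched signature in a later stage.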



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

