Akshat-Jain commented on code in PR #16729:
URL: https://github.com/apache/druid/pull/16729#discussion_r1678825723


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -125,28 +139,53 @@ public QueryDefinition makeQueryDefinition(
                              queryToRun.getOperators(),
                              rowSignature,
                              true,
-                             maxRowsMaterialized
+                             maxRowsMaterialized,
+                             new ArrayList<>()
                          ))
       );
     } else {
-      // there are multiple windows present in the query
-      // Create stages for each window in the query
-      // These stages will be serialized
-      // the partition by clause of the next window will be the shuffle key for the previous window
+      // There are multiple windows present in the query.
+      // Create stages for each window in the query.
+      // These stages will be serialized.
+      // The partition by clause of the next window will be the shuffle key for the previous window.
       RowSignature.Builder bob = RowSignature.builder();
-      final int numberOfWindows = operatorList.size();
-      final int baseSize = rowSignature.size() - numberOfWindows;
-      for (int i = 0; i < baseSize; i++) {
-        bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
+      RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
+      log.info("Row signature received from last stage is [%s].", signatureFromInput);
+
+      for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
+        bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
       }
 
-      for (int i = 0; i < numberOfWindows; i++) {
-        bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
+      List<String> partitionColumnNames = new ArrayList<>();
+
+      /*
+      operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
+       to be used for a different window stage.
+
+       We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
+       */
+      for (int i = 0; i < operatorList.size(); i++) {
+        for (OperatorFactory operatorFactory : operatorList.get(i)) {
+          if (operatorFactory instanceof WindowOperatorFactory) {
+            List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
+
+            // Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
+            // since they need to be present in the row signature for this window stage.
+            for (String columnName : outputColumnNames) {
+              int indexInRowSignature = rowSignature.indexOf(columnName);
+              if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
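
A simplified sketch of how the diff above derives a row signature for each window stage. It assumes, from the excerpt, that one builder (`bob`) is shared across stages, so each stage sees the input-stage columns plus every window column added so far, and that the stage signature is taken after that stage's columns are added. `WindowStageSignatures` and `buildStageSignatures` are hypothetical names; a `LinkedHashMap<String, String>` (column name to type) stands in for Druid's `RowSignature`, and each inner list stands in for the output column names of one stage's `WindowOperatorFactory` processor. Only the `if` condition visible in the diff is modelled; the branch cut off by the excerpt is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the per-stage signature logic in the diff.
// LinkedHashMap<String, String> (column name -> type) stands in for RowSignature;
// each List<String> stands in for one window stage's output column names.
class WindowStageSignatures {

  static List<Map<String, String>> buildStageSignatures(
      Map<String, String> inputSignature,      // signature of the last input stage
      Map<String, String> querySignature,      // final signature of the whole query
      List<List<String>> windowOutputColumns   // one entry per window stage
  ) {
    // Start from every column produced by the input stage, keeping its order.
    Map<String, String> builder = new LinkedHashMap<>(inputSignature);
    List<Map<String, String>> stageSignatures = new ArrayList<>();

    for (List<String> outputColumns : windowOutputColumns) {
      for (String columnName : outputColumns) {
        // Mirrors the diff's condition: the column must exist in the query
        // signature and must not have been added by an earlier stage.
        if (querySignature.containsKey(columnName) && !builder.containsKey(columnName)) {
          builder.put(columnName, querySignature.get(columnName));
        }
      }
      // Snapshot: this stage sees the input columns plus all window columns so far.
      stageSignatures.add(new LinkedHashMap<>(builder));
    }
    return stageSignatures;
  }

  public static void main(String[] args) {
    Map<String, String> input = new LinkedHashMap<>();
    input.put("dim1", "STRING");
    input.put("m1", "DOUBLE");

    Map<String, String> query = new LinkedHashMap<>(input);
    query.put("w0", "LONG");
    query.put("w1", "DOUBLE");

    List<Map<String, String>> stages =
        buildStageSignatures(input, query, List.of(List.of("w0"), List.of("w1")));
    System.out.println(stages.get(0).keySet()); // [dim1, m1, w0]
    System.out.println(stages.get(1).keySet()); // [dim1, m1, w0, w1]
  }
}
```

Taking the starting columns from the input stage rather than slicing `rowSignature` by `numberOfWindows` means the sketch does not depend on the window columns sitting at the end of the final signature, which is the assumption the removed lines relied on.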

Review Comment:
   @sreemanamala 
   
   > So, we are trying to be cautious here.
   
   Yeah. We could maybe remove it once the entire flow has had enough soak time. But I like the idea of throwing an error instead of letting the query proceed in an unexpected scenario.
   
   > Instead of ISE, we could use a DruidException and the error message would give column value like w0. (might not be very helpful)
   
   I saw ISE being thrown in other query kits, so I used it for consistency. A column name like w0 should still be helpful, since we also log the entire row signature, as well as which columns get added to the row signature for every stage.


