This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f56b57c7e9 MSQ WF: Pass a flag from broker to determine operator chain transformation (#17443)
3f56b57c7e9 is described below

commit 3f56b57c7e9ecff64531d8c2fc28e108feca67bf
Author: Akshat Jain <[email protected]>
AuthorDate: Tue Nov 12 09:28:28 2024 +0530

    MSQ WF: Pass a flag from broker to determine operator chain transformation (#17443)
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 12 ++++--
 .../WindowOperatorQueryFrameProcessor.java         | 38 +----------------
 .../druid/msq/querykit/WindowOperatorQueryKit.java | 47 +++++++++++++++++++++-
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |  3 ++
 .../druid/msq/util/MultiStageQueryContext.java     | 11 +++++
 .../org/apache/druid/msq/test/MSQTestBase.java     |  1 +
 .../msq1.iq                                        |  6 ++-
 7 files changed, 74 insertions(+), 44 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 0615ba802bf..5c491c0780d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -571,7 +571,7 @@ public class ControllerImpl implements Controller
 
     final QueryContext queryContext = querySpec.getQuery().context();
     final QueryDefinition queryDef = makeQueryDefinition(
-        context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, 
querySpec, queryKernelConfig),
+        context.makeQueryKitSpec(makeQueryControllerToolKit(queryContext), 
queryId, querySpec, queryKernelConfig),
         querySpec,
         context,
         resultsContext
@@ -1211,13 +1211,19 @@ public class ControllerImpl implements Controller
   }
 
   @SuppressWarnings("rawtypes")
-  private QueryKit<Query<?>> makeQueryControllerToolKit()
+  private QueryKit<Query<?>> makeQueryControllerToolKit(QueryContext 
queryContext)
   {
     final Map<Class<? extends Query>, QueryKit> kitMap =
         ImmutableMap.<Class<? extends Query>, QueryKit>builder()
                     .put(ScanQuery.class, new 
ScanQueryKit(context.jsonMapper()))
                     .put(GroupByQuery.class, new 
GroupByQueryKit(context.jsonMapper()))
-                    .put(WindowOperatorQuery.class, new 
WindowOperatorQueryKit(context.jsonMapper()))
+                    .put(
+                        WindowOperatorQuery.class,
+                        new WindowOperatorQueryKit(
+                            context.jsonMapper(),
+                            
MultiStageQueryContext.isWindowFunctionOperatorTransformationEnabled(queryContext)
+                        )
+                    )
                     .build();
 
     return new MultiQueryKit(kitMap);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index 41c1df884f9..674e3ea4602 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -40,13 +40,9 @@ import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
-import org.apache.druid.query.operator.AbstractSortOperatorFactory;
-import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
 import org.apache.druid.query.operator.OffsetLimit;
 import org.apache.druid.query.operator.Operator;
 import org.apache.druid.query.operator.OperatorFactory;
-import org.apache.druid.query.operator.PartitionSortOperatorFactory;
 import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
 import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
@@ -101,7 +97,7 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
     this.frameWriterFactory = frameWriterFactory;
     this.resultRowAndCols = new ArrayList<>();
     this.maxRowsMaterialized = 
MultiStageQueryContext.getMaxRowsMaterializedInWindow(queryContext);
-    this.operatorFactoryList = 
getOperatorFactoryListForStageDefinition(operatorFactoryList);
+    this.operatorFactoryList = operatorFactoryList;
     this.frameRowsAndColsBuilder = new 
RowsAndColumnsBuilder(this.maxRowsMaterialized);
 
     this.frameReader = frameReader;
@@ -403,36 +399,4 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
     resultRowAndCols.clear();
     rowId.set(0);
   }
-
-  /**
-   * This method converts the operator chain received from native plan into 
MSQ plan.
-   * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> 
WindowOperator) is converted into (GlueingPartitioningOperator -> 
PartitionSortOperator -> WindowOperator).
-   * We rely on MSQ's shuffling to do the clustering on partitioning keys for 
us at every stage.
-   * This conversion allows us to blindly read N rows from input channel and 
push them into the operator chain, and repeat until the input channel isn't 
finished.
-   * @param operatorFactoryListFromQuery
-   * @return
-   */
-  private List<OperatorFactory> 
getOperatorFactoryListForStageDefinition(List<OperatorFactory> 
operatorFactoryListFromQuery)
-  {
-    final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
-    final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
-    for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
-      if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
-        AbstractPartitioningOperatorFactory partition = 
(AbstractPartitioningOperatorFactory) operatorFactory;
-        operatorFactoryList.add(new 
GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), 
this.maxRowsMaterialized));
-      } else if (operatorFactory instanceof AbstractSortOperatorFactory) {
-        AbstractSortOperatorFactory sortOperatorFactory = 
(AbstractSortOperatorFactory) operatorFactory;
-        sortOperatorFactoryList.add(new 
PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
-      } else {
-        // Add all the PartitionSortOperator(s) before every window operator.
-        operatorFactoryList.addAll(sortOperatorFactoryList);
-        sortOperatorFactoryList.clear();
-        operatorFactoryList.add(operatorFactory);
-      }
-    }
-
-    operatorFactoryList.addAll(sortOperatorFactoryList);
-    sortOperatorFactoryList.clear();
-    return operatorFactoryList;
-  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index b168860ff4c..a46f62866a1 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -37,7 +37,9 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
 import org.apache.druid.query.operator.AbstractSortOperatorFactory;
 import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
 import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.PartitionSortOperatorFactory;
 import org.apache.druid.query.operator.WindowOperatorQuery;
 import org.apache.druid.query.operator.window.WindowOperatorFactory;
 import org.apache.druid.segment.column.ColumnType;
@@ -54,10 +56,12 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
 {
   private static final Logger log = new Logger(WindowOperatorQueryKit.class);
   private final ObjectMapper jsonMapper;
+  private final boolean isOperatorTransformationEnabled;
 
-  public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+  public WindowOperatorQueryKit(ObjectMapper jsonMapper, boolean 
isOperatorTransformationEnabled)
   {
     this.jsonMapper = jsonMapper;
+    this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
   }
 
   @Override
@@ -172,6 +176,9 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
                                                             .flatMap(of -> 
of.getPartitionColumns().stream())
                                                             
.collect(Collectors.toList());
 
+      final List<OperatorFactory> operatorFactories = 
isOperatorTransformationEnabled
+                                                      ? 
getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i), 
maxRowsMaterialized)
+                                                      : operatorList.get(i);
 
       queryDefBuilder.add(
           StageDefinition.builder(firstStageNumber + i)
@@ -181,7 +188,7 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
                          .shuffleSpec(nextShuffleSpec)
                          .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
                              queryToRun,
-                             operatorList.get(i),
+                             operatorFactories,
                              stageRowSignature,
                              maxRowsMaterialized,
                              partitionColumnNames
@@ -325,4 +332,40 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
         finalWindowClusterBy.getColumns()
     );
   }
+
+  /**
+   * This method converts the operator chain received from native plan into 
MSQ plan.
+   * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> 
WindowOperator) is converted into (GlueingPartitioningOperator -> 
PartitionSortOperator -> WindowOperator).
+   * We rely on MSQ's shuffling to do the clustering on partitioning keys for 
us at every stage.
+   * This conversion allows us to blindly read N rows from input channel and 
push them into the operator chain, and repeat until the input channel isn't 
finished.
+   * @param operatorFactoryListFromQuery
+   * @param maxRowsMaterializedInWindow
+   * @return
+   */
+  private List<OperatorFactory> 
getTransformedOperatorFactoryListForStageDefinition(
+      List<OperatorFactory> operatorFactoryListFromQuery,
+      int maxRowsMaterializedInWindow
+  )
+  {
+    final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
+    final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
+    for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
+      if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
+        AbstractPartitioningOperatorFactory partition = 
(AbstractPartitioningOperatorFactory) operatorFactory;
+        operatorFactoryList.add(new 
GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), 
maxRowsMaterializedInWindow));
+      } else if (operatorFactory instanceof AbstractSortOperatorFactory) {
+        AbstractSortOperatorFactory sortOperatorFactory = 
(AbstractSortOperatorFactory) operatorFactory;
+        sortOperatorFactoryList.add(new 
PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
+      } else {
+        // Add all the PartitionSortOperator(s) before every window operator.
+        operatorFactoryList.addAll(sortOperatorFactoryList);
+        sortOperatorFactoryList.clear();
+        operatorFactoryList.add(operatorFactory);
+      }
+    }
+
+    operatorFactoryList.addAll(sortOperatorFactoryList);
+    sortOperatorFactoryList.clear();
+    return operatorFactoryList;
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 7cf8201c525..5462b991737 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -289,6 +289,9 @@ public class MSQTaskQueryMaker implements QueryMaker
     // Add appropriate finalization to native query context.
     nativeQueryContextOverrides.put(QueryContexts.FINALIZE_KEY, 
finalizeAggregations);
 
+    // This flag is to ensure backward compatibility, as brokers are upgraded 
after indexers/middlemanagers.
+    
nativeQueryContextOverrides.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
 true);
+
     final MSQSpec querySpec =
         MSQSpec.builder()
                
.query(druidQuery.getQuery().withOverriddenContext(nativeQueryContextOverrides))
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 9059a81ffe3..7112a101c04 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -191,6 +191,9 @@ public class MultiStageQueryContext
 
   public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = 
"maxRowsMaterializedInWindow";
 
+  // This flag ensures backward compatibility and will be removed in Druid 33, 
with the default behavior as enabled.
+  public static final String WINDOW_FUNCTION_OPERATOR_TRANSFORMATION = 
"windowFunctionOperatorTransformation";
+
   public static final String CTX_SKIP_TYPE_VERIFICATION = 
"skipTypeVerification";
 
   /**
@@ -217,6 +220,14 @@ public class MultiStageQueryContext
     );
   }
 
+  public static boolean isWindowFunctionOperatorTransformationEnabled(final 
QueryContext queryContext)
+  {
+    return queryContext.getBoolean(
+        WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
+        false
+    );
+  }
+
   public static int getMaxConcurrentStagesWithDefault(
       final QueryContext queryContext,
       final int defaultMaxConcurrentStages
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 62496abacf6..e99b571b807 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -273,6 +273,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
                   .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
                   .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
                   .put(MSQTaskQueryMaker.USER_KEY, "allowAll")
+                  
.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true)
                   .build();
 
   public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =
diff --git 
a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
 
b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
index 502885fb3ae..3a2208d0361 100644
--- 
a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
+++ 
b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
@@ -95,7 +95,8 @@ order by 1;
           "maxParseExceptions" : 0,
           "plannerStrategy" : "DECOUPLED",
           "sqlQueryId" : __SQL_QUERY_ID__
-          "sqlStringifyArrays" : false
+          "sqlStringifyArrays" : false,
+          "windowFunctionOperatorTransformation" : true
         }
       }
     },
@@ -201,7 +202,8 @@ order by 1;
           "maxParseExceptions" : 0,
           "plannerStrategy" : "DECOUPLED",
           "sqlQueryId" : __SQL_QUERY_ID__
-          "sqlStringifyArrays" : false
+          "sqlStringifyArrays" : false,
+          "windowFunctionOperatorTransformation" : true
         }
       }
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to