This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c571e6905d0 Refactor WindowOperatorQueryKit to use WindowStage class for representing different window stages (#17158)
c571e6905d0 is described below

commit c571e6905d035730e8158638202b7c6dc218effb
Author: Akshat Jain <[email protected]>
AuthorDate: Tue Nov 12 14:18:16 2024 +0530

    Refactor WindowOperatorQueryKit to use WindowStage class for representing different window stages (#17158)
---
 .../druid/msq/querykit/WindowOperatorQueryKit.java | 518 ++++++++++++---------
 1 file changed, 300 insertions(+), 218 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index a46f62866a1..ca9cacef7bf 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -33,6 +33,7 @@ import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
 import org.apache.druid.msq.kernel.ShuffleSpec;
 import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageDefinitionBuilder;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
 import org.apache.druid.query.operator.AbstractSortOperatorFactory;
@@ -50,7 +51,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
 {
@@ -72,12 +72,6 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
       int minStageNumber
   )
   {
-    RowSignature rowSignature = originalQuery.getRowSignature();
-    log.info("Row signature received for query is [%s].", rowSignature);
-
-    List<List<OperatorFactory>> operatorList = 
getOperatorListFromQuery(originalQuery);
-    log.info("Created operatorList with operator factories: [%s]", 
operatorList);
-
     final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
         queryKitSpec,
         originalQuery.context(),
@@ -88,199 +82,348 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
         minStageNumber,
         false
     );
-
-    ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(
-        operatorList.get(0),
-        queryKitSpec.getNumPartitionsForShuffle()
+    final RowSignature signatureFromInput = 
dataSourcePlan.getSubQueryDefBuilder()
+                                                          .get()
+                                                          .build()
+                                                          
.getFinalStageDefinition()
+                                                          .getSignature();
+
+    final WindowStages windowStages = new WindowStages(
+        originalQuery,
+        jsonMapper,
+        queryKitSpec.getNumPartitionsForShuffle(),
+        queryKitSpec.getMaxNonLeafWorkerCount(),
+        resultShuffleSpecFactory,
+        signatureFromInput,
+        isOperatorTransformationEnabled
     );
-    final QueryDefinitionBuilder queryDefBuilder =
-        makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, 
nextShuffleSpec);
-
-    final int firstStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
-    final WindowOperatorQuery queryToRun = (WindowOperatorQuery) 
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
-
-    // Get segment granularity from query context, and create ShuffleSpec and 
RowSignature to be used for the final window stage.
-    final Granularity segmentGranularity = 
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, 
queryToRun.getContext());
-    final ClusterBy finalWindowClusterBy = 
computeClusterByForFinalWindowStage(segmentGranularity);
-    final ShuffleSpec finalWindowStageShuffleSpec = 
resultShuffleSpecFactory.build(finalWindowClusterBy, false);
-    final RowSignature finalWindowStageRowSignature = 
computeSignatureForFinalWindowStage(rowSignature, finalWindowClusterBy, 
segmentGranularity);
-
-    final int maxRowsMaterialized = 
MultiStageQueryContext.getMaxRowsMaterializedInWindow(originalQuery.context());
-
-    // There are multiple windows present in the query.
-    // Create stages for each window in the query.
-    // These stages will be serialized.
-    // The partition by clause of the next window will be the shuffle key for 
the previous window.
-    RowSignature.Builder bob = RowSignature.builder();
-    RowSignature signatureFromInput = 
dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
+
+    final ShuffleSpec nextShuffleSpec = 
windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle());
+    final QueryDefinitionBuilder queryDefBuilder = 
makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, 
nextShuffleSpec);
+    final int firstWindowStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
+
     log.info("Row signature received from last stage is [%s].", 
signatureFromInput);
 
-    for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
-      bob.add(signatureFromInput.getColumnName(i), 
signatureFromInput.getColumnType(i).get());
+    // Iterate over the list of window stages, and add the definition for each 
window stage to QueryDefinitionBuilder.
+    for (int i = 0; i < windowStages.getStages().size(); i++) {
+      
queryDefBuilder.add(windowStages.getStageDefinitionBuilder(firstWindowStageNumber
 + i, i));
+    }
+    return queryDefBuilder.build();
+  }
+
+  /**
+   * Represents the window stages to be added to {@link 
QueryDefinitionBuilder}.
+   * This class is responsible for creating the window stages.
+   */
+  private static class WindowStages
+  {
+    private final List<WindowStage> stages;
+    private final WindowOperatorQuery query;
+    private final int numPartitionsForShuffle;
+    private final int maxNonLeafWorkerCount;
+    private final ShuffleSpec finalWindowStageShuffleSpec;
+    private final RowSignature finalWindowStageRowSignature;
+    private final RowSignature.Builder rowSignatureBuilder;
+    private final boolean isOperatorTransformationEnabled;
+
+    private WindowStages(
+        WindowOperatorQuery query,
+        ObjectMapper jsonMapper,
+        int numPartitionsForShuffle,
+        int maxNonLeafWorkerCount,
+        ShuffleSpecFactory resultShuffleSpecFactory,
+        RowSignature signatureFromInput,
+        boolean isOperatorTransformationEnabled
+    )
+    {
+      this.stages = new ArrayList<>();
+      this.query = query;
+      this.numPartitionsForShuffle = numPartitionsForShuffle;
+      this.maxNonLeafWorkerCount = maxNonLeafWorkerCount;
+      this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
+
+      final Granularity segmentGranularity = 
QueryKitUtils.getSegmentGranularityFromContext(
+          jsonMapper,
+          query.getContext()
+      );
+      final ClusterBy finalWindowClusterBy = 
computeClusterByForFinalWindowStage(segmentGranularity);
+      this.finalWindowStageShuffleSpec = computeShuffleSpecForFinalWindowStage(
+          resultShuffleSpecFactory,
+          finalWindowClusterBy
+      );
+      this.finalWindowStageRowSignature = computeSignatureForFinalWindowStage(
+          query.getRowSignature(),
+          finalWindowClusterBy,
+          segmentGranularity
+      );
+
+      this.rowSignatureBuilder = 
RowSignature.builder().addAll(signatureFromInput);
+      populateStages();
+    }
+
+    private void populateStages()
+    {
+      WindowStage currentStage = new WindowStage(getMaxRowsMaterialized());
+      for (OperatorFactory of : query.getOperators()) {
+        if (!currentStage.canAccept(of)) {
+          stages.add(currentStage);
+          currentStage = new WindowStage(getMaxRowsMaterialized());
+        }
+        currentStage.addOperatorFactory(of);
+      }
+      if (!currentStage.getOperatorFactories().isEmpty()) {
+        stages.add(currentStage);
+      }
+
+      log.info("Created window stages: [%s]", stages);
     }
 
-    /*
-    operatorList is a List<List<OperatorFactory>>, where each 
List<OperatorFactory> corresponds to the operator factories
-     to be used for a different window stage.
+    private List<WindowStage> getStages()
+    {
+      return stages;
+    }
 
-     We iterate over operatorList, and add the definition for a window stage 
to QueryDefinitionBuilder.
-     */
-    for (int i = 0; i < operatorList.size(); i++) {
-      for (OperatorFactory operatorFactory : operatorList.get(i)) {
-        if (operatorFactory instanceof WindowOperatorFactory) {
-          List<String> outputColumnNames = ((WindowOperatorFactory) 
operatorFactory).getProcessor().getOutputColumnNames();
-
-          // Need to add column names which are present in outputColumnNames 
and rowSignature but not in bob,
-          // since they need to be present in the row signature for this 
window stage.
-          for (String columnName : outputColumnNames) {
-            int indexInRowSignature = rowSignature.indexOf(columnName);
-            if (indexInRowSignature != -1 && bob.build().indexOf(columnName) 
== -1) {
-              ColumnType columnType = 
rowSignature.getColumnType(indexInRowSignature).get();
-              bob.add(columnName, columnType);
-              log.info("Added column [%s] of type [%s] to row signature for 
window stage.", columnName, columnType);
-            } else {
-              throw new ISE(
-                  "Found unexpected column [%s] already present in row 
signature [%s].",
-                  columnName,
-                  rowSignature
-              );
-            }
+    private RowSignature getRowSignatureForStage(int windowStageIndex, 
ShuffleSpec shuffleSpec)
+    {
+      if (windowStageIndex == stages.size() - 1) {
+        return finalWindowStageRowSignature;
+      }
+
+      final WindowStage stage = stages.get(windowStageIndex);
+      for (WindowOperatorFactory operatorFactory : 
stage.getWindowOperatorFactories()) {
+        for (String columnName : 
operatorFactory.getProcessor().getOutputColumnNames()) {
+          int indexInRowSignature = 
query.getRowSignature().indexOf(columnName);
+          if (indexInRowSignature != -1 && 
rowSignatureBuilder.build().indexOf(columnName) == -1) {
+            ColumnType columnType = 
query.getRowSignature().getColumnType(indexInRowSignature).get();
+            rowSignatureBuilder.add(columnName, columnType);
           }
         }
       }
 
-      final RowSignature intermediateSignature = bob.build();
-      final RowSignature stageRowSignature;
+      final RowSignature intermediateSignature = rowSignatureBuilder.build();
 
-      if (i + 1 == operatorList.size()) {
-        stageRowSignature = finalWindowStageRowSignature;
-        nextShuffleSpec = finalWindowStageShuffleSpec;
+      final RowSignature stageRowSignature;
+      if (shuffleSpec == null) {
+        stageRowSignature = intermediateSignature;
       } else {
-        nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 
1), queryKitSpec.getNumPartitionsForShuffle());
-        if (nextShuffleSpec == null) {
-          stageRowSignature = intermediateSignature;
-        } else {
-          stageRowSignature = QueryKitUtils.sortableSignature(
-              intermediateSignature,
-              nextShuffleSpec.clusterBy().getColumns()
-          );
-        }
+        stageRowSignature = QueryKitUtils.sortableSignature(
+            intermediateSignature,
+            shuffleSpec.clusterBy().getColumns()
+        );
       }
 
       log.info("Using row signature [%s] for window stage.", 
stageRowSignature);
+      return stageRowSignature;
+    }
 
-      final List<String> partitionColumnNames = operatorList.get(i)
-                                                            .stream()
-                                                            .filter(of -> of 
instanceof AbstractPartitioningOperatorFactory)
-                                                            .map(of -> 
(AbstractPartitioningOperatorFactory) of)
-                                                            .flatMap(of -> 
of.getPartitionColumns().stream())
-                                                            
.collect(Collectors.toList());
+    private StageDefinitionBuilder getStageDefinitionBuilder(int stageNumber, 
int windowStageIndex)
+    {
+      final WindowStage stage = stages.get(windowStageIndex);
+      final ShuffleSpec shuffleSpec = (windowStageIndex == stages.size() - 1) ?
+                                      finalWindowStageShuffleSpec :
+                                      stages.get(windowStageIndex + 
1).findShuffleSpec(numPartitionsForShuffle);
 
+      final RowSignature stageRowSignature = 
getRowSignatureForStage(windowStageIndex, shuffleSpec);
       final List<OperatorFactory> operatorFactories = 
isOperatorTransformationEnabled
-                                                      ? 
getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i), 
maxRowsMaterialized)
-                                                      : operatorList.get(i);
-
-      queryDefBuilder.add(
-          StageDefinition.builder(firstStageNumber + i)
-                         .inputs(new StageInputSpec(firstStageNumber + i - 1))
-                         .signature(stageRowSignature)
-                         
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
-                         .shuffleSpec(nextShuffleSpec)
-                         .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
-                             queryToRun,
-                             operatorFactories,
-                             stageRowSignature,
-                             maxRowsMaterialized,
-                             partitionColumnNames
-                         ))
+                                                      ? 
stage.getTransformedOperatorFactories()
+                                                      : 
stage.getOperatorFactories();
+
+      return StageDefinition.builder(stageNumber)
+                            .inputs(new StageInputSpec(stageNumber - 1))
+                            .signature(stageRowSignature)
+                            .maxWorkerCount(maxNonLeafWorkerCount)
+                            .shuffleSpec(shuffleSpec)
+                            .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
+                                query,
+                                operatorFactories,
+                                stageRowSignature,
+                                getMaxRowsMaterialized(),
+                                stage.getPartitionColumns()
+                            ));
+    }
+
+    /**
+     * Computes the ClusterBy for the final window stage. We don't have to 
take the CLUSTERED BY columns into account,
+     * as they are handled as {@link 
org.apache.druid.query.scan.ScanQuery#orderBys}.
+     */
+    private ClusterBy computeClusterByForFinalWindowStage(Granularity 
segmentGranularity)
+    {
+      final List<KeyColumn> clusterByColumns = Collections.singletonList(new 
KeyColumn(
+          QueryKitUtils.PARTITION_BOOST_COLUMN,
+          KeyOrder.ASCENDING
+      ));
+      return QueryKitUtils.clusterByWithSegmentGranularity(new 
ClusterBy(clusterByColumns, 0), segmentGranularity);
+    }
+
+    /**
+     * Computes the signature for the final window stage. The 
finalWindowClusterBy will always have the
+     * partition boost column as computed in {@link 
#computeClusterByForFinalWindowStage(Granularity)}.
+     */
+    private RowSignature computeSignatureForFinalWindowStage(
+        RowSignature rowSignature,
+        ClusterBy finalWindowClusterBy,
+        Granularity segmentGranularity
+    )
+    {
+      final RowSignature.Builder finalWindowStageRowSignatureBuilder = 
RowSignature.builder()
+                                                                               
    .addAll(rowSignature)
+                                                                               
    .add(
+                                                                               
        QueryKitUtils.PARTITION_BOOST_COLUMN,
+                                                                               
        ColumnType.LONG
+                                                                               
    );
+      return QueryKitUtils.sortableSignature(
+          QueryKitUtils.signatureWithSegmentGranularity(
+              finalWindowStageRowSignatureBuilder.build(),
+              segmentGranularity
+          ),
+          finalWindowClusterBy.getColumns()
       );
     }
 
-    return queryDefBuilder.build();
+    private ShuffleSpec computeShuffleSpecForFinalWindowStage(
+        ShuffleSpecFactory resultShuffleSpecFactory,
+        ClusterBy finalWindowClusterBy
+    )
+    {
+      return resultShuffleSpecFactory.build(finalWindowClusterBy, false);
+    }
+
+    private int getMaxRowsMaterialized()
+    {
+      return 
MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context());
+    }
   }
 
   /**
-   *
-   * @param originalQuery
-   * @return A list of list of operator factories, where each list represents 
the operator factories for a particular
-   * window stage.
+   * Represents a window stage in a query execution.
+   * Each stage can contain a sort operator, a partition operator, and 
multiple window operators.
    */
-  private List<List<OperatorFactory>> 
getOperatorListFromQuery(WindowOperatorQuery originalQuery)
+  private static class WindowStage
   {
-    List<List<OperatorFactory>> operatorList = new ArrayList<>();
-    final List<OperatorFactory> operators = originalQuery.getOperators();
-    List<OperatorFactory> currentStage = new ArrayList<>();
-
-    for (int i = 0; i < operators.size(); i++) {
-      OperatorFactory of = operators.get(i);
-      currentStage.add(of);
-
-      if (of instanceof WindowOperatorFactory) {
-        // Process consecutive window operators
-        while (i + 1 < operators.size() && operators.get(i + 1) instanceof 
WindowOperatorFactory) {
-          i++;
-          currentStage.add(operators.get(i));
-        }
-
-        // Finalize the current stage
-        operatorList.add(new ArrayList<>(currentStage));
-        currentStage.clear();
-      }
+    private AbstractSortOperatorFactory sortOperatorFactory;
+    private AbstractPartitioningOperatorFactory partitioningOperatorFactory;
+    private final List<WindowOperatorFactory> windowOperatorFactories;
+    private final int maxRowsMaterialized;
+
+    private WindowStage(int maxRowsMaterialized)
+    {
+      this.windowOperatorFactories = new ArrayList<>();
+      this.maxRowsMaterialized = maxRowsMaterialized;
     }
 
-    // There shouldn't be any operators left in currentStage. The last 
operator should always be WindowOperatorFactory.
-    if (!currentStage.isEmpty()) {
-      throw new ISE(
-          "Found unexpected operators [%s] present in the list of operators 
[%s].",
-          currentStage,
-          operators
-      );
+    private void addOperatorFactory(OperatorFactory op)
+    {
+      if (op instanceof AbstractSortOperatorFactory) {
+        this.sortOperatorFactory = (AbstractSortOperatorFactory) op;
+      } else if (op instanceof AbstractPartitioningOperatorFactory) {
+        this.partitioningOperatorFactory = 
(AbstractPartitioningOperatorFactory) op;
+      } else {
+        this.windowOperatorFactories.add((WindowOperatorFactory) op);
+      }
     }
 
-    return operatorList;
-  }
-
-  private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> 
operatorFactories, int partitionCount)
-  {
-    AbstractPartitioningOperatorFactory partition = null;
-    AbstractSortOperatorFactory sort = null;
-    for (OperatorFactory of : operatorFactories) {
-      if (of instanceof AbstractPartitioningOperatorFactory) {
-        partition = (AbstractPartitioningOperatorFactory) of;
-      } else if (of instanceof AbstractSortOperatorFactory) {
-        sort = (AbstractSortOperatorFactory) of;
+    private List<OperatorFactory> getOperatorFactories()
+    {
+      List<OperatorFactory> operatorFactories = new ArrayList<>();
+      if (sortOperatorFactory != null) {
+        operatorFactories.add(sortOperatorFactory);
       }
+      if (partitioningOperatorFactory != null) {
+        operatorFactories.add(partitioningOperatorFactory);
+      }
+      operatorFactories.addAll(windowOperatorFactories);
+      return operatorFactories;
     }
 
-    Map<String, ColumnWithDirection.Direction> sortColumnsMap = new 
HashMap<>();
-    if (sort != null) {
-      for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
-        sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+    /**
+     * This method converts the operator chain received from native plan into 
MSQ plan.
+     * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> 
WindowOperator) is converted into (GlueingPartitioningOperator -> 
PartitionSortOperator -> WindowOperator).
+     * We rely on MSQ's shuffling to do the clustering on partitioning keys 
for us at every stage.
+     * This conversion allows us to blindly read N rows from input channel and 
push them into the operator chain, and repeat until the input channel isn't 
finished.
+     * @return
+     */
+    private List<OperatorFactory> getTransformedOperatorFactories()
+    {
+      List<OperatorFactory> operatorFactories = new ArrayList<>();
+      if (partitioningOperatorFactory != null) {
+        operatorFactories.add(new 
GlueingPartitioningOperatorFactory(partitioningOperatorFactory.getPartitionColumns(),
 maxRowsMaterialized));
+      }
+      if (sortOperatorFactory != null) {
+        operatorFactories.add(new 
PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
       }
+      operatorFactories.addAll(windowOperatorFactories);
+      return operatorFactories;
     }
 
-    if (partition == null) {
-      // If operatorFactories doesn't have any partitioning factory, then we 
should keep the shuffle spec from previous stage.
-      // This indicates that we already have the data partitioned correctly, 
and hence we don't need to do any shuffling.
-      return null;
+    private List<WindowOperatorFactory> getWindowOperatorFactories()
+    {
+      return windowOperatorFactories;
     }
 
-    if (partition.getPartitionColumns().isEmpty()) {
-      return MixShuffleSpec.instance();
+    private ShuffleSpec findShuffleSpec(int partitionCount)
+    {
+      Map<String, ColumnWithDirection.Direction> sortColumnsMap = new 
HashMap<>();
+      if (sortOperatorFactory != null) {
+        for (ColumnWithDirection sortColumn : 
sortOperatorFactory.getSortColumns()) {
+          sortColumnsMap.put(sortColumn.getColumn(), 
sortColumn.getDirection());
+        }
+      }
+
+      if (partitioningOperatorFactory == null) {
+        // If the window stage doesn't have any partitioning factory, then we 
should keep the shuffle spec from previous stage.
+        // This indicates that we already have the data partitioned correctly, 
and hence we don't need to do any shuffling.
+        return null;
+      }
+
+      if (partitioningOperatorFactory.getPartitionColumns().isEmpty()) {
+        return MixShuffleSpec.instance();
+      }
+
+      final List<KeyColumn> keyColsOfWindow = new ArrayList<>();
+      for (String partitionColumn : 
partitioningOperatorFactory.getPartitionColumns()) {
+        KeyColumn kc = new KeyColumn(
+            partitionColumn,
+            sortColumnsMap.get(partitionColumn) == 
ColumnWithDirection.Direction.DESC
+            ? KeyOrder.DESCENDING
+            : KeyOrder.ASCENDING
+        );
+        keyColsOfWindow.add(kc);
+      }
+
+      return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 
partitionCount);
     }
 
-    List<KeyColumn> keyColsOfWindow = new ArrayList<>();
-    for (String partitionColumn : partition.getPartitionColumns()) {
-      KeyColumn kc;
-      if (sortColumnsMap.get(partitionColumn) == 
ColumnWithDirection.Direction.DESC) {
-        kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
-      } else {
-        kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+    private boolean canAccept(OperatorFactory operatorFactory)
+    {
+      if (getOperatorFactories().isEmpty()) {
+        return true;
+      }
+      if (operatorFactory instanceof AbstractSortOperatorFactory) {
+        return false;
       }
-      keyColsOfWindow.add(kc);
+      if (operatorFactory instanceof WindowOperatorFactory) {
+        return true;
+      }
+      if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
+        return sortOperatorFactory != null;
+      }
+      throw new ISE("Encountered unexpected operatorFactory type: [%s]", 
operatorFactory.getClass().getName());
     }
 
-    return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 
partitionCount);
+    private List<String> getPartitionColumns()
+    {
+      return partitioningOperatorFactory == null ? new ArrayList<>() : 
partitioningOperatorFactory.getPartitionColumns();
+    }
+
+    @Override
+    public String toString()
+    {
+      return "WindowStage{" +
+             "sortOperatorFactory=" + sortOperatorFactory +
+             ", partitioningOperatorFactory=" + partitioningOperatorFactory +
+             ", windowOperatorFactories=" + windowOperatorFactories +
+             '}';
+    }
   }
 
   /**
@@ -307,65 +450,4 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
     }
     return queryDefBuilder;
   }
-
-  /**
-   * Computes the ClusterBy for the final window stage. We don't have to take 
the CLUSTERED BY columns into account,
-   * as they are handled as {@link 
org.apache.druid.query.scan.ScanQuery#orderBys}.
-   */
-  private static ClusterBy computeClusterByForFinalWindowStage(Granularity 
segmentGranularity)
-  {
-    final List<KeyColumn> clusterByColumns = Collections.singletonList(new 
KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
-    return QueryKitUtils.clusterByWithSegmentGranularity(new 
ClusterBy(clusterByColumns, 0), segmentGranularity);
-  }
-
-  /**
-   * Computes the signature for the final window stage. The 
finalWindowClusterBy will always have the
-   * partition boost column as computed in {@link 
#computeClusterByForFinalWindowStage(Granularity)}.
-   */
-  private static RowSignature computeSignatureForFinalWindowStage(RowSignature 
rowSignature, ClusterBy finalWindowClusterBy, Granularity segmentGranularity)
-  {
-    final RowSignature.Builder finalWindowStageRowSignatureBuilder = 
RowSignature.builder()
-                                                                               
  .addAll(rowSignature)
-                                                                               
  .add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
-    return QueryKitUtils.sortableSignature(
-        
QueryKitUtils.signatureWithSegmentGranularity(finalWindowStageRowSignatureBuilder.build(),
 segmentGranularity),
-        finalWindowClusterBy.getColumns()
-    );
-  }
-
-  /**
-   * This method converts the operator chain received from native plan into 
MSQ plan.
-   * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> 
WindowOperator) is converted into (GlueingPartitioningOperator -> 
PartitionSortOperator -> WindowOperator).
-   * We rely on MSQ's shuffling to do the clustering on partitioning keys for 
us at every stage.
-   * This conversion allows us to blindly read N rows from input channel and 
push them into the operator chain, and repeat until the input channel isn't 
finished.
-   * @param operatorFactoryListFromQuery
-   * @param maxRowsMaterializedInWindow
-   * @return
-   */
-  private List<OperatorFactory> 
getTransformedOperatorFactoryListForStageDefinition(
-      List<OperatorFactory> operatorFactoryListFromQuery,
-      int maxRowsMaterializedInWindow
-  )
-  {
-    final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
-    final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
-    for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
-      if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
-        AbstractPartitioningOperatorFactory partition = 
(AbstractPartitioningOperatorFactory) operatorFactory;
-        operatorFactoryList.add(new 
GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), 
maxRowsMaterializedInWindow));
-      } else if (operatorFactory instanceof AbstractSortOperatorFactory) {
-        AbstractSortOperatorFactory sortOperatorFactory = 
(AbstractSortOperatorFactory) operatorFactory;
-        sortOperatorFactoryList.add(new 
PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
-      } else {
-        // Add all the PartitionSortOperator(s) before every window operator.
-        operatorFactoryList.addAll(sortOperatorFactoryList);
-        sortOperatorFactoryList.clear();
-        operatorFactoryList.add(operatorFactory);
-      }
-    }
-
-    operatorFactoryList.addAll(sortOperatorFactoryList);
-    sortOperatorFactoryList.clear();
-    return operatorFactoryList;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to