This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c571e6905d0 Refactor WindowOperatorQueryKit to use WindowStage class
for representing different window stages (#17158)
c571e6905d0 is described below
commit c571e6905d035730e8158638202b7c6dc218effb
Author: Akshat Jain <[email protected]>
AuthorDate: Tue Nov 12 14:18:16 2024 +0530
Refactor WindowOperatorQueryKit to use WindowStage class for representing
different window stages (#17158)
---
.../druid/msq/querykit/WindowOperatorQueryKit.java | 518 ++++++++++++---------
1 file changed, 300 insertions(+), 218 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index a46f62866a1..ca9cacef7bf 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -33,6 +33,7 @@ import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
@@ -50,7 +51,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
@@ -72,12 +72,6 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
int minStageNumber
)
{
- RowSignature rowSignature = originalQuery.getRowSignature();
- log.info("Row signature received for query is [%s].", rowSignature);
-
- List<List<OperatorFactory>> operatorList =
getOperatorListFromQuery(originalQuery);
- log.info("Created operatorList with operator factories: [%s]",
operatorList);
-
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
queryKitSpec,
originalQuery.context(),
@@ -88,199 +82,348 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
minStageNumber,
false
);
-
- ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(
- operatorList.get(0),
- queryKitSpec.getNumPartitionsForShuffle()
+ final RowSignature signatureFromInput =
dataSourcePlan.getSubQueryDefBuilder()
+ .get()
+ .build()
+
.getFinalStageDefinition()
+ .getSignature();
+
+ final WindowStages windowStages = new WindowStages(
+ originalQuery,
+ jsonMapper,
+ queryKitSpec.getNumPartitionsForShuffle(),
+ queryKitSpec.getMaxNonLeafWorkerCount(),
+ resultShuffleSpecFactory,
+ signatureFromInput,
+ isOperatorTransformationEnabled
);
- final QueryDefinitionBuilder queryDefBuilder =
- makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan,
nextShuffleSpec);
-
- final int firstStageNumber = Math.max(minStageNumber,
queryDefBuilder.getNextStageNumber());
- final WindowOperatorQuery queryToRun = (WindowOperatorQuery)
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
-
- // Get segment granularity from query context, and create ShuffleSpec and
RowSignature to be used for the final window stage.
- final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper,
queryToRun.getContext());
- final ClusterBy finalWindowClusterBy =
computeClusterByForFinalWindowStage(segmentGranularity);
- final ShuffleSpec finalWindowStageShuffleSpec =
resultShuffleSpecFactory.build(finalWindowClusterBy, false);
- final RowSignature finalWindowStageRowSignature =
computeSignatureForFinalWindowStage(rowSignature, finalWindowClusterBy,
segmentGranularity);
-
- final int maxRowsMaterialized =
MultiStageQueryContext.getMaxRowsMaterializedInWindow(originalQuery.context());
-
- // There are multiple windows present in the query.
- // Create stages for each window in the query.
- // These stages will be serialized.
- // The partition by clause of the next window will be the shuffle key for
the previous window.
- RowSignature.Builder bob = RowSignature.builder();
- RowSignature signatureFromInput =
dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
+
+ final ShuffleSpec nextShuffleSpec =
windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle());
+ final QueryDefinitionBuilder queryDefBuilder =
makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan,
nextShuffleSpec);
+ final int firstWindowStageNumber = Math.max(minStageNumber,
queryDefBuilder.getNextStageNumber());
+
log.info("Row signature received from last stage is [%s].",
signatureFromInput);
- for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
- bob.add(signatureFromInput.getColumnName(i),
signatureFromInput.getColumnType(i).get());
+ // Iterate over the list of window stages, and add the definition for each
window stage to QueryDefinitionBuilder.
+ for (int i = 0; i < windowStages.getStages().size(); i++) {
+
queryDefBuilder.add(windowStages.getStageDefinitionBuilder(firstWindowStageNumber
+ i, i));
+ }
+ return queryDefBuilder.build();
+ }
+
+ /**
+ * Represents the window stages to be added to {@link
QueryDefinitionBuilder}.
+ * This class is responsible for creating the window stages.
+ */
+ private static class WindowStages
+ {
+ private final List<WindowStage> stages;
+ private final WindowOperatorQuery query;
+ private final int numPartitionsForShuffle;
+ private final int maxNonLeafWorkerCount;
+ private final ShuffleSpec finalWindowStageShuffleSpec;
+ private final RowSignature finalWindowStageRowSignature;
+ private final RowSignature.Builder rowSignatureBuilder;
+ private final boolean isOperatorTransformationEnabled;
+
+ private WindowStages(
+ WindowOperatorQuery query,
+ ObjectMapper jsonMapper,
+ int numPartitionsForShuffle,
+ int maxNonLeafWorkerCount,
+ ShuffleSpecFactory resultShuffleSpecFactory,
+ RowSignature signatureFromInput,
+ boolean isOperatorTransformationEnabled
+ )
+ {
+ this.stages = new ArrayList<>();
+ this.query = query;
+ this.numPartitionsForShuffle = numPartitionsForShuffle;
+ this.maxNonLeafWorkerCount = maxNonLeafWorkerCount;
+ this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
+
+ final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(
+ jsonMapper,
+ query.getContext()
+ );
+ final ClusterBy finalWindowClusterBy =
computeClusterByForFinalWindowStage(segmentGranularity);
+ this.finalWindowStageShuffleSpec = computeShuffleSpecForFinalWindowStage(
+ resultShuffleSpecFactory,
+ finalWindowClusterBy
+ );
+ this.finalWindowStageRowSignature = computeSignatureForFinalWindowStage(
+ query.getRowSignature(),
+ finalWindowClusterBy,
+ segmentGranularity
+ );
+
+ this.rowSignatureBuilder =
RowSignature.builder().addAll(signatureFromInput);
+ populateStages();
+ }
+
+ private void populateStages()
+ {
+ WindowStage currentStage = new WindowStage(getMaxRowsMaterialized());
+ for (OperatorFactory of : query.getOperators()) {
+ if (!currentStage.canAccept(of)) {
+ stages.add(currentStage);
+ currentStage = new WindowStage(getMaxRowsMaterialized());
+ }
+ currentStage.addOperatorFactory(of);
+ }
+ if (!currentStage.getOperatorFactories().isEmpty()) {
+ stages.add(currentStage);
+ }
+
+ log.info("Created window stages: [%s]", stages);
}
- /*
- operatorList is a List<List<OperatorFactory>>, where each
List<OperatorFactory> corresponds to the operator factories
- to be used for a different window stage.
+ private List<WindowStage> getStages()
+ {
+ return stages;
+ }
- We iterate over operatorList, and add the definition for a window stage
to QueryDefinitionBuilder.
- */
- for (int i = 0; i < operatorList.size(); i++) {
- for (OperatorFactory operatorFactory : operatorList.get(i)) {
- if (operatorFactory instanceof WindowOperatorFactory) {
- List<String> outputColumnNames = ((WindowOperatorFactory)
operatorFactory).getProcessor().getOutputColumnNames();
-
- // Need to add column names which are present in outputColumnNames
and rowSignature but not in bob,
- // since they need to be present in the row signature for this
window stage.
- for (String columnName : outputColumnNames) {
- int indexInRowSignature = rowSignature.indexOf(columnName);
- if (indexInRowSignature != -1 && bob.build().indexOf(columnName)
== -1) {
- ColumnType columnType =
rowSignature.getColumnType(indexInRowSignature).get();
- bob.add(columnName, columnType);
- log.info("Added column [%s] of type [%s] to row signature for
window stage.", columnName, columnType);
- } else {
- throw new ISE(
- "Found unexpected column [%s] already present in row
signature [%s].",
- columnName,
- rowSignature
- );
- }
+ private RowSignature getRowSignatureForStage(int windowStageIndex,
ShuffleSpec shuffleSpec)
+ {
+ if (windowStageIndex == stages.size() - 1) {
+ return finalWindowStageRowSignature;
+ }
+
+ final WindowStage stage = stages.get(windowStageIndex);
+ for (WindowOperatorFactory operatorFactory :
stage.getWindowOperatorFactories()) {
+ for (String columnName :
operatorFactory.getProcessor().getOutputColumnNames()) {
+ int indexInRowSignature =
query.getRowSignature().indexOf(columnName);
+ if (indexInRowSignature != -1 &&
rowSignatureBuilder.build().indexOf(columnName) == -1) {
+ ColumnType columnType =
query.getRowSignature().getColumnType(indexInRowSignature).get();
+ rowSignatureBuilder.add(columnName, columnType);
}
}
}
- final RowSignature intermediateSignature = bob.build();
- final RowSignature stageRowSignature;
+ final RowSignature intermediateSignature = rowSignatureBuilder.build();
- if (i + 1 == operatorList.size()) {
- stageRowSignature = finalWindowStageRowSignature;
- nextShuffleSpec = finalWindowStageShuffleSpec;
+ final RowSignature stageRowSignature;
+ if (shuffleSpec == null) {
+ stageRowSignature = intermediateSignature;
} else {
- nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i +
1), queryKitSpec.getNumPartitionsForShuffle());
- if (nextShuffleSpec == null) {
- stageRowSignature = intermediateSignature;
- } else {
- stageRowSignature = QueryKitUtils.sortableSignature(
- intermediateSignature,
- nextShuffleSpec.clusterBy().getColumns()
- );
- }
+ stageRowSignature = QueryKitUtils.sortableSignature(
+ intermediateSignature,
+ shuffleSpec.clusterBy().getColumns()
+ );
}
log.info("Using row signature [%s] for window stage.",
stageRowSignature);
+ return stageRowSignature;
+ }
- final List<String> partitionColumnNames = operatorList.get(i)
- .stream()
- .filter(of -> of
instanceof AbstractPartitioningOperatorFactory)
- .map(of ->
(AbstractPartitioningOperatorFactory) of)
- .flatMap(of ->
of.getPartitionColumns().stream())
-
.collect(Collectors.toList());
+ private StageDefinitionBuilder getStageDefinitionBuilder(int stageNumber,
int windowStageIndex)
+ {
+ final WindowStage stage = stages.get(windowStageIndex);
+ final ShuffleSpec shuffleSpec = (windowStageIndex == stages.size() - 1) ?
+ finalWindowStageShuffleSpec :
+ stages.get(windowStageIndex +
1).findShuffleSpec(numPartitionsForShuffle);
+ final RowSignature stageRowSignature =
getRowSignatureForStage(windowStageIndex, shuffleSpec);
final List<OperatorFactory> operatorFactories =
isOperatorTransformationEnabled
- ?
getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i),
maxRowsMaterialized)
- : operatorList.get(i);
-
- queryDefBuilder.add(
- StageDefinition.builder(firstStageNumber + i)
- .inputs(new StageInputSpec(firstStageNumber + i - 1))
- .signature(stageRowSignature)
-
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
- .shuffleSpec(nextShuffleSpec)
- .processorFactory(new
WindowOperatorQueryFrameProcessorFactory(
- queryToRun,
- operatorFactories,
- stageRowSignature,
- maxRowsMaterialized,
- partitionColumnNames
- ))
+ ?
stage.getTransformedOperatorFactories()
+ :
stage.getOperatorFactories();
+
+ return StageDefinition.builder(stageNumber)
+ .inputs(new StageInputSpec(stageNumber - 1))
+ .signature(stageRowSignature)
+ .maxWorkerCount(maxNonLeafWorkerCount)
+ .shuffleSpec(shuffleSpec)
+ .processorFactory(new
WindowOperatorQueryFrameProcessorFactory(
+ query,
+ operatorFactories,
+ stageRowSignature,
+ getMaxRowsMaterialized(),
+ stage.getPartitionColumns()
+ ));
+ }
+
+ /**
+ * Computes the ClusterBy for the final window stage. We don't have to
take the CLUSTERED BY columns into account,
+ * as they are handled as {@link
org.apache.druid.query.scan.ScanQuery#orderBys}.
+ */
+ private ClusterBy computeClusterByForFinalWindowStage(Granularity
segmentGranularity)
+ {
+ final List<KeyColumn> clusterByColumns = Collections.singletonList(new
KeyColumn(
+ QueryKitUtils.PARTITION_BOOST_COLUMN,
+ KeyOrder.ASCENDING
+ ));
+ return QueryKitUtils.clusterByWithSegmentGranularity(new
ClusterBy(clusterByColumns, 0), segmentGranularity);
+ }
+
+ /**
+ * Computes the signature for the final window stage. The
finalWindowClusterBy will always have the
+ * partition boost column as computed in {@link
#computeClusterByForFinalWindowStage(Granularity)}.
+ */
+ private RowSignature computeSignatureForFinalWindowStage(
+ RowSignature rowSignature,
+ ClusterBy finalWindowClusterBy,
+ Granularity segmentGranularity
+ )
+ {
+ final RowSignature.Builder finalWindowStageRowSignatureBuilder =
RowSignature.builder()
+
.addAll(rowSignature)
+
.add(
+
QueryKitUtils.PARTITION_BOOST_COLUMN,
+
ColumnType.LONG
+
);
+ return QueryKitUtils.sortableSignature(
+ QueryKitUtils.signatureWithSegmentGranularity(
+ finalWindowStageRowSignatureBuilder.build(),
+ segmentGranularity
+ ),
+ finalWindowClusterBy.getColumns()
);
}
- return queryDefBuilder.build();
+ private ShuffleSpec computeShuffleSpecForFinalWindowStage(
+ ShuffleSpecFactory resultShuffleSpecFactory,
+ ClusterBy finalWindowClusterBy
+ )
+ {
+ return resultShuffleSpecFactory.build(finalWindowClusterBy, false);
+ }
+
+ private int getMaxRowsMaterialized()
+ {
+ return
MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context());
+ }
}
/**
- *
- * @param originalQuery
- * @return A list of list of operator factories, where each list represents
the operator factories for a particular
- * window stage.
+ * Represents a window stage in a query execution.
+ * Each stage can contain a sort operator, a partition operator, and
multiple window operators.
*/
- private List<List<OperatorFactory>>
getOperatorListFromQuery(WindowOperatorQuery originalQuery)
+ private static class WindowStage
{
- List<List<OperatorFactory>> operatorList = new ArrayList<>();
- final List<OperatorFactory> operators = originalQuery.getOperators();
- List<OperatorFactory> currentStage = new ArrayList<>();
-
- for (int i = 0; i < operators.size(); i++) {
- OperatorFactory of = operators.get(i);
- currentStage.add(of);
-
- if (of instanceof WindowOperatorFactory) {
- // Process consecutive window operators
- while (i + 1 < operators.size() && operators.get(i + 1) instanceof
WindowOperatorFactory) {
- i++;
- currentStage.add(operators.get(i));
- }
-
- // Finalize the current stage
- operatorList.add(new ArrayList<>(currentStage));
- currentStage.clear();
- }
+ private AbstractSortOperatorFactory sortOperatorFactory;
+ private AbstractPartitioningOperatorFactory partitioningOperatorFactory;
+ private final List<WindowOperatorFactory> windowOperatorFactories;
+ private final int maxRowsMaterialized;
+
+ private WindowStage(int maxRowsMaterialized)
+ {
+ this.windowOperatorFactories = new ArrayList<>();
+ this.maxRowsMaterialized = maxRowsMaterialized;
}
- // There shouldn't be any operators left in currentStage. The last
operator should always be WindowOperatorFactory.
- if (!currentStage.isEmpty()) {
- throw new ISE(
- "Found unexpected operators [%s] present in the list of operators
[%s].",
- currentStage,
- operators
- );
+ private void addOperatorFactory(OperatorFactory op)
+ {
+ if (op instanceof AbstractSortOperatorFactory) {
+ this.sortOperatorFactory = (AbstractSortOperatorFactory) op;
+ } else if (op instanceof AbstractPartitioningOperatorFactory) {
+ this.partitioningOperatorFactory =
(AbstractPartitioningOperatorFactory) op;
+ } else {
+ this.windowOperatorFactories.add((WindowOperatorFactory) op);
+ }
}
- return operatorList;
- }
-
- private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory>
operatorFactories, int partitionCount)
- {
- AbstractPartitioningOperatorFactory partition = null;
- AbstractSortOperatorFactory sort = null;
- for (OperatorFactory of : operatorFactories) {
- if (of instanceof AbstractPartitioningOperatorFactory) {
- partition = (AbstractPartitioningOperatorFactory) of;
- } else if (of instanceof AbstractSortOperatorFactory) {
- sort = (AbstractSortOperatorFactory) of;
+ private List<OperatorFactory> getOperatorFactories()
+ {
+ List<OperatorFactory> operatorFactories = new ArrayList<>();
+ if (sortOperatorFactory != null) {
+ operatorFactories.add(sortOperatorFactory);
}
+ if (partitioningOperatorFactory != null) {
+ operatorFactories.add(partitioningOperatorFactory);
+ }
+ operatorFactories.addAll(windowOperatorFactories);
+ return operatorFactories;
}
- Map<String, ColumnWithDirection.Direction> sortColumnsMap = new
HashMap<>();
- if (sort != null) {
- for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
- sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+ /**
+ * This method converts the operator chain received from native plan into
MSQ plan.
+ * (NaiveSortOperator -> Naive/GlueingPartitioningOperator ->
WindowOperator) is converted into (GlueingPartitioningOperator ->
PartitionSortOperator -> WindowOperator).
+ * We rely on MSQ's shuffling to do the clustering on partitioning keys
for us at every stage.
+ * This conversion allows us to blindly read N rows from input channel and
push them into the operator chain, and repeat until the input channel isn't
finished.
+ * @return
+ */
+ private List<OperatorFactory> getTransformedOperatorFactories()
+ {
+ List<OperatorFactory> operatorFactories = new ArrayList<>();
+ if (partitioningOperatorFactory != null) {
+ operatorFactories.add(new
GlueingPartitioningOperatorFactory(partitioningOperatorFactory.getPartitionColumns(),
maxRowsMaterialized));
+ }
+ if (sortOperatorFactory != null) {
+ operatorFactories.add(new
PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
}
+ operatorFactories.addAll(windowOperatorFactories);
+ return operatorFactories;
}
- if (partition == null) {
- // If operatorFactories doesn't have any partitioning factory, then we
should keep the shuffle spec from previous stage.
- // This indicates that we already have the data partitioned correctly,
and hence we don't need to do any shuffling.
- return null;
+ private List<WindowOperatorFactory> getWindowOperatorFactories()
+ {
+ return windowOperatorFactories;
}
- if (partition.getPartitionColumns().isEmpty()) {
- return MixShuffleSpec.instance();
+ private ShuffleSpec findShuffleSpec(int partitionCount)
+ {
+ Map<String, ColumnWithDirection.Direction> sortColumnsMap = new
HashMap<>();
+ if (sortOperatorFactory != null) {
+ for (ColumnWithDirection sortColumn :
sortOperatorFactory.getSortColumns()) {
+ sortColumnsMap.put(sortColumn.getColumn(),
sortColumn.getDirection());
+ }
+ }
+
+ if (partitioningOperatorFactory == null) {
+ // If the window stage doesn't have any partitioning factory, then we
should keep the shuffle spec from previous stage.
+ // This indicates that we already have the data partitioned correctly,
and hence we don't need to do any shuffling.
+ return null;
+ }
+
+ if (partitioningOperatorFactory.getPartitionColumns().isEmpty()) {
+ return MixShuffleSpec.instance();
+ }
+
+ final List<KeyColumn> keyColsOfWindow = new ArrayList<>();
+ for (String partitionColumn :
partitioningOperatorFactory.getPartitionColumns()) {
+ KeyColumn kc = new KeyColumn(
+ partitionColumn,
+ sortColumnsMap.get(partitionColumn) ==
ColumnWithDirection.Direction.DESC
+ ? KeyOrder.DESCENDING
+ : KeyOrder.ASCENDING
+ );
+ keyColsOfWindow.add(kc);
+ }
+
+ return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0),
partitionCount);
}
- List<KeyColumn> keyColsOfWindow = new ArrayList<>();
- for (String partitionColumn : partition.getPartitionColumns()) {
- KeyColumn kc;
- if (sortColumnsMap.get(partitionColumn) ==
ColumnWithDirection.Direction.DESC) {
- kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
- } else {
- kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+ private boolean canAccept(OperatorFactory operatorFactory)
+ {
+ if (getOperatorFactories().isEmpty()) {
+ return true;
+ }
+ if (operatorFactory instanceof AbstractSortOperatorFactory) {
+ return false;
}
- keyColsOfWindow.add(kc);
+ if (operatorFactory instanceof WindowOperatorFactory) {
+ return true;
+ }
+ if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
+ return sortOperatorFactory != null;
+ }
+ throw new ISE("Encountered unexpected operatorFactory type: [%s]",
operatorFactory.getClass().getName());
}
- return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0),
partitionCount);
+ private List<String> getPartitionColumns()
+ {
+ return partitioningOperatorFactory == null ? new ArrayList<>() :
partitioningOperatorFactory.getPartitionColumns();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "WindowStage{" +
+ "sortOperatorFactory=" + sortOperatorFactory +
+ ", partitioningOperatorFactory=" + partitioningOperatorFactory +
+ ", windowOperatorFactories=" + windowOperatorFactories +
+ '}';
+ }
}
/**
@@ -307,65 +450,4 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
}
return queryDefBuilder;
}
-
- /**
- * Computes the ClusterBy for the final window stage. We don't have to take
the CLUSTERED BY columns into account,
- * as they are handled as {@link
org.apache.druid.query.scan.ScanQuery#orderBys}.
- */
- private static ClusterBy computeClusterByForFinalWindowStage(Granularity
segmentGranularity)
- {
- final List<KeyColumn> clusterByColumns = Collections.singletonList(new
KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
- return QueryKitUtils.clusterByWithSegmentGranularity(new
ClusterBy(clusterByColumns, 0), segmentGranularity);
- }
-
- /**
- * Computes the signature for the final window stage. The
finalWindowClusterBy will always have the
- * partition boost column as computed in {@link
#computeClusterByForFinalWindowStage(Granularity)}.
- */
- private static RowSignature computeSignatureForFinalWindowStage(RowSignature
rowSignature, ClusterBy finalWindowClusterBy, Granularity segmentGranularity)
- {
- final RowSignature.Builder finalWindowStageRowSignatureBuilder =
RowSignature.builder()
-
.addAll(rowSignature)
-
.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
- return QueryKitUtils.sortableSignature(
-
QueryKitUtils.signatureWithSegmentGranularity(finalWindowStageRowSignatureBuilder.build(),
segmentGranularity),
- finalWindowClusterBy.getColumns()
- );
- }
-
- /**
- * This method converts the operator chain received from native plan into
MSQ plan.
- * (NaiveSortOperator -> Naive/GlueingPartitioningOperator ->
WindowOperator) is converted into (GlueingPartitioningOperator ->
PartitionSortOperator -> WindowOperator).
- * We rely on MSQ's shuffling to do the clustering on partitioning keys for
us at every stage.
- * This conversion allows us to blindly read N rows from input channel and
push them into the operator chain, and repeat until the input channel isn't
finished.
- * @param operatorFactoryListFromQuery
- * @param maxRowsMaterializedInWindow
- * @return
- */
- private List<OperatorFactory>
getTransformedOperatorFactoryListForStageDefinition(
- List<OperatorFactory> operatorFactoryListFromQuery,
- int maxRowsMaterializedInWindow
- )
- {
- final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
- final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
- for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
- if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
- AbstractPartitioningOperatorFactory partition =
(AbstractPartitioningOperatorFactory) operatorFactory;
- operatorFactoryList.add(new
GlueingPartitioningOperatorFactory(partition.getPartitionColumns(),
maxRowsMaterializedInWindow));
- } else if (operatorFactory instanceof AbstractSortOperatorFactory) {
- AbstractSortOperatorFactory sortOperatorFactory =
(AbstractSortOperatorFactory) operatorFactory;
- sortOperatorFactoryList.add(new
PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
- } else {
- // Add all the PartitionSortOperator(s) before every window operator.
- operatorFactoryList.addAll(sortOperatorFactoryList);
- sortOperatorFactoryList.clear();
- operatorFactoryList.add(operatorFactory);
- }
- }
-
- operatorFactoryList.addAll(sortOperatorFactoryList);
- sortOperatorFactoryList.clear();
- return operatorFactoryList;
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]