This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3f56b57c7e9 MSQ WF: Pass a flag from broker to determine operator
chain transformation (#17443)
3f56b57c7e9 is described below
commit 3f56b57c7e9ecff64531d8c2fc28e108feca67bf
Author: Akshat Jain <[email protected]>
AuthorDate: Tue Nov 12 09:28:28 2024 +0530
MSQ WF: Pass a flag from broker to determine operator chain transformation
(#17443)
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 12 ++++--
.../WindowOperatorQueryFrameProcessor.java | 38 +----------------
.../druid/msq/querykit/WindowOperatorQueryKit.java | 47 +++++++++++++++++++++-
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 3 ++
.../druid/msq/util/MultiStageQueryContext.java | 11 +++++
.../org/apache/druid/msq/test/MSQTestBase.java | 1 +
.../msq1.iq | 6 ++-
7 files changed, 74 insertions(+), 44 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 0615ba802bf..5c491c0780d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -571,7 +571,7 @@ public class ControllerImpl implements Controller
final QueryContext queryContext = querySpec.getQuery().context();
final QueryDefinition queryDef = makeQueryDefinition(
- context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId,
querySpec, queryKernelConfig),
+ context.makeQueryKitSpec(makeQueryControllerToolKit(queryContext),
queryId, querySpec, queryKernelConfig),
querySpec,
context,
resultsContext
@@ -1211,13 +1211,19 @@ public class ControllerImpl implements Controller
}
@SuppressWarnings("rawtypes")
- private QueryKit<Query<?>> makeQueryControllerToolKit()
+ private QueryKit<Query<?>> makeQueryControllerToolKit(QueryContext
queryContext)
{
final Map<Class<? extends Query>, QueryKit> kitMap =
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
.put(ScanQuery.class, new
ScanQueryKit(context.jsonMapper()))
.put(GroupByQuery.class, new
GroupByQueryKit(context.jsonMapper()))
- .put(WindowOperatorQuery.class, new
WindowOperatorQueryKit(context.jsonMapper()))
+ .put(
+ WindowOperatorQuery.class,
+ new WindowOperatorQueryKit(
+ context.jsonMapper(),
+
MultiStageQueryContext.isWindowFunctionOperatorTransformationEnabled(queryContext)
+ )
+ )
.build();
return new MultiQueryKit(kitMap);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index 41c1df884f9..674e3ea4602 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -40,13 +40,9 @@ import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
-import org.apache.druid.query.operator.AbstractSortOperatorFactory;
-import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
-import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
@@ -101,7 +97,7 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
this.frameWriterFactory = frameWriterFactory;
this.resultRowAndCols = new ArrayList<>();
this.maxRowsMaterialized =
MultiStageQueryContext.getMaxRowsMaterializedInWindow(queryContext);
- this.operatorFactoryList =
getOperatorFactoryListForStageDefinition(operatorFactoryList);
+ this.operatorFactoryList = operatorFactoryList;
this.frameRowsAndColsBuilder = new
RowsAndColumnsBuilder(this.maxRowsMaterialized);
this.frameReader = frameReader;
@@ -403,36 +399,4 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
resultRowAndCols.clear();
rowId.set(0);
}
-
- /**
- * This method converts the operator chain received from native plan into
MSQ plan.
- * (NaiveSortOperator -> Naive/GlueingPartitioningOperator ->
WindowOperator) is converted into (GlueingPartitioningOperator ->
PartitionSortOperator -> WindowOperator).
- * We rely on MSQ's shuffling to do the clustering on partitioning keys for
us at every stage.
- * This conversion allows us to blindly read N rows from input channel and
push them into the operator chain, and repeat until the input channel isn't
finished.
- * @param operatorFactoryListFromQuery
- * @return
- */
- private List<OperatorFactory>
getOperatorFactoryListForStageDefinition(List<OperatorFactory>
operatorFactoryListFromQuery)
- {
- final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
- final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
- for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
- if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
- AbstractPartitioningOperatorFactory partition =
(AbstractPartitioningOperatorFactory) operatorFactory;
- operatorFactoryList.add(new
GlueingPartitioningOperatorFactory(partition.getPartitionColumns(),
this.maxRowsMaterialized));
- } else if (operatorFactory instanceof AbstractSortOperatorFactory) {
- AbstractSortOperatorFactory sortOperatorFactory =
(AbstractSortOperatorFactory) operatorFactory;
- sortOperatorFactoryList.add(new
PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
- } else {
- // Add all the PartitionSortOperator(s) before every window operator.
- operatorFactoryList.addAll(sortOperatorFactoryList);
- sortOperatorFactoryList.clear();
- operatorFactoryList.add(operatorFactory);
- }
- }
-
- operatorFactoryList.addAll(sortOperatorFactoryList);
- sortOperatorFactoryList.clear();
- return operatorFactoryList;
- }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index b168860ff4c..a46f62866a1 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -37,7 +37,9 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.segment.column.ColumnType;
@@ -54,10 +56,12 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
{
private static final Logger log = new Logger(WindowOperatorQueryKit.class);
private final ObjectMapper jsonMapper;
+ private final boolean isOperatorTransformationEnabled;
- public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+ public WindowOperatorQueryKit(ObjectMapper jsonMapper, boolean
isOperatorTransformationEnabled)
{
this.jsonMapper = jsonMapper;
+ this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
}
@Override
@@ -172,6 +176,9 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
.flatMap(of ->
of.getPartitionColumns().stream())
.collect(Collectors.toList());
+ final List<OperatorFactory> operatorFactories =
isOperatorTransformationEnabled
+ ?
getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i),
maxRowsMaterialized)
+ : operatorList.get(i);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
@@ -181,7 +188,7 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
.shuffleSpec(nextShuffleSpec)
.processorFactory(new
WindowOperatorQueryFrameProcessorFactory(
queryToRun,
- operatorList.get(i),
+ operatorFactories,
stageRowSignature,
maxRowsMaterialized,
partitionColumnNames
@@ -325,4 +332,40 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
finalWindowClusterBy.getColumns()
);
}
+
+ /**
+ * This method converts the operator chain received from native plan into
MSQ plan.
+ * (NaiveSortOperator -> Naive/GlueingPartitioningOperator ->
WindowOperator) is converted into (GlueingPartitioningOperator ->
PartitionSortOperator -> WindowOperator).
+ * We rely on MSQ's shuffling to do the clustering on partitioning keys for
us at every stage.
+ * This conversion allows us to blindly read N rows from input channel and
push them into the operator chain, and repeat until the input channel isn't
finished.
+ * @param operatorFactoryListFromQuery
+ * @param maxRowsMaterializedInWindow
+ * @return
+ */
+ private List<OperatorFactory>
getTransformedOperatorFactoryListForStageDefinition(
+ List<OperatorFactory> operatorFactoryListFromQuery,
+ int maxRowsMaterializedInWindow
+ )
+ {
+ final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
+ final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
+ for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
+ if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
+ AbstractPartitioningOperatorFactory partition =
(AbstractPartitioningOperatorFactory) operatorFactory;
+ operatorFactoryList.add(new
GlueingPartitioningOperatorFactory(partition.getPartitionColumns(),
maxRowsMaterializedInWindow));
+ } else if (operatorFactory instanceof AbstractSortOperatorFactory) {
+ AbstractSortOperatorFactory sortOperatorFactory =
(AbstractSortOperatorFactory) operatorFactory;
+ sortOperatorFactoryList.add(new
PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
+ } else {
+ // Add all the PartitionSortOperator(s) before every window operator.
+ operatorFactoryList.addAll(sortOperatorFactoryList);
+ sortOperatorFactoryList.clear();
+ operatorFactoryList.add(operatorFactory);
+ }
+ }
+
+ operatorFactoryList.addAll(sortOperatorFactoryList);
+ sortOperatorFactoryList.clear();
+ return operatorFactoryList;
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 7cf8201c525..5462b991737 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -289,6 +289,9 @@ public class MSQTaskQueryMaker implements QueryMaker
// Add appropriate finalization to native query context.
nativeQueryContextOverrides.put(QueryContexts.FINALIZE_KEY,
finalizeAggregations);
+ // This flag is to ensure backward compatibility, as brokers are upgraded
after indexers/middlemanagers.
+
nativeQueryContextOverrides.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
true);
+
final MSQSpec querySpec =
MSQSpec.builder()
.query(druidQuery.getQuery().withOverriddenContext(nativeQueryContextOverrides))
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 9059a81ffe3..7112a101c04 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -191,6 +191,9 @@ public class MultiStageQueryContext
public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW =
"maxRowsMaterializedInWindow";
+ // This flag ensures backward compatibility and will be removed in Druid 33,
with the default behavior as enabled.
+ public static final String WINDOW_FUNCTION_OPERATOR_TRANSFORMATION =
"windowFunctionOperatorTransformation";
+
public static final String CTX_SKIP_TYPE_VERIFICATION =
"skipTypeVerification";
/**
@@ -217,6 +220,14 @@ public class MultiStageQueryContext
);
}
+ public static boolean isWindowFunctionOperatorTransformationEnabled(final
QueryContext queryContext)
+ {
+ return queryContext.getBoolean(
+ WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
+ false
+ );
+ }
+
public static int getMaxConcurrentStagesWithDefault(
final QueryContext queryContext,
final int defaultMaxConcurrentStages
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 62496abacf6..e99b571b807 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -273,6 +273,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.put(MSQTaskQueryMaker.USER_KEY, "allowAll")
+
.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true)
.build();
public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =
diff --git
a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
index 502885fb3ae..3a2208d0361 100644
---
a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
+++
b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
@@ -95,7 +95,8 @@ order by 1;
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__
- "sqlStringifyArrays" : false
+ "sqlStringifyArrays" : false,
+ "windowFunctionOperatorTransformation" : true
}
}
},
@@ -201,7 +202,8 @@ order by 1;
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__
- "sqlStringifyArrays" : false
+ "sqlStringifyArrays" : false,
+ "windowFunctionOperatorTransformation" : true
}
}
},
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org