This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b1112e5dbdc [improvement](fe) Avoid two-phase agg for single instance
(#63732)
b1112e5dbdc is described below
commit b1112e5dbdc628bffa9019fe92d6f843c94b7b3e
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 2 17:04:36 2026 +0800
[improvement](fe) Avoid two-phase agg for single instance (#63732)
In a single-BE single-instance execution, non-distinct
aggregation does not benefit from splitting into local and global
phases. The split can add an unnecessary hash exchange and extra
aggregate operator for high-cardinality group-by queries. This change
detects the single execution instance case during Nereids non-distinct
aggregate implementation and only generates the one-phase aggregate
candidate. It also lets the global aggregate request ANY child
distribution in that case so the optimizer does not add a redundant
exchange.
### Release note
Optimize aggregation planning for single-BE single-instance execution by
avoiding unnecessary local/global aggregate split.
---
.../glue/translator/PhysicalPlanTranslator.java | 6 +-
.../properties/ChildrenPropertiesRegulator.java | 9 +-
.../SplitAggMultiPhaseWithoutGbyKey.java | 108 +++++++++++++++------
.../implementation/SplitAggWithoutDistinct.java | 16 ++-
.../apache/doris/nereids/util/AggregateUtils.java | 12 +++
.../translator/PhysicalPlanTranslatorTest.java | 1 +
.../processor/post/ShuffleKeyPrunerTest.java | 1 +
.../ChildrenPropertiesRegulatorTest.java | 37 +++++++
.../properties/RequestPropertyDeriverTest.java | 7 +-
.../exploration/mv/PartitionColumnTraceTest.java | 1 +
.../implementation/BucketedAggregateTest.java | 22 +++++
.../rules/rewrite/AggregateStrategiesTest.java | 18 ++--
.../rules/rewrite/AggregateUnionPlanTest.java | 68 +++++++++++++
.../rules/rewrite/SplitMultiDistinctTest.java | 1 +
.../doris/planner/QueryCacheNormalizerTest.java | 1 +
.../apache/doris/planner/StatisticDeriveTest.java | 1 +
.../org/apache/doris/qe/OlapQueryCacheTest.java | 1 +
.../data/function_p0/test_agg_foreach.out | 8 +-
.../data/function_p0/test_agg_foreach_notnull.out | 5 +-
.../suites/function_p0/test_agg_foreach.groovy | 37 +++++--
.../function_p0/test_agg_foreach_notnull.groovy | 21 +++-
.../nereids_function_p0/agg_function/agg.groovy | 1 +
.../agg_function/sum0_cte.groovy | 1 +
.../adjust_nullable/test_adjust_nullable.groovy | 1 +
.../adjust_nullable/test_subquery_nullable.groovy | 1 +
.../agg_join_pkfk/agg_join_pkfk.groovy | 1 +
.../agg_skew_rewrite/agg_skew_rewrite.groovy | 1 +
.../agg_strategy/distinct_agg_rewriter.groovy | 1 +
.../distinct_agg_strategy_selector.groovy | 1 +
.../agg_strategy/physical_agg_regulator.groovy | 1 +
.../agg_strategy/test_variables.groovy | 1 +
.../constant_propagation.groovy | 1 +
.../suites/nereids_rules_p0/cse/cse.groovy | 1 +
.../distinct_split/disitinct_split.groovy | 2 +-
.../eliminate_aggregate_casewhen.groovy | 1 +
.../eliminate_aggregate_constant.groovy | 1 +
.../test_convert_median_to_percentile.groovy | 1 +
.../expression/test_simplify_range.groovy | 1 +
.../push_down_expression_in_hash_join.groovy | 1 +
.../infer_predicate/infer_intersect_except.groovy | 1 +
.../infer_predicate/pull_up_predicate_agg.groovy | 1 +
.../pull_up_predicate_literal.groovy | 1 +
.../pull_up_predicate_set_op.groovy | 1 +
.../max_min_filter_push_down.groovy | 1 +
.../merge_aggregate/merge_aggregate.groovy | 1 +
.../agg_optimize_when_uniform.groovy | 1 +
.../predicate_infer/infer_predicate.groovy | 1 +
.../project_distinct_to_agg.groovy | 1 +
.../push_count_into_union_all.groovy | 1 +
.../push_down_limit_distinct_through_join.groovy | 1 +
.../push_down_top_n_distinct_through_union.groovy | 1 +
.../nereids_rules_p0/salt_join/salt_join.groovy | 1 +
.../suites/nereids_rules_p0/sumRewrite.groovy | 1 +
.../transposeJoin/transposeSemiJoinAgg.groovy | 1 +
.../add_project_for_unique_function.groovy | 1 +
.../agg_with_unique_function.groovy | 1 +
.../window_skew_rewrite/window_skew_rewrite.groovy | 1 +
.../suites/nereids_syntax_p0/agg_4_phase.groovy | 1 +
.../suites/nereids_syntax_p0/analyze_agg.groovy | 1 +
.../suites/nereids_syntax_p0/analyze_repeat.groovy | 1 +
.../suites/nereids_syntax_p0/analyze_sort.groovy | 1 +
.../suites/nereids_tpch_p0/tpch/agg_cse.groovy | 1 +
.../nereids_tpch_p0/tpch/push_topn_to_agg.groovy | 1 +
.../suites/nereids_tpch_p0/tpch/topn-filter.groovy | 1 +
.../test_python_raise_error_propagation.groovy | 3 +-
.../query_p0/aggregate/agg_union_random.groovy | 1 +
.../suites/query_p0/cache/query_cache.groovy | 1 +
.../query_p0/cache/query_cache_with_context.groovy | 1 +
.../dist_expr_list/dist_expr_list.groovy | 1 +
.../suites/query_p0/eager_agg/eager_agg.groovy | 1 +
.../suites/query_p0/hint/fix_leading.groovy | 2 +-
.../suites/query_p0/hint/multi_leading.groovy | 2 +-
.../suites/query_p0/hint/test_hint.groovy | 2 +-
.../query_p0/repeat/test_repeat_output_slot.groovy | 1 +
.../suites/query_p0/runtime_filter/check_rf.groovy | 1 +
.../suites/query_p0/set_operations/except.groovy | 1 +
.../query_cache_with_rec_cte_test.groovy | 1 +
.../suites/shape_check/clickbench/query1.groovy | 1 +
.../suites/shape_check/clickbench/query11.groovy | 1 +
.../suites/shape_check/clickbench/query12.groovy | 1 +
.../suites/shape_check/clickbench/query13.groovy | 1 +
.../suites/shape_check/clickbench/query14.groovy | 1 +
.../suites/shape_check/clickbench/query15.groovy | 1 +
.../suites/shape_check/clickbench/query2.groovy | 1 +
.../suites/shape_check/clickbench/query21.groovy | 1 +
.../suites/shape_check/clickbench/query22.groovy | 1 +
.../suites/shape_check/clickbench/query23.groovy | 1 +
.../suites/shape_check/clickbench/query28.groovy | 1 +
.../suites/shape_check/clickbench/query29.groovy | 1 +
.../suites/shape_check/clickbench/query3.groovy | 1 +
.../suites/shape_check/clickbench/query30.groovy | 1 +
.../suites/shape_check/clickbench/query31.groovy | 1 +
.../suites/shape_check/clickbench/query32.groovy | 1 +
.../suites/shape_check/clickbench/query33.groovy | 1 +
.../suites/shape_check/clickbench/query34.groovy | 1 +
.../suites/shape_check/clickbench/query35.groovy | 1 +
.../suites/shape_check/clickbench/query36.groovy | 1 +
.../suites/shape_check/clickbench/query37.groovy | 1 +
.../suites/shape_check/clickbench/query38.groovy | 1 +
.../suites/shape_check/clickbench/query39.groovy | 1 +
.../suites/shape_check/clickbench/query4.groovy | 1 +
.../suites/shape_check/clickbench/query40.groovy | 1 +
.../suites/shape_check/clickbench/query41.groovy | 1 +
.../suites/shape_check/clickbench/query42.groovy | 1 +
.../suites/shape_check/clickbench/query43.groovy | 1 +
.../suites/shape_check/clickbench/query6.groovy | 1 +
.../suites/shape_check/clickbench/query7.groovy | 1 +
.../suites/shape_check/clickbench/query8.groovy | 1 +
.../suites/shape_check/clickbench/query9.groovy | 1 +
.../suites/shape_check/others/nlj.groovy | 1 +
110 files changed, 411 insertions(+), 65 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 7b01ce6c6b1..9e921555422 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1157,9 +1157,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
aggregateExpression.getAggregateParam().aggMode.productAggregateBuffer);
}
}
- // no need to traverse children, because
AggregateExpression
- // should not have a AggregateExpression child
- return true;
+ // Continue through transparent guards unless this
guard directly wraps
+ // the aggregate expression already processed above.
+ return guardExpr.child() instanceof
AggregateExpression;
}
if (c instanceof AggregateExpression) {
AggregateExpression aggregateExpression =
(AggregateExpression) c;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 0a229b71a95..845c87eea9c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -136,13 +136,16 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
* */
private boolean shouldBanOnePhaseAgg(PhysicalHashAggregate<? extends Plan>
aggregate,
PhysicalProperties requiredChildProperty) {
- if (banAggUnionAll(aggregate)) {
- return true;
- }
ConnectContext ctx = ConnectContext.get();
if (ctx != null && ctx.getSessionVariable().aggPhase == 1) {
return false;
}
+ if (ctx != null && AggregateUtils.isSingleExecutionInstance(ctx)) {
+ return false;
+ }
+ if (banAggUnionAll(aggregate)) {
+ return true;
+ }
if (!onePhaseAggWithDistribute(aggregate)) {
return false;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhaseWithoutGbyKey.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhaseWithoutGbyKey.java
index 294777a900a..28ddb8ff718 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhaseWithoutGbyKey.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhaseWithoutGbyKey.java
@@ -24,6 +24,7 @@ import
org.apache.doris.nereids.trees.expressions.AggregateExpression;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SessionVarGuardExpr;
import org.apache.doris.nereids.trees.expressions.Slot;
import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
@@ -154,10 +155,13 @@ public class SplitAggMultiPhaseWithoutGbyKey extends
SplitAggBaseRule implements
AggregateParam inputToResultParam = new
AggregateParam(AggPhase.GLOBAL, AggMode.INPUT_TO_RESULT);
Map<AggregateFunction, Alias> originFuncToAliasPhase1 = new
HashMap<>();
+ Map<AggregateFunction, Expression> aggregateFunctionWithGuardExpr =
+ logicalAgg.getAggregateFunctionWithGuardExpr();
for (AggregateFunction function : aggregateFunctions) {
AggregateFunction aggFunc =
AggregateUtils.tryConvertToMultiDistinct(function);
AggregateExpression localAggExpr = new
AggregateExpression(aggFunc, inputToResultParam);
- originFuncToAliasPhase1.put(function, new Alias(localAggExpr));
+ originFuncToAliasPhase1.put(function,
+ new Alias(withSessionVarGuard(function, localAggExpr,
aggregateFunctionWithGuardExpr)));
}
List<NamedExpression> localAggOutput =
ImmutableList.<NamedExpression>builder()
@@ -172,37 +176,14 @@ public class SplitAggMultiPhaseWithoutGbyKey extends
SplitAggBaseRule implements
List<NamedExpression> globalOutput =
ExpressionUtils.rewriteDownShortCircuit(
logicalAgg.getOutputExpressions(), outputChild -> {
+ if (outputChild instanceof SessionVarGuardExpr
+ && ((SessionVarGuardExpr) outputChild).child()
instanceof AggregateFunction) {
+ return rewriteFinalAggregate((AggregateFunction)
((SessionVarGuardExpr) outputChild).child(),
+ originFuncToAliasPhase1,
((SessionVarGuardExpr) outputChild).getSessionVars());
+ }
if (outputChild instanceof AggregateFunction) {
- Alias alias = originFuncToAliasPhase1.get(outputChild);
- AggregateExpression localAggExpr =
(AggregateExpression) alias.child();
- AggregateFunction aggFunc = localAggExpr.getFunction();
- Slot childSlot = alias.toSlot();
- if (aggFunc instanceof MultiDistinction) {
- Map<Class<? extends AggregateFunction>,
Supplier<AggregateFunction>> functionMap =
- ImmutableMap.of(
- MultiDistinctCount.class, () -> new
Sum0(childSlot),
- MultiDistinctSum.class, () -> new
Sum(childSlot),
- MultiDistinctSum0.class, () -> new
Sum0(childSlot),
- // TODO: now we don't support
group_concat,
- // we need add support for
group_concat without order by,
- // and add test for group_concat
- MultiDistinctGroupConcat.class, () ->
new GroupConcat(childSlot));
- return new
AggregateExpression(functionMap.get(aggFunc.getClass()).get(), param);
- } else {
- Map<Class<? extends AggregateFunction>,
Supplier<AggregateFunction>> functionMap =
- ImmutableMap.of(
- Count.class, () -> new Sum0(childSlot),
- Sum.class, () -> new Sum(childSlot),
- Sum0.class, () -> new Sum0(childSlot),
- Min.class, () -> new Min(childSlot),
- Max.class, () -> new Max(childSlot),
- AnyValue.class, () -> new
AnyValue(childSlot),
- // TODO: now we don't support
group_concat,
- // we need add support for
group_concat without order by,
- // and add test for group_concat
- GroupConcat.class, () -> new
GroupConcat(childSlot));
- return new
AggregateExpression(functionMap.get(aggFunc.getClass()).get(), param,
childSlot);
- }
+ return rewriteFinalAggregate((AggregateFunction)
outputChild, originFuncToAliasPhase1,
+ aggregateFunctionWithGuardExpr, param);
} else {
return outputChild;
}
@@ -233,4 +214,69 @@ public class SplitAggMultiPhaseWithoutGbyKey extends
SplitAggBaseRule implements
}
return true;
}
+
+ private Expression withSessionVarGuard(AggregateFunction originFunction,
AggregateExpression aggregateExpression,
+ Map<AggregateFunction, Expression> aggregateFunctionWithGuardExpr)
{
+ Expression guardExpr =
aggregateFunctionWithGuardExpr.get(originFunction);
+ if (guardExpr instanceof SessionVarGuardExpr) {
+ return withSessionVarGuard(aggregateExpression,
((SessionVarGuardExpr) guardExpr).getSessionVars());
+ }
+ return aggregateExpression;
+ }
+
+ private Expression withSessionVarGuard(Expression expression, Map<String,
String> sessionVars) {
+ return new SessionVarGuardExpr(expression, sessionVars);
+ }
+
+ private Expression rewriteFinalAggregate(AggregateFunction originFunction,
+ Map<AggregateFunction, Alias> originFuncToAliasPhase1,
Map<AggregateFunction, Expression> guardExprs,
+ AggregateParam param) {
+ Expression finalAggregate = rewriteFinalAggregate(originFunction,
originFuncToAliasPhase1, param);
+ Expression guardExpr = guardExprs.get(originFunction);
+ if (guardExpr instanceof SessionVarGuardExpr) {
+ return withSessionVarGuard(finalAggregate, ((SessionVarGuardExpr)
guardExpr).getSessionVars());
+ }
+ return finalAggregate;
+ }
+
+ private Expression rewriteFinalAggregate(AggregateFunction originFunction,
+ Map<AggregateFunction, Alias> originFuncToAliasPhase1, Map<String,
String> sessionVars) {
+ return withSessionVarGuard(rewriteFinalAggregate(originFunction,
originFuncToAliasPhase1,
+ new AggregateParam(AggPhase.GLOBAL, AggMode.INPUT_TO_RESULT,
false)), sessionVars);
+ }
+
+ private Expression rewriteFinalAggregate(AggregateFunction originFunction,
+ Map<AggregateFunction, Alias> originFuncToAliasPhase1,
AggregateParam param) {
+ Alias alias = originFuncToAliasPhase1.get(originFunction);
+ AggregateExpression localAggExpr = (AggregateExpression)
+ SessionVarGuardExpr.getSessionVarGuardChild(alias.child());
+ AggregateFunction aggFunc = localAggExpr.getFunction();
+ Slot childSlot = alias.toSlot();
+ if (aggFunc instanceof MultiDistinction) {
+ Map<Class<? extends AggregateFunction>,
Supplier<AggregateFunction>> functionMap =
+ ImmutableMap.of(
+ MultiDistinctCount.class, () -> new Sum0(childSlot),
+ MultiDistinctSum.class, () -> new Sum(childSlot),
+ MultiDistinctSum0.class, () -> new Sum0(childSlot),
+ // TODO: now we don't support group_concat,
+ // we need add support for group_concat without order
by,
+ // and add test for group_concat
+ MultiDistinctGroupConcat.class, () -> new
GroupConcat(childSlot));
+ return new
AggregateExpression(functionMap.get(aggFunc.getClass()).get(), param);
+ } else {
+ Map<Class<? extends AggregateFunction>,
Supplier<AggregateFunction>> functionMap =
+ ImmutableMap.of(
+ Count.class, () -> new Sum0(childSlot),
+ Sum.class, () -> new Sum(childSlot),
+ Sum0.class, () -> new Sum0(childSlot),
+ Min.class, () -> new Min(childSlot),
+ Max.class, () -> new Max(childSlot),
+ AnyValue.class, () -> new AnyValue(childSlot),
+ // TODO: now we don't support group_concat,
+ // we need add support for group_concat without order
by,
+ // and add test for group_concat
+ GroupConcat.class, () -> new GroupConcat(childSlot));
+ return new
AggregateExpression(functionMap.get(aggFunc.getClass()).get(), param,
childSlot);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java
index 7fc7431b216..fcadf6b3c70 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java
@@ -89,11 +89,17 @@ public class SplitAggWithoutDistinct extends
OneImplementationRuleFactory {
candidates.addAll(splitTwoPhase(aggregate));
break;
default:
- candidates.addAll(implementOnePhase(aggregate));
- candidates.addAll(splitTwoPhase(aggregate));
- // Only add bucketed agg candidate in auto mode (aggPhase ==
0).
- // When the user forces a specific phase, respect that choice.
- candidates.addAll(implementBucketedPhase(aggregate, ctx));
+ List<Plan> onePhaseCandidates = implementOnePhase(aggregate);
+ candidates.addAll(onePhaseCandidates);
+ boolean singleExecutionInstance =
AggregateUtils.isSingleExecutionInstance(ctx);
+ if (!singleExecutionInstance || onePhaseCandidates.isEmpty()) {
+ candidates.addAll(splitTwoPhase(aggregate));
+ // Only add bucketed agg candidate in auto mode (aggPhase
== 0).
+ // When the user forces a specific phase, respect that
choice.
+ if (!singleExecutionInstance) {
+ candidates.addAll(implementBucketedPhase(aggregate,
ctx));
+ }
+ }
break;
}
return candidates.build();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java
index 6a19881c37e..c181af415fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java
@@ -90,6 +90,18 @@ public class AggregateUtils {
&& param.aggPhase.isLocal();
}
+ /** Whether the plan will run with one fragment instance on one BE. */
+ public static boolean isSingleExecutionInstance(ConnectContext
connectContext) {
+ int beNumber =
connectContext.getSessionVariable().getBeNumberForTest();
+ if (beNumber < 0) {
+ beNumber =
connectContext.getEnv().getClusterInfo().getAllBackendByCurrentCluster(true).size();
+ }
+ beNumber = Math.max(1, beNumber);
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int parallelInstance = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
+ return beNumber == 1 && parallelInstance == 1;
+ }
+
/**
* Check whether any expression in the collection has unknown statistics.
* Statistics are considered unknown if they are null, isUnKnown(), or
cannot be estimated.
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
index 1f05e8eff52..c31f96792f2 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
@@ -82,6 +82,7 @@ public class PhysicalPlanTranslatorTest extends
TestWithFeService {
createDatabase("test_db");
createTable("create table test_db.t(a int, b int) distributed by
hash(a) buckets 3 "
+ "properties('replication_num' = '1');");
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
createTable("create table test_db.partitioned_t(k1 int, p1 int)\n"
+ "duplicate key(k1, p1)\n"
+ "partition by range(`p1`)\n"
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/processor/post/ShuffleKeyPrunerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/processor/post/ShuffleKeyPrunerTest.java
index 48339192493..61e8039bf0d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/processor/post/ShuffleKeyPrunerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/processor/post/ShuffleKeyPrunerTest.java
@@ -60,6 +60,7 @@ class ShuffleKeyPrunerTest extends TestWithFeService {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
connectContext.getSessionVariable().setParallelResultSink(false);
connectContext.getSessionVariable().enableShuffleKeyPrune = true;
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
createTable("CREATE TABLE `t1` (\n"
+ " `a` int(11) NULL,\n"
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulatorTest.java
index cb2534765cc..2bd445e60c6 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulatorTest.java
@@ -24,11 +24,20 @@ import org.apache.doris.nereids.cost.CostCalculator;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.IntegerType;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -140,6 +149,34 @@ public class ChildrenPropertiesRegulatorTest {
testMustShuffleFilter(PhysicalLimit.class);
}
+ @Test
+ public void testSingleExecutionInstanceAllowsOnePhaseAggWithDistribute() {
+ ConnectContext ctx = new ConnectContext();
+ ctx.getSessionVariable().setBeNumberForTest(1);
+ ctx.getSessionVariable().parallelPipelineTaskNum = 1;
+ try (MockedStatic<ConnectContext> mockedConnectContext =
Mockito.mockStatic(ConnectContext.class)) {
+ mockedConnectContext.when(ConnectContext::get).thenReturn(ctx);
+
+ GroupPlan mockedGroupPlan = Mockito.mock(GroupPlan.class);
+ Mockito.when(mockedGroupPlan.getAllChildrenTypes()).thenReturn(new
BitSet());
+
Mockito.when(mockedGroupPlan.getLogicalProperties()).thenReturn(Mockito.mock(LogicalProperties.class));
+ PhysicalDistribute<GroupPlan> distribute = new
PhysicalDistribute<>(
+ DistributionSpecGather.INSTANCE, mockedGroupPlan);
+ GroupExpression child = new GroupExpression(distribute);
+ SlotReference output = new SlotReference("col1",
IntegerType.INSTANCE);
+ PhysicalHashAggregate<GroupPlan> aggregate = new
PhysicalHashAggregate<>(
+ Lists.newArrayList(),
Lists.<NamedExpression>newArrayList(output),
+ new AggregateParam(AggPhase.GLOBAL,
AggMode.INPUT_TO_RESULT),
+ false, null, false, mockedGroupPlan);
+ GroupExpression parent = new GroupExpression(aggregate);
+
+ ChildrenPropertiesRegulator regulator = new
ChildrenPropertiesRegulator(parent,
+ Lists.newArrayList(child),
Lists.newArrayList(PhysicalProperties.GATHER),
+ Lists.newArrayList(PhysicalProperties.GATHER),
mockedJobContext);
+
Assertions.assertFalse(regulator.adjustChildrenProperties().isEmpty());
+ }
+ }
+
private void testMustShuffleFilter(Class<? extends Plan> childClazz) {
try (MockedStatic<CostCalculator> mockedCostCalculator =
Mockito.mockStatic(CostCalculator.class)) {
mockedCostCalculator.when(() ->
CostCalculator.calculateCost(Mockito.any(), Mockito.any(),
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
index 385959e96da..27d2c1f4ca1 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
@@ -53,6 +53,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.types.IntegerType;
import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -372,8 +373,9 @@ class RequestPropertyDeriverTest {
@Test
void testAggregateWithAggShuffleUseParentKeyDisabled() {
// Create ConnectContext with aggShuffleUseParentKey = false
- ConnectContext testConnectContext = new ConnectContext();
+ ConnectContext testConnectContext =
MemoTestUtils.createConnectContext();
testConnectContext.getSessionVariable().aggShuffleUseParentKey = false;
+ testConnectContext.getSessionVariable().setBeNumberForTest(3);
SlotReference key1 = new SlotReference(new ExprId(0), "col1",
IntegerType.INSTANCE, true, ImmutableList.of());
SlotReference key2 = new SlotReference(new ExprId(1), "col2",
IntegerType.INSTANCE, true, ImmutableList.of());
@@ -411,8 +413,9 @@ class RequestPropertyDeriverTest {
@Test
void testAggregateWithAggShuffleUseParentKeyEnabled() {
// Create ConnectContext with aggShuffleUseParentKey = true (default
value)
- ConnectContext testConnectContext = new ConnectContext();
+ ConnectContext testConnectContext =
MemoTestUtils.createConnectContext();
testConnectContext.getSessionVariable().aggShuffleUseParentKey = true;
+ testConnectContext.getSessionVariable().setBeNumberForTest(3);
SlotReference key1 = new SlotReference(new ExprId(0), "col1",
IntegerType.INSTANCE, true, ImmutableList.of());
SlotReference key2 = new SlotReference(new ExprId(1), "col2",
IntegerType.INSTANCE, true, ImmutableList.of());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java
index 33bbb6122ce..53b98de4409 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java
@@ -42,6 +42,7 @@ public class PartitionColumnTraceTest extends
TestWithFeService {
protected void runBeforeAll() throws Exception {
createDatabase("partition_column_trace_test");
useDatabase("partition_column_trace_test");
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
createTable("CREATE TABLE IF NOT EXISTS lineitem (\n"
+ " L_ORDERKEY INTEGER NOT NULL,\n"
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/BucketedAggregateTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/BucketedAggregateTest.java
index a48bdea7bfb..91b0fc4688e 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/BucketedAggregateTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/BucketedAggregateTest.java
@@ -25,6 +25,7 @@ import
org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
+import org.apache.doris.nereids.trees.plans.AggPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -123,6 +124,7 @@ public class BucketedAggregateTest implements
MemoPatternMatchSupported {
ConnectContext ctx = MemoTestUtils.createConnectContext();
ctx.getSessionVariable().enableBucketedHashAgg = true;
ctx.getSessionVariable().setBeNumberForTest(1);
+ ctx.getSessionVariable().parallelPipelineTaskNum = 2;
ctx.getSessionVariable().bucketedAggMinInputRows = 0;
ctx.getSessionVariable().bucketedAggMaxGroupKeys = 0;
ctx.getSessionVariable().bucketedAggHighCardThreshold = 1.0;
@@ -133,6 +135,26 @@ public class BucketedAggregateTest implements
MemoPatternMatchSupported {
.matches(physicalBucketedHashAggregate());
}
+ @Test
+ public void testSingleExecutionInstanceOnlyGeneratesOnePhaseAgg() {
+ Plan root = buildAggregateWithGroupBy();
+ ConnectContext ctx = MemoTestUtils.createConnectContext();
+ ctx.getSessionVariable().enableBucketedHashAgg = true;
+ ctx.getSessionVariable().setBeNumberForTest(1);
+ ctx.getSessionVariable().parallelPipelineTaskNum = 1;
+ ctx.getSessionVariable().bucketedAggMinInputRows = 0;
+ ctx.getSessionVariable().bucketedAggMaxGroupKeys = 0;
+ ctx.getSessionVariable().bucketedAggHighCardThreshold = 1.0;
+
+ PlanChecker.from(ctx, root)
+ .deriveStats()
+ .applyImplementation(splitAggWithoutDistinctRule())
+ .matches(physicalHashAggregate()
+ .when(agg ->
agg.getAggPhase().equals(AggPhase.GLOBAL)))
+ .nonMatch(physicalHashAggregate(physicalHashAggregate()))
+ .nonMatch(physicalBucketedHashAggregate());
+ }
+
@Test
public void testBucketedAggForcedAggPhase() {
// When user forces agg_phase = 2, bucketed agg should NOT be generated
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateStrategiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateStrategiesTest.java
index 996a4bc675b..949b79474f5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateStrategiesTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateStrategiesTest.java
@@ -97,7 +97,7 @@ public class AggregateStrategiesTest implements
MemoPatternMatchSupported {
Sum localOutput1 = new Sum(rStudent.getOutput().get(0).toSlot());
Slot localGroupBy = rStudent.getOutput().get(2).toSlot();
- PlanChecker.from(MemoTestUtils.createConnectContext(), root)
+ PlanChecker.from(createMultiInstanceContext(), root)
.applyImplementation(twoPhaseAggregateWithoutDistinct())
.matches(
physicalHashAggregate(
@@ -147,7 +147,7 @@ public class AggregateStrategiesTest implements
MemoPatternMatchSupported {
Sum localOutput0 = new Sum(false, true,
rStudent.getOutput().get(0).toSlot());
- PlanChecker.from(MemoTestUtils.createConnectContext(), root)
+ PlanChecker.from(createMultiInstanceContext(), root)
.applyImplementation(twoPhaseAggregateWithoutDistinct())
.matches(
physicalHashAggregate(
@@ -193,7 +193,7 @@ public class AggregateStrategiesTest implements
MemoPatternMatchSupported {
Sum localOutput1 = new Sum(rStudent.getOutput().get(0).toSlot());
Expression localGroupBy = rStudent.getOutput().get(2).toSlot();
- PlanChecker.from(MemoTestUtils.createConnectContext(), root)
+ PlanChecker.from(createMultiInstanceContext(), root)
.applyImplementation(twoPhaseAggregateWithoutDistinct())
.matches(
physicalHashAggregate(
@@ -337,7 +337,7 @@ public class AggregateStrategiesTest implements
MemoPatternMatchSupported {
// sum
Sum phaseOneSumId = new Sum(id);
- PlanChecker.from(MemoTestUtils.createConnectContext(), root)
+ PlanChecker.from(createMultiInstanceContext(), root)
.applyImplementation(fourPhaseAggregateWithDistinct())
.matchesFromRoot(
physicalHashAggregate(
@@ -400,7 +400,7 @@ public class AggregateStrategiesTest implements
MemoPatternMatchSupported {
true, Optional.empty(), rStudent);
// select count(distinct id), sum(id) from t;
- PlanChecker.from(MemoTestUtils.createConnectContext(), root)
+ PlanChecker.from(createMultiInstanceContext(), root)
.applyImplementation(fourPhaseAggregateWithDistinctWithoutGbyKey())
.matches(
physicalHashAggregate(
@@ -431,6 +431,12 @@ public class AggregateStrategiesTest implements
MemoPatternMatchSupported {
.get();
}
+ private ConnectContext createMultiInstanceContext() {
+ ConnectContext ctx = MemoTestUtils.createConnectContext();
+ ctx.getSessionVariable().parallelPipelineTaskNum = 2;
+ return ctx;
+ }
+
@Developing
private Rule fourPhaseAggregateWithDistinct() {
return SplitAggMultiPhase.INSTANCE.buildRules()
@@ -471,7 +477,7 @@ public class AggregateStrategiesTest implements
MemoPatternMatchSupported {
true, Optional.empty(), rStudent);
// select count(distinct id) group by age;
- PlanChecker.from(MemoTestUtils.createConnectContext(), root)
+ PlanChecker.from(createMultiInstanceContext(), root)
.applyImplementation(skewRewriteRule())
.matches(
physicalHashAggregate(
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateUnionPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateUnionPlanTest.java
index 96c8ca2d76e..0e21d6dad96 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateUnionPlanTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateUnionPlanTest.java
@@ -24,6 +24,7 @@ import
org.apache.doris.nereids.util.MemoPatternMatchSupported;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
@@ -35,6 +36,7 @@ public class AggregateUnionPlanTest extends TestWithFeService
implements MemoPat
protected void runBeforeAll() throws Exception {
createDatabase("test_agg_union");
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
// Tables with RANDOM distribution: no PhysicalDistribute needed in
union inputs
createTable("CREATE TABLE test_agg_union.t1_random ("
@@ -60,6 +62,19 @@ public class AggregateUnionPlanTest extends
TestWithFeService implements MemoPat
+ ") ENGINE=OLAP DUPLICATE KEY(a, b)"
+ " DISTRIBUTED BY HASH(a) BUCKETS 3"
+ " PROPERTIES ('replication_allocation' =
'tag.location.default: 1');");
+ createTable("CREATE TABLE test_agg_union.bitmap_tbl ("
+ + " id INT,"
+ + " tag INT,"
+ + " user_id BITMAP BITMAP_UNION"
+ + ") ENGINE=OLAP AGGREGATE KEY(id, tag)"
+ + " DISTRIBUTED BY HASH(id) BUCKETS 1"
+ + " PROPERTIES ('replication_allocation' =
'tag.location.default: 1');");
+ createTable("CREATE TABLE test_agg_union.decimal_tbl ("
+ + " f1 DECIMALV3(30, 5) NULL,"
+ + " f2 DECIMALV3(10, 6) NULL"
+ + ") ENGINE=OLAP DUPLICATE KEY(f1)"
+ + " DISTRIBUTED BY HASH(f1) BUCKETS 1"
+ + " PROPERTIES ('replication_allocation' =
'tag.location.default: 1');");
}
@Test
@@ -100,6 +115,59 @@ public class AggregateUnionPlanTest extends
TestWithFeService implements MemoPat
});
}
+ @Test
+ public void testNestedSetOperationDistinctUnionSingleInstance() {
+ int beNumberForTest =
connectContext.getSessionVariable().getBeNumberForTest();
+ int parallelPipelineTaskNum =
connectContext.getSessionVariable().parallelPipelineTaskNum;
+ connectContext.getSessionVariable().setBeNumberForTest(1);
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+ try {
+ PlanChecker.from(connectContext).checkPlannerResult(
+ "SELECT * FROM (SELECT 1 a INTERSECT SELECT 1 a) t1"
+ + " UNION "
+ + "SELECT * FROM (SELECT 2 a EXCEPT SELECT 3 a)
t2");
+ } finally {
+
connectContext.getSessionVariable().setBeNumberForTest(beNumberForTest);
+ connectContext.getSessionVariable().parallelPipelineTaskNum =
parallelPipelineTaskNum;
+ }
+ }
+
+ @Test
+ public void testTwoPhaseOnlyAggregateSingleInstance() {
+ int beNumberForTest =
connectContext.getSessionVariable().getBeNumberForTest();
+ int parallelPipelineTaskNum =
connectContext.getSessionVariable().parallelPipelineTaskNum;
+ connectContext.getSessionVariable().setBeNumberForTest(1);
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+ try {
+ PlanChecker.from(connectContext).checkPlannerResult(
+ "SELECT orthogonal_bitmap_expr_calculate(user_id, tag,
'(100&200)')"
+ + " FROM test_agg_union.bitmap_tbl");
+ PlanChecker.from(connectContext).checkPlannerResult(
+ "SELECT orthogonal_bitmap_expr_calculate_count(user_id,
tag, '(100&200)')"
+ + " FROM test_agg_union.bitmap_tbl");
+ } finally {
+
connectContext.getSessionVariable().setBeNumberForTest(beNumberForTest);
+ connectContext.getSessionVariable().parallelPipelineTaskNum =
parallelPipelineTaskNum;
+ }
+ }
+
+ @Test
+ public void testDecimal256GuardOnDistinctAggregateWithoutGroupBy() throws
Exception {
+ connectContext.getSessionVariable().enableDecimal256 = true;
+ try {
+ dropView("DROP VIEW IF EXISTS
test_agg_union.v_decimal_distinct_sum");
+ createView("CREATE VIEW test_agg_union.v_decimal_distinct_sum AS "
+ + "SELECT sum(DISTINCT f1 * f2) AS col_sum FROM
test_agg_union.decimal_tbl");
+ } finally {
+ connectContext.getSessionVariable().enableDecimal256 = false;
+ }
+
+ PlanChecker.from(connectContext).checkPlannerResult(
+ "SELECT * FROM test_agg_union.v_decimal_distinct_sum");
+ Assertions.assertTrue(getSQLPlanOrErrorMsg(
+ "EXPLAIN SELECT * FROM
test_agg_union.v_decimal_distinct_sum").contains("PLAN FRAGMENT"));
+ }
+
/**
* Walk up through PhysicalProject nodes to find if a PhysicalUnion sits
below.
* Used to handle optional project nodes that the optimizer may insert
between
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctTest.java
index fe8af6fee86..94c60586fee 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctTest.java
@@ -36,6 +36,7 @@ public class SplitMultiDistinctTest extends TestWithFeService
implements MemoPat
connectContext.setDatabase("test");
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
connectContext.getSessionVariable().setEnableParallelResultSink(false);
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
index 5e618797188..e8bf8b628c3 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
@@ -121,6 +121,7 @@ public class QueryCacheNormalizerTest extends
TestWithFeService {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
connectContext.getSessionVariable().setEnableQueryCache(true);
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java
index 08eaa4631db..a7dc637a407 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java
@@ -30,6 +30,7 @@ public class StatisticDeriveTest extends TestWithFeService {
// create database
createDatabase("test");
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
createTable(
"CREATE TABLE test.join1 (\n"
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
index 721caa02237..14c020cdc82 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
@@ -131,6 +131,7 @@ public class OlapQueryCacheTest {
state = new QueryState();
scheduler = new ConnectScheduler(10);
SessionVariable sessionVariable = new SessionVariable();
+ sessionVariable.parallelPipelineTaskNum = 2;
Deencapsulation.setField(sessionVariable, "beNumberForTest", 1);
MysqlSerializer serializer = MysqlSerializer.newInstance();
env = AccessTestUtil.fetchAdminCatalog();
diff --git a/regression-test/data/function_p0/test_agg_foreach.out
b/regression-test/data/function_p0/test_agg_foreach.out
index 0ae73e90d3d..53b1759e9e6 100644
--- a/regression-test/data/function_p0/test_agg_foreach.out
+++ b/regression-test/data/function_p0/test_agg_foreach.out
@@ -9,7 +9,10 @@
[1840.222222222222, 0, 0] [2760.333333333333, 0, 0] [1, 0, 0]
-- !sql --
-[3, 2, 1]
["[{"cbe":{"100":1,"1":1,"20":1},"notnull":3,"null":1,"all":4}]",
"[{"cbe":{"2":2},"notnull":2,"null":0,"all":2}]",
"[{"cbe":{"3":1},"notnull":1,"null":0,"all":1}]"] [3, 1, 1]
+[3, 2, 1] [3, 1, 1]
+
+-- !count_by_enum --
+1 1 1 3 1 4 2 2 0 2
1 1 0 1
-- !sql --
[[1, 20, 100, null], [2, 2], [3]]
@@ -17,5 +20,8 @@
-- !sql --
[["ab", "cd", "efg", null], ["123", "c"], ["114514"]]
+-- !array_agg_nested --
+3 5 2 1
+
-- !sql --
["{"num_buckets":3,"buckets":[{"lower":"1","upper":"1","ndv":1,"count":1,"pre_sum":0},{"lower":"20","upper":"20","ndv":1,"count":1,"pre_sum":1},{"lower":"100","upper":"100","ndv":1,"count":1,"pre_sum":2}]}",
"{"num_buckets":1,"buckets":[{"lower":"2","upper":"2","ndv":1,"count":2,"pre_sum":0}]}",
"{"num_buckets":1,"buckets":[{"lower":"3","upper":"3","ndv":1,"count":1,"pre_sum":0}]}"]
diff --git a/regression-test/data/function_p0/test_agg_foreach_notnull.out
b/regression-test/data/function_p0/test_agg_foreach_notnull.out
index 71683b04143..971dcd35ed9 100644
--- a/regression-test/data/function_p0/test_agg_foreach_notnull.out
+++ b/regression-test/data/function_p0/test_agg_foreach_notnull.out
@@ -9,7 +9,10 @@
[1840.222222222222, 0, 0] [2760.333333333333, 0, 0] [1, 0, 0]
-- !sql --
-[3, 2, 1]
["[{"cbe":{"100":1,"1":1,"20":1},"notnull":3,"null":1,"all":4}]",
"[{"cbe":{"2":2},"notnull":2,"null":0,"all":2}]",
"[{"cbe":{"3":1},"notnull":1,"null":0,"all":1}]"] [3, 1, 1]
+[3, 2, 1] [3, 1, 1]
+
+-- !count_by_enum --
+1 1 1 3 1 4 2 2 0 2
1 1 0 1
-- !sql --
["{"num_buckets":3,"buckets":[{"lower":"1","upper":"1","ndv":1,"count":1,"pre_sum":0},{"lower":"20","upper":"20","ndv":1,"count":1,"pre_sum":1},{"lower":"100","upper":"100","ndv":1,"count":1,"pre_sum":2}]}",
"{"num_buckets":1,"buckets":[{"lower":"2","upper":"2","ndv":1,"count":2,"pre_sum":0}]}",
"{"num_buckets":1,"buckets":[{"lower":"3","upper":"3","ndv":1,"count":1,"pre_sum":0}]}"]
diff --git a/regression-test/suites/function_p0/test_agg_foreach.groovy
b/regression-test/suites/function_p0/test_agg_foreach.groovy
index f3a7494c2fb..16fa21141c6 100644
--- a/regression-test/suites/function_p0/test_agg_foreach.groovy
+++ b/regression-test/suites/function_p0/test_agg_foreach.groovy
@@ -92,17 +92,40 @@ suite("test_agg_foreach") {
exception "errCode"
}
- qt_sql """
- select count_foreach(a) , count_by_enum_foreach(a) ,
approx_count_distinct_foreach(a) from foreach_table;
- """
+ qt_sql """
+ select count_foreach(a), approx_count_distinct_foreach(a) from
foreach_table;
+ """
+
+ qt_count_by_enum """
+ select
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].cbe."1"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].cbe."20"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].cbe."100"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].notnull'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].null'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].all'),
+ get_json_string(element_at(count_by_enum_foreach(a), 2),
'\$.[0].cbe."2"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 2),
'\$.[0].notnull'),
+ get_json_string(element_at(count_by_enum_foreach(a), 2),
'\$.[0].null'),
+ get_json_string(element_at(count_by_enum_foreach(a), 2),
'\$.[0].all'),
+ get_json_string(element_at(count_by_enum_foreach(a), 3),
'\$.[0].cbe."3"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 3),
'\$.[0].notnull'),
+ get_json_string(element_at(count_by_enum_foreach(a), 3),
'\$.[0].null'),
+ get_json_string(element_at(count_by_enum_foreach(a), 3),
'\$.[0].all')
+ from foreach_table;
+ """
qt_sql """select array_agg_foreach(a) from foreach_table;"""
qt_sql """select array_agg_foreach(s) from foreach_table;"""
- test {
- sql """select array_agg_foreach(b) from foreach_table;"""
- exception "not support"
- }
+ qt_array_agg_nested """
+ select /*+ SET_VAR(parallel_pipeline_task_num=1) */
+ size(array_agg_foreach(b)),
+ size(element_at(array_agg_foreach(b), 1)),
+ size(element_at(array_agg_foreach(b), 2)),
+ size(element_at(array_agg_foreach(b), 3))
+ from foreach_table;
+ """
qt_sql """
select histogram_foreach(a) from foreach_table;
diff --git a/regression-test/suites/function_p0/test_agg_foreach_notnull.groovy
b/regression-test/suites/function_p0/test_agg_foreach_notnull.groovy
index 80c7fe3f180..858205e83c8 100644
--- a/regression-test/suites/function_p0/test_agg_foreach_notnull.groovy
+++ b/regression-test/suites/function_p0/test_agg_foreach_notnull.groovy
@@ -75,7 +75,26 @@ suite("test_agg_foreach_not_null") {
"""
qt_sql """
- select count_foreach(a) , count_by_enum_foreach(a) ,
approx_count_distinct_foreach(a) from foreach_table_not_null;
+ select count_foreach(a), approx_count_distinct_foreach(a) from
foreach_table_not_null;
+ """
+
+ qt_count_by_enum """
+ select
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].cbe."1"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].cbe."20"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].cbe."100"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1),
'\$.[0].notnull'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1), '\$.[0].null'),
+ get_json_string(element_at(count_by_enum_foreach(a), 1), '\$.[0].all'),
+ get_json_string(element_at(count_by_enum_foreach(a), 2),
'\$.[0].cbe."2"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 2),
'\$.[0].notnull'),
+ get_json_string(element_at(count_by_enum_foreach(a), 2), '\$.[0].null'),
+ get_json_string(element_at(count_by_enum_foreach(a), 2), '\$.[0].all'),
+ get_json_string(element_at(count_by_enum_foreach(a), 3),
'\$.[0].cbe."3"'),
+ get_json_string(element_at(count_by_enum_foreach(a), 3),
'\$.[0].notnull'),
+ get_json_string(element_at(count_by_enum_foreach(a), 3), '\$.[0].null'),
+ get_json_string(element_at(count_by_enum_foreach(a), 3), '\$.[0].all')
+ from foreach_table_not_null;
"""
qt_sql """
diff --git a/regression-test/suites/nereids_function_p0/agg_function/agg.groovy
b/regression-test/suites/nereids_function_p0/agg_function/agg.groovy
index 0030b270413..6ac820aa47a 100644
--- a/regression-test/suites/nereids_function_p0/agg_function/agg.groovy
+++ b/regression-test/suites/nereids_function_p0/agg_function/agg.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("nereids_agg_fn") {
+ sql "set parallel_pipeline_task_num=2"
sql 'use regression_test_nereids_function_p0'
sql 'set experimental_enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
diff --git
a/regression-test/suites/nereids_function_p0/agg_function/sum0_cte.groovy
b/regression-test/suites/nereids_function_p0/agg_function/sum0_cte.groovy
index a6f923baaf0..287260bf2ae 100644
--- a/regression-test/suites/nereids_function_p0/agg_function/sum0_cte.groovy
+++ b/regression-test/suites/nereids_function_p0/agg_function/sum0_cte.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("sum0_cte") {
+ sql "set parallel_pipeline_task_num=2"
sql 'use regression_test_nereids_function_p0'
sql "set ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
qt_sum0_cte """with tmp as (select * from fn_test)
diff --git
a/regression-test/suites/nereids_rules_p0/adjust_nullable/test_adjust_nullable.groovy
b/regression-test/suites/nereids_rules_p0/adjust_nullable/test_adjust_nullable.groovy
index ea889e3b039..7b5b642052d 100644
---
a/regression-test/suites/nereids_rules_p0/adjust_nullable/test_adjust_nullable.groovy
+++
b/regression-test/suites/nereids_rules_p0/adjust_nullable/test_adjust_nullable.groovy
@@ -16,6 +16,7 @@
// under the License.
suite('test_adjust_nullable') {
+ sql "set parallel_pipeline_task_num=2"
// test AjustNullable not throw exception:
// 'AdjustNullable convert slot xx from not-nullable to nullable. You can
disable check by set fe_debug = false.'
// NOTICE: the pipeline need set global fe_debug = true
diff --git
a/regression-test/suites/nereids_rules_p0/adjust_nullable/test_subquery_nullable.groovy
b/regression-test/suites/nereids_rules_p0/adjust_nullable/test_subquery_nullable.groovy
index 017fa68bde6..89551a8f43b 100644
---
a/regression-test/suites/nereids_rules_p0/adjust_nullable/test_subquery_nullable.groovy
+++
b/regression-test/suites/nereids_rules_p0/adjust_nullable/test_subquery_nullable.groovy
@@ -16,6 +16,7 @@
// under the License.
suite('test_subquery_nullable') {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_parallel_result_sink=false;'
sql 'DROP TABLE IF EXISTS test_subquery_nullable_t1 FORCE'
sql 'DROP TABLE IF EXISTS test_subquery_nullable_t2 FORCE'
diff --git
a/regression-test/suites/nereids_rules_p0/agg_join_pkfk/agg_join_pkfk.groovy
b/regression-test/suites/nereids_rules_p0/agg_join_pkfk/agg_join_pkfk.groovy
index 778547830c1..778ea5aaaa8 100644
--- a/regression-test/suites/nereids_rules_p0/agg_join_pkfk/agg_join_pkfk.groovy
+++ b/regression-test/suites/nereids_rules_p0/agg_join_pkfk/agg_join_pkfk.groovy
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
suite("agg_join_pkfk") {
+ sql "set parallel_pipeline_task_num=2"
multi_sql """
SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject';
set runtime_filter_mode=OFF;
diff --git
a/regression-test/suites/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.groovy
b/regression-test/suites/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.groovy
index 90cf4d77b87..0b442a3f1df 100644
---
a/regression-test/suites/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.groovy
+++
b/regression-test/suites/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("test_agg_skew_hint") {
+ sql "set parallel_pipeline_task_num=2"
sql "set runtime_filter_mode=OFF"
sql "set disable_join_reorder=true;"
sql "drop table if exists test_skew_hint"
diff --git
a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
index bfb38dcf31f..9ad53f9208a 100644
---
a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
+++
b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("distinct_agg_rewriter") {
+ sql "set parallel_pipeline_task_num=2"
multi_sql"""
SET ignore_shape_nodes='PhysicalProject';
set runtime_filter_mode=OFF;
diff --git
a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.groovy
b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.groovy
index 8ab6eb7dfa0..aec45e0d50b 100644
---
a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.groovy
+++
b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("distinct_agg_strategy_selector") {
+ sql "set parallel_pipeline_task_num=2"
multi_sql"""
SET ignore_shape_nodes='PhysicalProject';
set runtime_filter_mode=OFF;
diff --git
a/regression-test/suites/nereids_rules_p0/agg_strategy/physical_agg_regulator.groovy
b/regression-test/suites/nereids_rules_p0/agg_strategy/physical_agg_regulator.groovy
index 7b9f2c04f9a..4620e7c931e 100644
---
a/regression-test/suites/nereids_rules_p0/agg_strategy/physical_agg_regulator.groovy
+++
b/regression-test/suites/nereids_rules_p0/agg_strategy/physical_agg_regulator.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("physical_agg_regulator") {
+ sql "set parallel_pipeline_task_num=2"
multi_sql"""
SET ignore_shape_nodes='PhysicalProject';
set runtime_filter_mode=OFF;
diff --git
a/regression-test/suites/nereids_rules_p0/agg_strategy/test_variables.groovy
b/regression-test/suites/nereids_rules_p0/agg_strategy/test_variables.groovy
index 6aa3688ccb4..08fb93729c3 100644
--- a/regression-test/suites/nereids_rules_p0/agg_strategy/test_variables.groovy
+++ b/regression-test/suites/nereids_rules_p0/agg_strategy/test_variables.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("agg_strategy_variable") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET ignore_shape_nodes='PhysicalProject'"
sql "set enable_parallel_result_sink=false"
sql "set global enable_auto_analyze=false"
diff --git
a/regression-test/suites/nereids_rules_p0/constant_propagation/constant_propagation.groovy
b/regression-test/suites/nereids_rules_p0/constant_propagation/constant_propagation.groovy
index 82c3d584e7c..b6c154b4820 100644
---
a/regression-test/suites/nereids_rules_p0/constant_propagation/constant_propagation.groovy
+++
b/regression-test/suites/nereids_rules_p0/constant_propagation/constant_propagation.groovy
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
suite('constant_propagation') {
+ sql "set parallel_pipeline_task_num=2"
def explain_and_result = { tag, sql ->
"qt_${tag}_shape" "explain shape plan ${sql}"
"order_qt_${tag}_result" "${sql}"
diff --git a/regression-test/suites/nereids_rules_p0/cse/cse.groovy
b/regression-test/suites/nereids_rules_p0/cse/cse.groovy
index f059f05620c..e48be04a6e2 100644
--- a/regression-test/suites/nereids_rules_p0/cse/cse.groovy
+++ b/regression-test/suites/nereids_rules_p0/cse/cse.groovy
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
suite("cse") {
+ sql "set parallel_pipeline_task_num=2"
sql """
drop table if exists cse;
CREATE TABLE `cse` (
diff --git
a/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
b/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
index 883f0529132..94e6852f046 100644
---
a/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
+++
b/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
@@ -20,7 +20,7 @@ suite("distinct_split") {
sql "set disable_join_reorder=true"
sql "set global enable_auto_analyze=false;"
sql "set be_number_for_test=1;"
- sql "set parallel_pipeline_task_num=1;"
+ sql "set parallel_pipeline_task_num=2;"
sql "drop table if exists test_distinct_multi"
sql "create table test_distinct_multi(a int, b int, c int, d varchar(10),
e date) distributed by hash(a) properties('replication_num'='1');"
sql "insert into test_distinct_multi
values(1,2,3,'abc','2024-01-02'),(1,2,4,'abc','2024-01-03'),(2,2,4,'abcd','2024-01-02'),(1,2,3,'abcd','2024-01-04'),(1,2,4,'eee','2024-02-02'),(2,2,4,'abc','2024-01-02');"
diff --git
a/regression-test/suites/nereids_rules_p0/eliminate_aggregate_casewhen/eliminate_aggregate_casewhen.groovy
b/regression-test/suites/nereids_rules_p0/eliminate_aggregate_casewhen/eliminate_aggregate_casewhen.groovy
index 1560310e43d..8de498cb362 100644
---
a/regression-test/suites/nereids_rules_p0/eliminate_aggregate_casewhen/eliminate_aggregate_casewhen.groovy
+++
b/regression-test/suites/nereids_rules_p0/eliminate_aggregate_casewhen/eliminate_aggregate_casewhen.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("eliminate_aggregate_casewhen") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF"
sql "SET enable_fallback_to_original_planner=false"
diff --git
a/regression-test/suites/nereids_rules_p0/eliminate_aggregate_constant/eliminate_aggregate_constant.groovy
b/regression-test/suites/nereids_rules_p0/eliminate_aggregate_constant/eliminate_aggregate_constant.groovy
index 97c101afab4..dc744f63790 100644
---
a/regression-test/suites/nereids_rules_p0/eliminate_aggregate_constant/eliminate_aggregate_constant.groovy
+++
b/regression-test/suites/nereids_rules_p0/eliminate_aggregate_constant/eliminate_aggregate_constant.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("eliminate_aggregate_constant") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF"
sql "SET enable_fallback_to_original_planner=false"
diff --git
a/regression-test/suites/nereids_rules_p0/expression/test_convert_median_to_percentile.groovy
b/regression-test/suites/nereids_rules_p0/expression/test_convert_median_to_percentile.groovy
index 2d8cf3a8b59..ea26b5480bd 100644
---
a/regression-test/suites/nereids_rules_p0/expression/test_convert_median_to_percentile.groovy
+++
b/regression-test/suites/nereids_rules_p0/expression/test_convert_median_to_percentile.groovy
@@ -18,6 +18,7 @@
suite("test_convert_median_to_percentile") {
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
+ sql "set parallel_pipeline_task_num=2"
sql "create database if not exists test_convert_median_to_percentile"
sql "use test_convert_median_to_percentile"
diff --git
a/regression-test/suites/nereids_rules_p0/expression/test_simplify_range.groovy
b/regression-test/suites/nereids_rules_p0/expression/test_simplify_range.groovy
index e5c5aec3c28..30e4b91c6ad 100644
---
a/regression-test/suites/nereids_rules_p0/expression/test_simplify_range.groovy
+++
b/regression-test/suites/nereids_rules_p0/expression/test_simplify_range.groovy
@@ -16,6 +16,7 @@
// under the License.
suite('test_simplify_range') {
+ sql "set parallel_pipeline_task_num=2"
def tbl_1 = 'test_simplify_range_tbl_1'
sql '''
SET ignore_shape_nodes='PhysicalDistribute';
diff --git
a/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_expression_in_hash_join.groovy
b/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_expression_in_hash_join.groovy
index 225578a08d6..c75d22670b4 100644
---
a/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_expression_in_hash_join.groovy
+++
b/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_expression_in_hash_join.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("push_down_expression_in_hash_join") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF"
sql "SET enable_fallback_to_original_planner=false"
diff --git
a/regression-test/suites/nereids_rules_p0/infer_predicate/infer_intersect_except.groovy
b/regression-test/suites/nereids_rules_p0/infer_predicate/infer_intersect_except.groovy
index 07e969df0d5..9aa5ce723e5 100644
---
a/regression-test/suites/nereids_rules_p0/infer_predicate/infer_intersect_except.groovy
+++
b/regression-test/suites/nereids_rules_p0/infer_predicate/infer_intersect_except.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("infer_intersect_except") {
+ sql "set parallel_pipeline_task_num=2"
sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
sql """SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"""
diff --git
a/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_agg.groovy
b/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_agg.groovy
index 21e447ca5ca..98499262efe 100644
---
a/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_agg.groovy
+++
b/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_agg.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("test_pull_up_agg") {
+ sql "set parallel_pipeline_task_num=2"
// sql "set disable_nereids_rules='INFER_PREDICATES'"
sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
diff --git
a/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_literal.groovy
b/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_literal.groovy
index 36da3b86f3b..5f92859f5b5 100644
---
a/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_literal.groovy
+++
b/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_literal.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("pull_up_predicate_literal") {
+ sql "set parallel_pipeline_task_num=2"
sql """ DROP TABLE IF EXISTS test_pull_up_predicate_literal; """
sql "set enable_fallback_to_original_planner=false"
sql """SET ignore_shape_nodes='PhysicalDistribute'"""
diff --git
a/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_set_op.groovy
b/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_set_op.groovy
index f4537ddd035..50b7cf75df3 100644
---
a/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_set_op.groovy
+++
b/regression-test/suites/nereids_rules_p0/infer_predicate/pull_up_predicate_set_op.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("pull_up_predicate_set_op") {
+ sql "set parallel_pipeline_task_num=2"
sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
sql """SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"""
diff --git
a/regression-test/suites/nereids_rules_p0/max_min_filter_push_down/max_min_filter_push_down.groovy
b/regression-test/suites/nereids_rules_p0/max_min_filter_push_down/max_min_filter_push_down.groovy
index 227e69346b8..5b094c25b27 100644
---
a/regression-test/suites/nereids_rules_p0/max_min_filter_push_down/max_min_filter_push_down.groovy
+++
b/regression-test/suites/nereids_rules_p0/max_min_filter_push_down/max_min_filter_push_down.groovy
@@ -18,6 +18,7 @@ suite("max_min_filter_push_down") {
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
+ sql "set parallel_pipeline_task_num=2"
sql "set disable_nereids_rules='REWRITE_SIMPLE_AGG_TO_CONSTANT'"
sql "drop table if exists max_min_filter_push_down1"
diff --git
a/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy
b/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy
index 4ca62279ee8..172e930a408 100644
---
a/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy
+++
b/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
suite("merge_aggregate") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
diff --git
a/regression-test/suites/nereids_rules_p0/mv/agg_optimize_when_uniform/agg_optimize_when_uniform.groovy
b/regression-test/suites/nereids_rules_p0/mv/agg_optimize_when_uniform/agg_optimize_when_uniform.groovy
index 4fe8242eebb..5cc5149a3ce 100644
---
a/regression-test/suites/nereids_rules_p0/mv/agg_optimize_when_uniform/agg_optimize_when_uniform.groovy
+++
b/regression-test/suites/nereids_rules_p0/mv/agg_optimize_when_uniform/agg_optimize_when_uniform.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("agg_optimize_when_uniform") {
+ sql "set parallel_pipeline_task_num=2"
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "set pre_materialized_view_rewrite_strategy = TRY_IN_RBO"
diff --git
a/regression-test/suites/nereids_rules_p0/predicate_infer/infer_predicate.groovy
b/regression-test/suites/nereids_rules_p0/predicate_infer/infer_predicate.groovy
index e2dd89be33f..81c9517a2a0 100644
---
a/regression-test/suites/nereids_rules_p0/predicate_infer/infer_predicate.groovy
+++
b/regression-test/suites/nereids_rules_p0/predicate_infer/infer_predicate.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("infer_predicate") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF"
sql "SET enable_fallback_to_original_planner=false"
diff --git
a/regression-test/suites/nereids_rules_p0/project_distinct_to_agg/project_distinct_to_agg.groovy
b/regression-test/suites/nereids_rules_p0/project_distinct_to_agg/project_distinct_to_agg.groovy
index c0d12ddcd2c..9334facd6a0 100644
---
a/regression-test/suites/nereids_rules_p0/project_distinct_to_agg/project_distinct_to_agg.groovy
+++
b/regression-test/suites/nereids_rules_p0/project_distinct_to_agg/project_distinct_to_agg.groovy
@@ -16,6 +16,7 @@
// under the License.
suite('project_distinct_to_agg') {
+ sql "set parallel_pipeline_task_num=2"
def tbl = 'tbl_project_distinct_to_agg'
sql "SET ignore_shape_nodes='PhysicalDistribute'"
sql "drop table if exists ${tbl} force"
diff --git
a/regression-test/suites/nereids_rules_p0/push_count_into_union_all/push_count_into_union_all.groovy
b/regression-test/suites/nereids_rules_p0/push_count_into_union_all/push_count_into_union_all.groovy
index 71e07318820..2624d4d8329 100644
---
a/regression-test/suites/nereids_rules_p0/push_count_into_union_all/push_count_into_union_all.groovy
+++
b/regression-test/suites/nereids_rules_p0/push_count_into_union_all/push_count_into_union_all.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("push_count_into_union_all") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql """SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"""
diff --git
a/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.groovy
b/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.groovy
index 22a59dd1cbd..2f202020db4 100644
---
a/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.groovy
+++
b/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("push_down_limit_distinct_through_join") {
+ sql "set parallel_pipeline_task_num=2"
multi_sql """
SET enable_nereids_planner=true;
SET enable_fallback_to_original_planner=false;
diff --git
a/regression-test/suites/nereids_rules_p0/push_down_top_n/push_down_top_n_distinct_through_union.groovy
b/regression-test/suites/nereids_rules_p0/push_down_top_n/push_down_top_n_distinct_through_union.groovy
index ce158e07f36..16042fbebf0 100644
---
a/regression-test/suites/nereids_rules_p0/push_down_top_n/push_down_top_n_distinct_through_union.groovy
+++
b/regression-test/suites/nereids_rules_p0/push_down_top_n/push_down_top_n_distinct_through_union.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("push_down_top_n_distinct_through_union") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF"
sql "SET enable_fallback_to_original_planner=false"
diff --git a/regression-test/suites/nereids_rules_p0/salt_join/salt_join.groovy
b/regression-test/suites/nereids_rules_p0/salt_join/salt_join.groovy
index fbf17db586e..2a1b6b8b574 100644
--- a/regression-test/suites/nereids_rules_p0/salt_join/salt_join.groovy
+++ b/regression-test/suites/nereids_rules_p0/salt_join/salt_join.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("salt_join") {
+ sql "set parallel_pipeline_task_num=2"
// sql "set disable_nereids_rules=JOIN_SKEW_ADD_SALT"
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
sql "set disable_nereids_rules='prune_empty_partition'"
diff --git a/regression-test/suites/nereids_rules_p0/sumRewrite.groovy
b/regression-test/suites/nereids_rules_p0/sumRewrite.groovy
index e85c6cc63f7..c9d492bb06b 100644
--- a/regression-test/suites/nereids_rules_p0/sumRewrite.groovy
+++ b/regression-test/suites/nereids_rules_p0/sumRewrite.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("sumRewrite") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF"
sql "SET enable_fallback_to_original_planner=false"
diff --git
a/regression-test/suites/nereids_rules_p0/transposeJoin/transposeSemiJoinAgg.groovy
b/regression-test/suites/nereids_rules_p0/transposeJoin/transposeSemiJoinAgg.groovy
index 18c85536049..134b355a7b2 100644
---
a/regression-test/suites/nereids_rules_p0/transposeJoin/transposeSemiJoinAgg.groovy
+++
b/regression-test/suites/nereids_rules_p0/transposeJoin/transposeSemiJoinAgg.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("transposeSemiJoinAgg") {
+ sql "set parallel_pipeline_task_num=2"
// filter about invisible column "DORIS_DELETE_SIGN = 0" has no impaction
on partition pruning
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
diff --git
a/regression-test/suites/nereids_rules_p0/unique_function/add_project_for_unique_function.groovy
b/regression-test/suites/nereids_rules_p0/unique_function/add_project_for_unique_function.groovy
index 9a400900ca3..78544a29608 100644
---
a/regression-test/suites/nereids_rules_p0/unique_function/add_project_for_unique_function.groovy
+++
b/regression-test/suites/nereids_rules_p0/unique_function/add_project_for_unique_function.groovy
@@ -16,6 +16,7 @@
// under the License.
suite('add_project_for_unique_function') {
+ sql "set parallel_pipeline_task_num=2"
sql 'SET enable_nereids_planner=true'
sql 'SET runtime_filter_mode=OFF'
sql 'SET enable_fallback_to_original_planner=false'
diff --git
a/regression-test/suites/nereids_rules_p0/unique_function/agg_with_unique_function.groovy
b/regression-test/suites/nereids_rules_p0/unique_function/agg_with_unique_function.groovy
index 630788ad863..678072e1df2 100644
---
a/regression-test/suites/nereids_rules_p0/unique_function/agg_with_unique_function.groovy
+++
b/regression-test/suites/nereids_rules_p0/unique_function/agg_with_unique_function.groovy
@@ -16,6 +16,7 @@
// under the License.
suite('agg_with_unique_function') {
+ sql "set parallel_pipeline_task_num=2"
sql 'SET enable_nereids_planner=true'
sql 'SET runtime_filter_mode=OFF'
sql 'SET enable_fallback_to_original_planner=false'
diff --git
a/regression-test/suites/nereids_rules_p0/window_skew_rewrite/window_skew_rewrite.groovy
b/regression-test/suites/nereids_rules_p0/window_skew_rewrite/window_skew_rewrite.groovy
index ba7f29fef6c..2620add2cf9 100644
---
a/regression-test/suites/nereids_rules_p0/window_skew_rewrite/window_skew_rewrite.groovy
+++
b/regression-test/suites/nereids_rules_p0/window_skew_rewrite/window_skew_rewrite.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("window_skew_rewrite") {
+ sql "set parallel_pipeline_task_num=2"
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
sql "drop table if exists test_skew_window"
sql """create table test_skew_window(a int, c varchar(100), b int, d
varchar(20)) distributed by hash(a) buckets 32
properties("replication_num"="1");"""
diff --git a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
index 772ba119b16..ea60b749bc8 100644
--- a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
+++ b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
@@ -18,6 +18,7 @@
suite("agg_4_phase") {
sql "SET enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
+ sql "set parallel_pipeline_task_num=4"
sql "drop table if exists agg_4_phase_tbl"
sql """
CREATE TABLE agg_4_phase_tbl (
diff --git a/regression-test/suites/nereids_syntax_p0/analyze_agg.groovy
b/regression-test/suites/nereids_syntax_p0/analyze_agg.groovy
index da2c78e73aa..963b508cb83 100644
--- a/regression-test/suites/nereids_syntax_p0/analyze_agg.groovy
+++ b/regression-test/suites/nereids_syntax_p0/analyze_agg.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("analyze_agg") {
+ sql "set parallel_pipeline_task_num=2"
sql """DROP TABLE IF EXISTS t1"""
sql """DROP TABLE IF EXISTS t2"""
diff --git a/regression-test/suites/nereids_syntax_p0/analyze_repeat.groovy
b/regression-test/suites/nereids_syntax_p0/analyze_repeat.groovy
index 304884f8f99..68ac58b3528 100644
--- a/regression-test/suites/nereids_syntax_p0/analyze_repeat.groovy
+++ b/regression-test/suites/nereids_syntax_p0/analyze_repeat.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("analyze_repeat") {
+ sql "set parallel_pipeline_task_num=2"
sql """
SET enable_fallback_to_original_planner=false;
SET enable_nereids_planner=true;
diff --git a/regression-test/suites/nereids_syntax_p0/analyze_sort.groovy
b/regression-test/suites/nereids_syntax_p0/analyze_sort.groovy
index aa9d0ab7dd6..882a27a5e02 100644
--- a/regression-test/suites/nereids_syntax_p0/analyze_sort.groovy
+++ b/regression-test/suites/nereids_syntax_p0/analyze_sort.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("analyze_sort") {
+ sql "set parallel_pipeline_task_num=2"
sql """
SET ignore_shape_nodes='PhysicalDistribute';
diff --git a/regression-test/suites/nereids_tpch_p0/tpch/agg_cse.groovy
b/regression-test/suites/nereids_tpch_p0/tpch/agg_cse.groovy
index 389383bfc23..1e620035f11 100644
--- a/regression-test/suites/nereids_tpch_p0/tpch/agg_cse.groovy
+++ b/regression-test/suites/nereids_tpch_p0/tpch/agg_cse.groovy
@@ -18,6 +18,7 @@
*/
suite("agg_cse") {
+ sql "set parallel_pipeline_task_num=2"
String db = context.config.getDbNameByFile(new File(context.file.parent))
sql "use ${db}"
sql 'set enable_nereids_planner=true'
diff --git
a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
index 5e694b4781d..15e903d85e9 100644
--- a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
+++ b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
@@ -18,6 +18,7 @@
*/
suite("push_topn_to_agg") {
+ sql "set parallel_pipeline_task_num=2"
String db = context.config.getDbNameByFile(new File(context.file.parent))
sql "use ${db}"
sql "set topn_opt_limit_threshold=1024"
diff --git a/regression-test/suites/nereids_tpch_p0/tpch/topn-filter.groovy
b/regression-test/suites/nereids_tpch_p0/tpch/topn-filter.groovy
index 39dbf185dda..fc8bcc19e0a 100644
--- a/regression-test/suites/nereids_tpch_p0/tpch/topn-filter.groovy
+++ b/regression-test/suites/nereids_tpch_p0/tpch/topn-filter.groovy
@@ -18,6 +18,7 @@
*/
suite("topn-filter") {
+ sql "set parallel_pipeline_task_num=2"
String db = context.config.getDbNameByFile(new File(context.file.parent))
multi_sql """
use ${db};
diff --git
a/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
b/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
index 63e63a43a1f..75a1cd0bcc8 100644
---
a/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
+++
b/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
@@ -188,7 +188,8 @@ class InlineMergeErrorUDAF:
"""
test {
- sql """ SELECT py_inline_raise_udaf_merge(val) FROM
python_raise_error_test; """
+ sql """ SELECT /*+SET_VAR(parallel_pipeline_task_num=2)*/
py_inline_raise_udaf_merge(val)
+ FROM python_raise_error_test; """
exception "inline_udaf_merge_error_42"
}
diff --git a/regression-test/suites/query_p0/aggregate/agg_union_random.groovy
b/regression-test/suites/query_p0/aggregate/agg_union_random.groovy
index 1cd0a065f08..adbd8ac8246 100644
--- a/regression-test/suites/query_p0/aggregate/agg_union_random.groovy
+++ b/regression-test/suites/query_p0/aggregate/agg_union_random.groovy
@@ -18,6 +18,7 @@
*/
suite("agg_union_random") {
+ sql "set parallel_pipeline_task_num=2"
sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
sql "DROP TABLE IF EXISTS test_random;"
diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy
b/regression-test/suites/query_p0/cache/query_cache.groovy
index fb15e34a4f5..ea14a085c71 100644
--- a/regression-test/suites/query_p0/cache/query_cache.groovy
+++ b/regression-test/suites/query_p0/cache/query_cache.groovy
@@ -18,6 +18,7 @@
import java.util.stream.Collectors
suite("query_cache") {
+ sql "set parallel_pipeline_task_num=2"
def tableName =
"table_3_undef_partitions2_keys3_properties4_distributed_by53"
sql "set enable_sql_cache=false"
diff --git
a/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
b/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
index e96cd0ca78a..56c05e49fb5 100644
--- a/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
+++ b/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
@@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicReference
// under the License.
suite("query_cache_with_context") {
+ sql "set parallel_pipeline_task_num=2"
multi_sql """
set enable_sql_cache=false;
set enable_query_cache=false;
diff --git
a/regression-test/suites/query_p0/common_sub_expression/dist_expr_list/dist_expr_list.groovy
b/regression-test/suites/query_p0/common_sub_expression/dist_expr_list/dist_expr_list.groovy
index ae57e4ce6a5..007f22d243d 100644
---
a/regression-test/suites/query_p0/common_sub_expression/dist_expr_list/dist_expr_list.groovy
+++
b/regression-test/suites/query_p0/common_sub_expression/dist_expr_list/dist_expr_list.groovy
@@ -20,6 +20,7 @@
// and modified by Doris.
suite("dist_expr_list") {
+ sql "set parallel_pipeline_task_num=2"
sql """
drop table if exists agg_cse_shuffle;
create table agg_cse_shuffle(
diff --git a/regression-test/suites/query_p0/eager_agg/eager_agg.groovy
b/regression-test/suites/query_p0/eager_agg/eager_agg.groovy
index 5d738a962d6..3afa117f4d0 100644
--- a/regression-test/suites/query_p0/eager_agg/eager_agg.groovy
+++ b/regression-test/suites/query_p0/eager_agg/eager_agg.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("eager_agg") {
+ sql "set parallel_pipeline_task_num=2"
sql """
set eager_aggregation_mode=1;
set eager_aggregation_on_join=true;
diff --git a/regression-test/suites/query_p0/hint/fix_leading.groovy
b/regression-test/suites/query_p0/hint/fix_leading.groovy
index fe2434394af..a08a5fa4186 100644
--- a/regression-test/suites/query_p0/hint/fix_leading.groovy
+++ b/regression-test/suites/query_p0/hint/fix_leading.groovy
@@ -26,7 +26,7 @@ suite("fix_leading") {
// setting planner to nereids
sql 'set exec_mem_limit=21G'
sql 'set be_number_for_test=1'
- sql "set parallel_pipeline_task_num=1"
+ sql "set parallel_pipeline_task_num=2"
sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
sql 'set enable_nereids_distribute_planner=false'
sql 'set runtime_filter_mode=OFF'
diff --git a/regression-test/suites/query_p0/hint/multi_leading.groovy
b/regression-test/suites/query_p0/hint/multi_leading.groovy
index 868ff22c530..6cfec37b121 100644
--- a/regression-test/suites/query_p0/hint/multi_leading.groovy
+++ b/regression-test/suites/query_p0/hint/multi_leading.groovy
@@ -26,7 +26,7 @@ suite("multi_leading") {
// setting planner to nereids
sql 'set exec_mem_limit=21G'
sql 'set be_number_for_test=1'
- sql 'set parallel_pipeline_task_num=1'
+ sql 'set parallel_pipeline_task_num=2'
sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
sql 'set enable_nereids_distribute_planner=false'
sql "set ignore_shape_nodes='PhysicalProject'"
diff --git a/regression-test/suites/query_p0/hint/test_hint.groovy
b/regression-test/suites/query_p0/hint/test_hint.groovy
index c08dfb9d404..63962c108c1 100644
--- a/regression-test/suites/query_p0/hint/test_hint.groovy
+++ b/regression-test/suites/query_p0/hint/test_hint.groovy
@@ -26,7 +26,7 @@ suite("test_hint") {
// setting planner to nereids
sql 'set exec_mem_limit=21G'
sql 'set be_number_for_test=1'
- sql 'set parallel_pipeline_task_num=1'
+ sql 'set parallel_pipeline_task_num=2'
sql "set disable_nereids_rules='PRUNE_EMPTY_PARTITION,
REWRITE_SIMPLE_AGG_TO_CONSTANT'"
sql 'set enable_nereids_distribute_planner=false'
sql "set ignore_shape_nodes='PhysicalProject'"
diff --git
a/regression-test/suites/query_p0/repeat/test_repeat_output_slot.groovy
b/regression-test/suites/query_p0/repeat/test_repeat_output_slot.groovy
index 92667ca3f30..ac9acac7b84 100644
--- a/regression-test/suites/query_p0/repeat/test_repeat_output_slot.groovy
+++ b/regression-test/suites/query_p0/repeat/test_repeat_output_slot.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("test_repeat_output_slot") {
+ sql "set parallel_pipeline_task_num=2"
sql """
SET ignore_shape_nodes='PhysicalDistribute';
SET disable_nereids_rules='PRUNE_EMPTY_PARTITION';
diff --git a/regression-test/suites/query_p0/runtime_filter/check_rf.groovy
b/regression-test/suites/query_p0/runtime_filter/check_rf.groovy
index 489d6ce15cf..3bf9a0ffc31 100644
--- a/regression-test/suites/query_p0/runtime_filter/check_rf.groovy
+++ b/regression-test/suites/query_p0/runtime_filter/check_rf.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("check_rf") {
+ sql "set parallel_pipeline_task_num=2"
sql """
drop table if exists t1;
set disable_join_reorder=true;
diff --git a/regression-test/suites/query_p0/set_operations/except.groovy
b/regression-test/suites/query_p0/set_operations/except.groovy
index 4bf0f393f34..4c159b1ad8e 100644
--- a/regression-test/suites/query_p0/set_operations/except.groovy
+++ b/regression-test/suites/query_p0/set_operations/except.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("except", "query") {
+ sql "set parallel_pipeline_task_num=2"
sql """
set runtime_filter_type=2;
set enable_runtime_filter_prune=false;
diff --git
a/regression-test/suites/recursive_cte/query_cache_with_rec_cte_test.groovy
b/regression-test/suites/recursive_cte/query_cache_with_rec_cte_test.groovy
index 0630329f5b0..3703d53343d 100644
--- a/regression-test/suites/recursive_cte/query_cache_with_rec_cte_test.groovy
+++ b/regression-test/suites/recursive_cte/query_cache_with_rec_cte_test.groovy
@@ -23,6 +23,7 @@ suite("query_cache_with_rec_cte_test", "rec_cte") {
sql "set enable_fallback_to_original_planner=false"
sql "set enable_sql_cache=false"
sql "set enable_query_cache=true"
+ sql "set parallel_pipeline_task_num=2"
}
def assertHasCache = { String sqlStr ->
diff --git a/regression-test/suites/shape_check/clickbench/query1.groovy
b/regression-test/suites/shape_check/clickbench/query1.groovy
index 032ff1ac59d..361712e40ec 100644
--- a/regression-test/suites/shape_check/clickbench/query1.groovy
+++ b/regression-test/suites/shape_check/clickbench/query1.groovy
@@ -18,6 +18,7 @@
*/
suite("query1") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query11.groovy
b/regression-test/suites/shape_check/clickbench/query11.groovy
index 24c2f27c2e2..d2dd2efcf1c 100644
--- a/regression-test/suites/shape_check/clickbench/query11.groovy
+++ b/regression-test/suites/shape_check/clickbench/query11.groovy
@@ -18,6 +18,7 @@
*/
suite("query11") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query12.groovy
b/regression-test/suites/shape_check/clickbench/query12.groovy
index 40fa0b9493a..8e4e395e42a 100644
--- a/regression-test/suites/shape_check/clickbench/query12.groovy
+++ b/regression-test/suites/shape_check/clickbench/query12.groovy
@@ -18,6 +18,7 @@
*/
suite("query12") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query13.groovy
b/regression-test/suites/shape_check/clickbench/query13.groovy
index c45ff0537d3..aba8e9a66b5 100644
--- a/regression-test/suites/shape_check/clickbench/query13.groovy
+++ b/regression-test/suites/shape_check/clickbench/query13.groovy
@@ -18,6 +18,7 @@
*/
suite("query13") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query14.groovy
b/regression-test/suites/shape_check/clickbench/query14.groovy
index 353fa44bbc7..c755304cbfd 100644
--- a/regression-test/suites/shape_check/clickbench/query14.groovy
+++ b/regression-test/suites/shape_check/clickbench/query14.groovy
@@ -18,6 +18,7 @@
*/
suite("query14") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query15.groovy
b/regression-test/suites/shape_check/clickbench/query15.groovy
index 7f1507a5dcc..6a1b60b1533 100644
--- a/regression-test/suites/shape_check/clickbench/query15.groovy
+++ b/regression-test/suites/shape_check/clickbench/query15.groovy
@@ -18,6 +18,7 @@
*/
suite("query15") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query2.groovy
b/regression-test/suites/shape_check/clickbench/query2.groovy
index 402b7c2140a..3f01bd9b3c9 100644
--- a/regression-test/suites/shape_check/clickbench/query2.groovy
+++ b/regression-test/suites/shape_check/clickbench/query2.groovy
@@ -18,6 +18,7 @@
*/
suite("query2") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query21.groovy
b/regression-test/suites/shape_check/clickbench/query21.groovy
index 11e6f12000e..72f6f939687 100644
--- a/regression-test/suites/shape_check/clickbench/query21.groovy
+++ b/regression-test/suites/shape_check/clickbench/query21.groovy
@@ -18,6 +18,7 @@
*/
suite("query21") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query22.groovy
b/regression-test/suites/shape_check/clickbench/query22.groovy
index 78effc59f24..8369dfd09a2 100644
--- a/regression-test/suites/shape_check/clickbench/query22.groovy
+++ b/regression-test/suites/shape_check/clickbench/query22.groovy
@@ -18,6 +18,7 @@
*/
suite("query22") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query23.groovy
b/regression-test/suites/shape_check/clickbench/query23.groovy
index 8e1aad48621..78a585b1c4d 100644
--- a/regression-test/suites/shape_check/clickbench/query23.groovy
+++ b/regression-test/suites/shape_check/clickbench/query23.groovy
@@ -18,6 +18,7 @@
*/
suite("query23") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query28.groovy
b/regression-test/suites/shape_check/clickbench/query28.groovy
index 31da9c347c2..8703e9fd696 100644
--- a/regression-test/suites/shape_check/clickbench/query28.groovy
+++ b/regression-test/suites/shape_check/clickbench/query28.groovy
@@ -18,6 +18,7 @@
*/
suite("query28") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query29.groovy
b/regression-test/suites/shape_check/clickbench/query29.groovy
index 89d289ccb77..7debefcfb75 100644
--- a/regression-test/suites/shape_check/clickbench/query29.groovy
+++ b/regression-test/suites/shape_check/clickbench/query29.groovy
@@ -18,6 +18,7 @@
*/
suite("query29") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query3.groovy
b/regression-test/suites/shape_check/clickbench/query3.groovy
index 9fe548cdf56..44e0db281b9 100644
--- a/regression-test/suites/shape_check/clickbench/query3.groovy
+++ b/regression-test/suites/shape_check/clickbench/query3.groovy
@@ -18,6 +18,7 @@
*/
suite("query3") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query30.groovy
b/regression-test/suites/shape_check/clickbench/query30.groovy
index 28577f655ac..223f41e2933 100644
--- a/regression-test/suites/shape_check/clickbench/query30.groovy
+++ b/regression-test/suites/shape_check/clickbench/query30.groovy
@@ -18,6 +18,7 @@
*/
suite("query30") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query31.groovy
b/regression-test/suites/shape_check/clickbench/query31.groovy
index 26df134b0f3..c48446fd848 100644
--- a/regression-test/suites/shape_check/clickbench/query31.groovy
+++ b/regression-test/suites/shape_check/clickbench/query31.groovy
@@ -18,6 +18,7 @@
*/
suite("query31") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query32.groovy
b/regression-test/suites/shape_check/clickbench/query32.groovy
index 19af152f4a5..33e5caa5602 100644
--- a/regression-test/suites/shape_check/clickbench/query32.groovy
+++ b/regression-test/suites/shape_check/clickbench/query32.groovy
@@ -18,6 +18,7 @@
*/
suite("query32") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query33.groovy
b/regression-test/suites/shape_check/clickbench/query33.groovy
index 1d8fbf035db..99b567c1e7f 100644
--- a/regression-test/suites/shape_check/clickbench/query33.groovy
+++ b/regression-test/suites/shape_check/clickbench/query33.groovy
@@ -18,6 +18,7 @@
*/
suite("query33") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query34.groovy
b/regression-test/suites/shape_check/clickbench/query34.groovy
index 5b00ba99c30..37c9c51eed7 100644
--- a/regression-test/suites/shape_check/clickbench/query34.groovy
+++ b/regression-test/suites/shape_check/clickbench/query34.groovy
@@ -18,6 +18,7 @@
*/
suite("query34") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query35.groovy
b/regression-test/suites/shape_check/clickbench/query35.groovy
index fc24a7bb9fa..d3cbcdd1128 100644
--- a/regression-test/suites/shape_check/clickbench/query35.groovy
+++ b/regression-test/suites/shape_check/clickbench/query35.groovy
@@ -18,6 +18,7 @@
*/
suite("query35") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query36.groovy
b/regression-test/suites/shape_check/clickbench/query36.groovy
index a0a67d15368..d5d9e2d3e89 100644
--- a/regression-test/suites/shape_check/clickbench/query36.groovy
+++ b/regression-test/suites/shape_check/clickbench/query36.groovy
@@ -18,6 +18,7 @@
*/
suite("query36") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query37.groovy
b/regression-test/suites/shape_check/clickbench/query37.groovy
index e1c70e43be7..02259bd0211 100644
--- a/regression-test/suites/shape_check/clickbench/query37.groovy
+++ b/regression-test/suites/shape_check/clickbench/query37.groovy
@@ -18,6 +18,7 @@
*/
suite("query37") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query38.groovy
b/regression-test/suites/shape_check/clickbench/query38.groovy
index 82d69a00e77..e23d9621fde 100644
--- a/regression-test/suites/shape_check/clickbench/query38.groovy
+++ b/regression-test/suites/shape_check/clickbench/query38.groovy
@@ -18,6 +18,7 @@
*/
suite("query38") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query39.groovy
b/regression-test/suites/shape_check/clickbench/query39.groovy
index ec9cf066f0e..50d05ae7e59 100644
--- a/regression-test/suites/shape_check/clickbench/query39.groovy
+++ b/regression-test/suites/shape_check/clickbench/query39.groovy
@@ -18,6 +18,7 @@
*/
suite("query39") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query4.groovy
b/regression-test/suites/shape_check/clickbench/query4.groovy
index 3d66fc01273..7b6349cea7c 100644
--- a/regression-test/suites/shape_check/clickbench/query4.groovy
+++ b/regression-test/suites/shape_check/clickbench/query4.groovy
@@ -18,6 +18,7 @@
*/
suite("query4") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query40.groovy
b/regression-test/suites/shape_check/clickbench/query40.groovy
index 347acdcb40c..654f303b868 100644
--- a/regression-test/suites/shape_check/clickbench/query40.groovy
+++ b/regression-test/suites/shape_check/clickbench/query40.groovy
@@ -18,6 +18,7 @@
*/
suite("query40") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query41.groovy
b/regression-test/suites/shape_check/clickbench/query41.groovy
index 0373520c344..89870828716 100644
--- a/regression-test/suites/shape_check/clickbench/query41.groovy
+++ b/regression-test/suites/shape_check/clickbench/query41.groovy
@@ -18,6 +18,7 @@
*/
suite("query41") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query42.groovy
b/regression-test/suites/shape_check/clickbench/query42.groovy
index cbbea9443d4..ad7588d76f3 100644
--- a/regression-test/suites/shape_check/clickbench/query42.groovy
+++ b/regression-test/suites/shape_check/clickbench/query42.groovy
@@ -18,6 +18,7 @@
*/
suite("query42") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query43.groovy
b/regression-test/suites/shape_check/clickbench/query43.groovy
index d444b8f9e88..5cdbe34c288 100644
--- a/regression-test/suites/shape_check/clickbench/query43.groovy
+++ b/regression-test/suites/shape_check/clickbench/query43.groovy
@@ -18,6 +18,7 @@
*/
suite("query43") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query6.groovy
b/regression-test/suites/shape_check/clickbench/query6.groovy
index 95198eab565..fbde45cec85 100644
--- a/regression-test/suites/shape_check/clickbench/query6.groovy
+++ b/regression-test/suites/shape_check/clickbench/query6.groovy
@@ -18,6 +18,7 @@
*/
suite("query6") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query7.groovy
b/regression-test/suites/shape_check/clickbench/query7.groovy
index 11b3400168b..ee21e652233 100644
--- a/regression-test/suites/shape_check/clickbench/query7.groovy
+++ b/regression-test/suites/shape_check/clickbench/query7.groovy
@@ -18,6 +18,7 @@
*/
suite("query7") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query8.groovy
b/regression-test/suites/shape_check/clickbench/query8.groovy
index db383b7b993..22ab2b1e4fe 100644
--- a/regression-test/suites/shape_check/clickbench/query8.groovy
+++ b/regression-test/suites/shape_check/clickbench/query8.groovy
@@ -18,6 +18,7 @@
*/
suite("query8") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/clickbench/query9.groovy
b/regression-test/suites/shape_check/clickbench/query9.groovy
index a379691c0db..a77d9a10ed6 100644
--- a/regression-test/suites/shape_check/clickbench/query9.groovy
+++ b/regression-test/suites/shape_check/clickbench/query9.groovy
@@ -18,6 +18,7 @@
*/
suite("query9") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_nereids_planner=true'
sql 'set enable_nereids_distribute_planner=false'
sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/shape_check/others/nlj.groovy
b/regression-test/suites/shape_check/others/nlj.groovy
index 63920380a73..6c0187b0223 100644
--- a/regression-test/suites/shape_check/others/nlj.groovy
+++ b/regression-test/suites/shape_check/others/nlj.groovy
@@ -18,6 +18,7 @@
*/
suite("nlj") {
+ sql "set parallel_pipeline_task_num=2"
sql 'set enable_parallel_result_sink=false;'
sql """
drop table if exists a;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]