This is an automated email from the ASF dual-hosted git repository.

avamingli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
     new 75bcc59c7e9 Parallel DISTINCT plan of multi-stage.
75bcc59c7e9 is described below

commit 75bcc59c7e9aea2257269ff96842480e39c92f63
Author: Zhang Mingli <avamin...@gmail.com>
AuthorDate: Thu Jun 19 17:39:21 2025 +0800

    Parallel DISTINCT plan of multi-stage.

    Upstream Postgres does not support parallel processing of DISTINCT,
    since uniqueness cannot be guaranteed across multiple workers. In MPP
    databases, however, we can use Motion to redistribute tuples across
    multiple workers within a parallel query.

    For a DISTINCT query like:

        select distinct a from t_distinct_0;

    we can create a parallel plan based on a Parallel Scan of the
    underlying table. After the Parallel Scan, tuples are randomly
    distributed across workers, even when the distribution key matches
    the target expression. A pre-distinct node uses a Streaming
    HashAggregate or HashAggregate to deduplicate tuples in parallel;
    the results are then redistributed according to the DISTINCT
    expressions, and a second stage completes the DISTINCT operation.

                             QUERY PLAN
    ------------------------------------------------------------
     Gather Motion 6:1  (slice1; segments: 6)
       ->  HashAggregate
             Group Key: a
             ->  Redistribute Motion 6:6  (slice2; segments: 6)
                   Hash Key: a
                   Hash Module: 3
                   ->  Streaming HashAggregate
                         Group Key: a
                         ->  Parallel Seq Scan on t_distinct_0
     Optimizer: Postgres query optimizer
    (10 rows)

    Parallel Group Aggregation is also supported:

        explain(costs off) select distinct a, b from t_distinct_0;

                            QUERY PLAN
    -----------------------------------------------------------
     GroupAggregate
       Group Key: a, b
       ->  Gather Motion 6:1  (slice1; segments: 6)
             Merge Key: a, b
             ->  GroupAggregate
                   Group Key: a, b
                   ->  Sort
                         Sort Key: a, b
                         ->  Parallel Seq Scan on t_distinct_0
     Optimizer: Postgres query optimizer
    (10 rows)

    Authored-by: Zhang Mingli avamin...@gmail.com
---
 src/backend/cdb/cdbgroupingpaths.c          | 100 +++++++++++++-
 src/backend/utils/misc/guc_gp.c             |  11 ++
 src/include/utils/guc.h                     |   1 +
 src/include/utils/unsync_guc_name.h         |   1 +
 src/test/regress/expected/cbdb_parallel.out | 195 ++++++++++++++++++++++++++++
 src/test/regress/sql/cbdb_parallel.sql      |  63 +++++++++
 6 files changed, 364 insertions(+), 7 deletions(-)
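To reproduce the plans above by hand, here is a minimal session sketch
assembled from the regression test added by this patch (the table
definition and GUC names are taken from the patch itself; exact Motion
sizes depend on your segment and worker counts):

    create table t_distinct_0(a int, b int) using ao_column distributed randomly;
    insert into t_distinct_0 select i, i+1 from generate_series(1, 1000) i;
    analyze t_distinct_0;

    set enable_parallel = on;                      -- enable parallel plans
    set parallel_query_use_streaming_hashagg = on; -- GUC introduced by this commit
    explain (costs off) select distinct a from t_distinct_0;

In the plans shown, the 6:1 and 6:6 Motions appear to correspond to a
3-segment cluster running two parallel workers per segment (the
"Hash Module: 3" line reflects the segment count).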
diff --git a/src/backend/cdb/cdbgroupingpaths.c b/src/backend/cdb/cdbgroupingpaths.c
index a54c6452471..33d5024579e 100644
--- a/src/backend/cdb/cdbgroupingpaths.c
+++ b/src/backend/cdb/cdbgroupingpaths.c
@@ -67,6 +67,7 @@
 #include "optimizer/tlist.h"
 #include "parser/parse_clause.h"
 #include "parser/parse_oper.h"
+#include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/selfuncs.h"
 
@@ -142,6 +143,9 @@ typedef struct
 	 * partial_rel holds the partially aggregated results from the first stage.
 	 */
 	RelOptInfo *partial_rel;
+
+	/* is this a DISTINCT? */
+	bool		is_distinct;
 } cdb_agg_planning_context;
 
 typedef struct
@@ -226,6 +230,11 @@ recognize_dqa_type(cdb_agg_planning_context *ctx);
 static PathTarget *
 strip_aggdistinct(PathTarget *target);
 
+static void add_first_stage_group_agg_partial_path(PlannerInfo *root,
+												   Path *path,
+												   bool is_sorted,
+												   cdb_agg_planning_context *ctx);
+
 /*
  * cdb_create_multistage_grouping_paths
  *
@@ -300,6 +309,7 @@ cdb_create_multistage_grouping_paths(PlannerInfo *root,
 	 * across subroutines.
 	 */
 	memset(&ctx, 0, sizeof(ctx));
+	ctx.is_distinct = false;
 	ctx.can_sort = can_sort;
 	ctx.can_hash = can_hash;
 	ctx.target = target;
@@ -582,6 +592,7 @@ cdb_create_twostage_distinct_paths(PlannerInfo *root,
 	memset(&zero_agg_costs, 0, sizeof(zero_agg_costs));
 
 	memset(&ctx, 0, sizeof(ctx));
+	ctx.is_distinct = true;
 	ctx.can_sort = allow_sort;
 	ctx.can_hash = allow_hash;
 	ctx.target = target;
@@ -652,7 +663,7 @@ cdb_create_twostage_distinct_paths(PlannerInfo *root,
 	/*
 	 * All set, generate the two-stage paths.
 	 */
-	create_two_stage_paths(root, &ctx, input_rel, output_rel, NIL);
+	create_two_stage_paths(root, &ctx, input_rel, output_rel, input_rel->partial_pathlist);
 }
 
 /*
@@ -663,6 +674,7 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
 					   RelOptInfo *input_rel, RelOptInfo *output_rel, List *partial_pathlist)
 {
 	Path	   *cheapest_path = input_rel->cheapest_total_path;
+	Path	   *cheapest_partial_path = partial_pathlist ? (Path *) linitial(partial_pathlist) : NULL;
 
 	/*
 	 * Consider ways to do the first Aggregate stage.
@@ -700,6 +712,24 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
 			if (path == cheapest_path || is_sorted)
 				add_first_stage_group_agg_path(root, path, is_sorted, ctx);
 		}
+
+		if (ctx->is_distinct && partial_pathlist)
+		{
+			foreach(lc, partial_pathlist)
+			{
+				Path	   *path = (Path *) lfirst(lc);
+				bool		is_sorted;
+
+				if (cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
+					continue;
+
+				is_sorted = pathkeys_contained_in(ctx->partial_needed_pathkeys,
+												  path->pathkeys);
+
+				if (path == cheapest_partial_path || is_sorted)
+					add_first_stage_group_agg_partial_path(root, path, is_sorted, ctx);
+			}
+		}
 	}
 
 	/*
@@ -721,15 +751,34 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
 
 	if (partial_pathlist)
 	{
-		ListCell *lc;
+		ListCell   *lc;
 
-		foreach(lc, partial_pathlist)
+		foreach (lc, partial_pathlist)
 		{
-			Path *path = (Path *) lfirst(lc);
+			Path	   *path = (Path *)lfirst(lc);
 
-			if (cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
-				continue;
-			add_partial_path(ctx->partial_rel, path);
+			if (!cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
+			{
+				if (ctx->is_distinct && ctx->can_hash)
+				{
+					double		dNumGroups = estimate_num_groups_on_segment(ctx->dNumGroupsTotal,
+																			path->rows,
+																			path->locus);
+
+					path = (Path *) create_agg_path(root,
+													ctx->partial_rel,
+													path,
+													ctx->partial_grouping_target,
+													AGG_HASHED,
+													ctx->hasAggs ? AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE,
+													parallel_query_use_streaming_hashagg, /* streaming */
+													ctx->groupClause,
+													NIL,
+													ctx->agg_partial_costs,
+													dNumGroups);
+				}
+				add_partial_path(ctx->partial_rel, path);
+			}
 		}
 	}
 
@@ -849,6 +898,7 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
 								 path->pathkeys);
 		else
 			is_sorted = false;
+
 		if (path == cheapest_first_stage_path || is_sorted)
 		{
 			add_second_stage_group_agg_path(root, path, is_sorted,
@@ -1249,6 +1299,7 @@ add_second_stage_hash_agg_path(PlannerInfo *root,
 	/*
 	 * Calculate the number of groups in the second stage, per segment.
 	 */
+	/* TODO: consider parallel? */
 	if (CdbPathLocus_IsPartitioned(group_locus))
 		dNumGroups = clamp_row_est(ctx->dNumGroupsTotal /
 								   CdbPathLocus_NumSegments(group_locus));
@@ -2720,3 +2771,38 @@ cdb_prepare_path_for_hashed_agg(PlannerInfo *root,
 
 	return subpath;
 }
+
+static void
+add_first_stage_group_agg_partial_path(PlannerInfo *root,
+									   Path *path,
+									   bool is_sorted,
+									   cdb_agg_planning_context *ctx)
+{
+	if (ctx->agg_costs->distinctAggrefs ||
+		ctx->groupingSets)
+		return;
+
+	if (!is_sorted)
+	{
+		path = (Path *) create_sort_path(root,
+										 ctx->partial_rel,
+										 path,
+										 ctx->partial_sort_pathkeys,
+										 -1.0);
+	}
+
+	Assert(ctx->hasAggs || ctx->groupClause);
+	add_partial_path(ctx->partial_rel,
+					 (Path *) create_agg_path(root,
+											  ctx->partial_rel,
+											  path,
+											  ctx->partial_grouping_target,
+											  ctx->groupClause ? AGG_SORTED : AGG_PLAIN,
+											  ctx->hasAggs ? AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE,
+											  false, /* streaming */
+											  ctx->groupClause,
+											  NIL,
+											  ctx->agg_partial_costs,
+											  estimate_num_groups_on_segment(ctx->dNumGroupsTotal,
+																			 path->rows, path->locus)));
+}
diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c
index 6c3b7dae365..1f6e13bb16b 100644
--- a/src/backend/utils/misc/guc_gp.c
+++ b/src/backend/utils/misc/guc_gp.c
@@ -151,6 +151,7 @@ bool		enable_parallel = false;
 bool		enable_parallel_semi_join = true;
 bool		enable_parallel_dedup_semi_join = true;
 bool		enable_parallel_dedup_semi_reverse_join = true;
+bool		parallel_query_use_streaming_hashagg = false;
 int			gp_appendonly_insert_files = 0;
 int			gp_appendonly_insert_files_tuples_range = 0;
 int			gp_random_insert_segments = 0;
@@ -3227,6 +3228,16 @@ struct config_bool ConfigureNamesBool_gp[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"parallel_query_use_streaming_hashagg", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Allow use of Streaming HashAgg in parallel queries for DISTINCT."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&parallel_query_use_streaming_hashagg,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"gp_internal_is_singlenode", PGC_POSTMASTER, UNGROUPED,
 			gettext_noop("Is in SingleNode mode (no segments). WARNING: user SHOULD NOT set this by any means."),
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index c0e0e293428..de0f931c797 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -293,6 +293,7 @@ extern bool enable_parallel;
 extern bool enable_parallel_semi_join;
 extern bool enable_parallel_dedup_semi_join;
 extern bool enable_parallel_dedup_semi_reverse_join;
+extern bool parallel_query_use_streaming_hashagg;
 extern int	gp_appendonly_insert_files;
 extern int	gp_appendonly_insert_files_tuples_range;
 extern int	gp_random_insert_segments;
diff --git a/src/include/utils/unsync_guc_name.h b/src/include/utils/unsync_guc_name.h
index 2684fd7cb6e..a2a325bd9ed 100644
--- a/src/include/utils/unsync_guc_name.h
+++ b/src/include/utils/unsync_guc_name.h
@@ -498,6 +498,7 @@
 		"optimizer_use_gpdb_allocators",
 		"optimizer_xform_bind_threshold",
 		"parallel_leader_participation",
+		"parallel_query_use_streaming_hashagg",
 		"parallel_setup_cost",
 		"parallel_tuple_cost",
 		"password_encryption",
diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out
index b8639f9fb9c..6ed0b1b6bb0 100644
--- a/src/test/regress/expected/cbdb_parallel.out
+++ b/src/test/regress/expected/cbdb_parallel.out
@@ -3069,6 +3069,201 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant
 (4 rows)
 
 abort;
+--
+-- Test Parallel DISTINCT
+--
+drop table if exists t_distinct_0;
+NOTICE:  table "t_distinct_0" does not exist, skipping
+create table t_distinct_0(a int, b int) using ao_column distributed randomly;
+insert into t_distinct_0 select i, i+1 from generate_series(1, 1000) i;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+analyze t_distinct_0;
+explain(costs off)
+select distinct a from t_distinct_0;
+                         QUERY PLAN
+------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: a
+         ->  Redistribute Motion 3:3  (slice2; segments: 3)
+               Hash Key: a
+               ->  HashAggregate
+                     Group Key: a
+                     ->  Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+set enable_parallel = on;
+-- first stage HashAgg, second stage GroupAgg
+explain(costs off)
+select distinct a from t_distinct_0;
+                            QUERY PLAN
+------------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   Merge Key: a
+   ->  GroupAggregate
+         Group Key: a
+         ->  Sort
+               Sort Key: a
+               ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                     Hash Key: a
+                     Hash Module: 3
+                     ->  Streaming HashAggregate
+                           Group Key: a
+                           ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(13 rows)
+
+set parallel_query_use_streaming_hashagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+                            QUERY PLAN
+------------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   Merge Key: a
+   ->  GroupAggregate
+         Group Key: a
+         ->  Sort
+               Sort Key: a
+               ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                     Hash Key: a
+                     Hash Module: 3
+                     ->  HashAggregate
+                           Group Key: a
+                           ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(13 rows)
+
+-- GroupAgg
+set enable_hashagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+                               QUERY PLAN
+-----------------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   Merge Key: a
+   ->  GroupAggregate
+         Group Key: a
+         ->  Sort
+               Sort Key: a
+               ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                     Hash Key: a
+                     Hash Module: 3
+                     ->  GroupAggregate
+                           Group Key: a
+                           ->  Sort
+                                 Sort Key: a
+                                 ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(15 rows)
+
+-- HashAgg
+set enable_hashagg = on;
+set enable_groupagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+                         QUERY PLAN
+------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  HashAggregate
+         Group Key: a
+         ->  Redistribute Motion 6:6  (slice2; segments: 6)
+               Hash Key: a
+               Hash Module: 3
+               ->  HashAggregate
+                     Group Key: a
+                     ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+set parallel_query_use_streaming_hashagg = on;
+explain(costs off)
+select distinct a from t_distinct_0;
+                         QUERY PLAN
+------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  HashAggregate
+         Group Key: a
+         ->  Redistribute Motion 6:6  (slice2; segments: 6)
+               Hash Key: a
+               Hash Module: 3
+               ->  Streaming HashAggregate
+                     Group Key: a
+                     ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+-- multi DISTINCT tlist
+explain(costs off)
+select distinct a, b from t_distinct_0;
+                         QUERY PLAN
+------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  HashAggregate
+         Group Key: a, b
+         ->  Redistribute Motion 6:6  (slice2; segments: 6)
+               Hash Key: a, b
+               Hash Module: 3
+               ->  Streaming HashAggregate
+                     Group Key: a, b
+                     ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+-- DISTINCT on distribution key
+drop table if exists t_distinct_1;
+NOTICE:  table "t_distinct_1" does not exist, skipping
+create table t_distinct_1(a int, b int) using ao_column;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into t_distinct_1 select * from t_distinct_0;
+analyze t_distinct_1;
+set enable_parallel = off;
+explain(costs off)
+select distinct a from t_distinct_1;
+                QUERY PLAN
+------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: a
+         ->  Seq Scan on t_distinct_1
+ Optimizer: Postgres query optimizer
+(5 rows)
+
+set enable_parallel = on;
+explain(costs off)
+select distinct a from t_distinct_1;
+                         QUERY PLAN
+------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  HashAggregate
+         Group Key: a
+         ->  Redistribute Motion 6:6  (slice2; segments: 6)
+               Hash Key: a
+               Hash Module: 3
+               ->  Streaming HashAggregate
+                     Group Key: a
+                     ->  Parallel Seq Scan on t_distinct_1
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+--
+-- End of test Parallel DISTINCT
+--
 -- start_ignore
 drop schema test_parallel cascade;
 -- end_ignore
diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql
index b3fab79dd09..74a60e6ed2d 100644
--- a/src/test/regress/sql/cbdb_parallel.sql
+++ b/src/test/regress/sql/cbdb_parallel.sql
@@ -986,6 +986,69 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant
 select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_anti.a where t2_anti.a is null;
 abort;
+--
+-- Test Parallel DISTINCT
+--
+drop table if exists t_distinct_0;
+create table t_distinct_0(a int, b int) using ao_column distributed randomly;
+insert into t_distinct_0 select i, i+1 from generate_series(1, 1000) i;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+analyze t_distinct_0;
+explain(costs off)
+select distinct a from t_distinct_0;
+set enable_parallel = on;
+-- first stage HashAgg, second stage GroupAgg
+explain(costs off)
+select distinct a from t_distinct_0;
+set parallel_query_use_streaming_hashagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+-- GroupAgg
+set enable_hashagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+-- HashAgg
+set enable_hashagg = on;
+set enable_groupagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+set parallel_query_use_streaming_hashagg = on;
+explain(costs off)
+select distinct a from t_distinct_0;
+-- multi DISTINCT tlist
+explain(costs off)
+select distinct a, b from t_distinct_0;
+
+-- DISTINCT on distribution key
+drop table if exists t_distinct_1;
+create table t_distinct_1(a int, b int) using ao_column;
+insert into t_distinct_1 select * from t_distinct_0;
+analyze t_distinct_1;
+set enable_parallel = off;
+explain(costs off)
+select distinct a from t_distinct_1;
+set enable_parallel = on;
+explain(costs off)
+select distinct a from t_distinct_1;
+
+--
+-- End of test Parallel DISTINCT
+--
+
 -- start_ignore
 drop schema test_parallel cascade;
 -- end_ignore

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org