This is an automated email from the ASF dual-hosted git repository.

avamingli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/main by this push:
     new 75bcc59c7e9 Parallel DISTINCT plan of multi-stage.
75bcc59c7e9 is described below

commit 75bcc59c7e9aea2257269ff96842480e39c92f63
Author: Zhang Mingli <avamin...@gmail.com>
AuthorDate: Thu Jun 19 17:39:21 2025 +0800

    Parallel DISTINCT plan of multi-stage.
    
    Upstream Postgres does not support fully parallel DISTINCT processing,
    since distinctness cannot be guaranteed across multiple workers. In MPP
    databases, however, we can use a Motion to redistribute tuples across
    multiple workers within a parallel query.
    
    For a DISTINCT query like:
    select distinct a from t_distinct_0;
    
    we can create a parallel plan on top of a Parallel Scan of the table.
    After the Parallel Scan, tuples are effectively randomly distributed
    across workers, even when the table's distribution key matches the
    target expression.
    
    The pre-distinct node uses a Streaming HashAggregate or HashAggregate
    to partially deduplicate tuples in parallel; the remaining tuples are
    then redistributed according to the DISTINCT expressions, and a
    second-stage aggregate completes the DISTINCT operation.
    
                             QUERY PLAN
    ------------------------------------------------------------
     Gather Motion 6:1  (slice1; segments: 6)
       ->  HashAggregate
             Group Key: a
             ->  Redistribute Motion 6:6  (slice2; segments: 6)
                   Hash Key: a
                   Hash Module: 3
                   ->  Streaming HashAggregate
                         Group Key: a
                         ->  Parallel Seq Scan on t_distinct_0
     Optimizer: Postgres query optimizer
    (10 rows)
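    
    For reference, the regression tests added in this commit reproduce
    plans of this shape with settings along the following lines (which
    first- and second-stage operators get chosen also depends on
    enable_hashagg, enable_groupagg, and costing):
    
    set enable_parallel = on;
    set parallel_query_use_streaming_hashagg = on;
    explain(costs off)
    select distinct a from t_distinct_0;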
    
    Parallel Group Aggregation is also supported:
    
    explain(costs off)
    select distinct a, b from t_distinct_0;
                            QUERY PLAN
    -----------------------------------------------------------
     GroupAggregate
       Group Key: a, b
       ->  Gather Motion 6:1  (slice1; segments: 6)
             Merge Key: a, b
             ->  GroupAggregate
                   Group Key: a, b
                   ->  Sort
                         Sort Key: a, b
                         ->  Parallel Seq Scan on t_distinct_0
     Optimizer: Postgres query optimizer
    (10 rows)
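    
    The new GUC parallel_query_use_streaming_hashagg (default on) controls
    whether the first stage may use a Streaming HashAggregate; with it set
    to off, the pre-distinct stage falls back to a plain HashAggregate. A
    quick sketch, mirroring the added regression tests:
    
    set enable_parallel = on;
    set parallel_query_use_streaming_hashagg = off;
    explain(costs off)
    select distinct a from t_distinct_0;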
    
    Authored-by: Zhang Mingli avamin...@gmail.com
---
 src/backend/cdb/cdbgroupingpaths.c          | 100 +++++++++++++-
 src/backend/utils/misc/guc_gp.c             |  11 ++
 src/include/utils/guc.h                     |   1 +
 src/include/utils/unsync_guc_name.h         |   1 +
 src/test/regress/expected/cbdb_parallel.out | 195 ++++++++++++++++++++++++++++
 src/test/regress/sql/cbdb_parallel.sql      |  63 +++++++++
 6 files changed, 364 insertions(+), 7 deletions(-)

diff --git a/src/backend/cdb/cdbgroupingpaths.c b/src/backend/cdb/cdbgroupingpaths.c
index a54c6452471..33d5024579e 100644
--- a/src/backend/cdb/cdbgroupingpaths.c
+++ b/src/backend/cdb/cdbgroupingpaths.c
@@ -67,6 +67,7 @@
 #include "optimizer/tlist.h"
 #include "parser/parse_clause.h"
 #include "parser/parse_oper.h"
+#include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/selfuncs.h"
 
@@ -142,6 +143,9 @@ typedef struct
         * partial_rel holds the partially aggregated results from the first stage.
         */
        RelOptInfo *partial_rel;
+
+       /* is this a DISTINCT query? */
+       bool            is_distinct;
 } cdb_agg_planning_context;
 
 typedef struct
@@ -226,6 +230,11 @@ recognize_dqa_type(cdb_agg_planning_context *ctx);
 static PathTarget *
 strip_aggdistinct(PathTarget *target);
 
+static void add_first_stage_group_agg_partial_path(PlannerInfo *root,
+                                                  Path *path,
+                                                  bool is_sorted,
+                                                  cdb_agg_planning_context *ctx);
+
 /*
  * cdb_create_multistage_grouping_paths
  *
@@ -300,6 +309,7 @@ cdb_create_multistage_grouping_paths(PlannerInfo *root,
         * across subroutines.
         */
        memset(&ctx, 0, sizeof(ctx));
+       ctx.is_distinct = false;
        ctx.can_sort = can_sort;
        ctx.can_hash = can_hash;
        ctx.target = target;
@@ -582,6 +592,7 @@ cdb_create_twostage_distinct_paths(PlannerInfo *root,
        memset(&zero_agg_costs, 0, sizeof(zero_agg_costs));
 
        memset(&ctx, 0, sizeof(ctx));
+       ctx.is_distinct = true;
        ctx.can_sort = allow_sort;
        ctx.can_hash = allow_hash;
        ctx.target = target;
@@ -652,7 +663,7 @@ cdb_create_twostage_distinct_paths(PlannerInfo *root,
        /*
         * All set, generate the two-stage paths.
         */
-       create_two_stage_paths(root, &ctx, input_rel, output_rel, NIL);
+       create_two_stage_paths(root, &ctx, input_rel, output_rel, input_rel->partial_pathlist);
 }
 
 /*
@@ -663,6 +674,7 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
                                           RelOptInfo *input_rel, RelOptInfo *output_rel, List *partial_pathlist)
 {
        Path       *cheapest_path = input_rel->cheapest_total_path;
+       Path       *cheapest_partial_path = partial_pathlist ? (Path *) linitial(partial_pathlist) : NULL;
 
        /*
         * Consider ways to do the first Aggregate stage.
@@ -700,6 +712,24 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
                        if (path == cheapest_path || is_sorted)
                                add_first_stage_group_agg_path(root, path, is_sorted, ctx);
                }
+
+               if (ctx->is_distinct && partial_pathlist)
+               {
+                       foreach(lc, partial_pathlist)
+                       {
+                               Path       *path = (Path *) lfirst(lc);
+                               bool            is_sorted;
+
+                               if (cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
+                                       continue;
+
+                               is_sorted = pathkeys_contained_in(ctx->partial_needed_pathkeys,
+                                                                 path->pathkeys);
+
+                               if (path == cheapest_partial_path || is_sorted)
+                                       add_first_stage_group_agg_partial_path(root, path, is_sorted, ctx);
+                       }
+               }
        }
 
        /*
@@ -721,15 +751,34 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
 
        if (partial_pathlist)
        {
-               ListCell   *lc;
+               ListCell *lc;
 
-               foreach(lc, partial_pathlist)
+               foreach (lc, partial_pathlist)
                {
-                       Path       *path = (Path *) lfirst(lc);
+                       Path *path = (Path *)lfirst(lc);
 
-                       if (cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
-                               continue;
-                       add_partial_path(ctx->partial_rel, path);
+                       if (!cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
+                       {
+                               if (ctx->is_distinct && ctx->can_hash)
+                               {
+                                       double dNumGroups = estimate_num_groups_on_segment(ctx->dNumGroupsTotal,
+                                                                                           path->rows,
+                                                                                           path->locus);
+
+                                       path = (Path *) create_agg_path(root,
+                                                                       ctx->partial_rel,
+                                                                       path,
+                                                                       ctx->partial_grouping_target,
+                                                                       AGG_HASHED,
+                                                                       ctx->hasAggs ? AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE,
+                                                                       parallel_query_use_streaming_hashagg, /* streaming */
+                                                                       ctx->groupClause,
+                                                                       NIL,
+                                                                       ctx->agg_partial_costs,
+                                                                       dNumGroups);
+                               }
+                               add_partial_path(ctx->partial_rel, path);
+                       }
                }
        }
 
@@ -849,6 +898,7 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
                                                                                  path->pathkeys);
                                else
                                        is_sorted = false;
+
                                if (path == cheapest_first_stage_path || is_sorted)
                                {
                                        add_second_stage_group_agg_path(root, path, is_sorted,
@@ -1249,6 +1299,7 @@ add_second_stage_hash_agg_path(PlannerInfo *root,
        /*
         * Calculate the number of groups in the second stage, per segment.
         */
+       /* consider parallel? */
        if (CdbPathLocus_IsPartitioned(group_locus))
                dNumGroups = clamp_row_est(ctx->dNumGroupsTotal /
                                                                   CdbPathLocus_NumSegments(group_locus));
@@ -2720,3 +2771,38 @@ cdb_prepare_path_for_hashed_agg(PlannerInfo *root,
 
        return subpath;
 }
+
+static void add_first_stage_group_agg_partial_path(PlannerInfo *root,
+                                                  Path *path,
+                                                  bool is_sorted,
+                                                  cdb_agg_planning_context *ctx)
+{
+
+       if (ctx->agg_costs->distinctAggrefs ||
+               ctx->groupingSets)
+               return;
+
+       if (!is_sorted)
+       {
+               path = (Path *) create_sort_path(root,
+                                                ctx->partial_rel,
+                                                path,
+                                                ctx->partial_sort_pathkeys,
+                                                -1.0);
+       }
+
+       Assert(ctx->hasAggs || ctx->groupClause);
+       add_partial_path(ctx->partial_rel,
+               (Path *) create_agg_path(root,
+                                        ctx->partial_rel,
+                                        path,
+                                        ctx->partial_grouping_target,
+                                        ctx->groupClause ? AGG_SORTED : AGG_PLAIN,
+                                        ctx->hasAggs ? AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE,
+                                        false, /* streaming */
+                                        ctx->groupClause,
+                                        NIL,
+                                        ctx->agg_partial_costs,
+                                        estimate_num_groups_on_segment(ctx->dNumGroupsTotal,
+                                                                       path->rows, path->locus)));
+}
diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c
index 6c3b7dae365..1f6e13bb16b 100644
--- a/src/backend/utils/misc/guc_gp.c
+++ b/src/backend/utils/misc/guc_gp.c
@@ -151,6 +151,7 @@ bool                enable_parallel = false;
 bool           enable_parallel_semi_join = true;
 bool           enable_parallel_dedup_semi_join = true;
 bool           enable_parallel_dedup_semi_reverse_join = true;
+bool           parallel_query_use_streaming_hashagg = true;
 int                    gp_appendonly_insert_files = 0;
 int                    gp_appendonly_insert_files_tuples_range = 0;
 int                    gp_random_insert_segments = 0;
@@ -3227,6 +3228,16 @@ struct config_bool ConfigureNamesBool_gp[] =
                true,
                NULL, NULL, NULL
        },
+       {
+               {"parallel_query_use_streaming_hashagg", PGC_USERSET, 
QUERY_TUNING_METHOD,
+                       gettext_noop("allow to use of streaming hashagg in 
parallel query for DISTINCT."),
+                       NULL,
+                       GUC_EXPLAIN
+               },
+               &parallel_query_use_streaming_hashagg,
+               true,
+               NULL, NULL, NULL
+       },
        {
                {"gp_internal_is_singlenode", PGC_POSTMASTER, UNGROUPED,
                         gettext_noop("Is in SingleNode mode (no segments). 
WARNING: user SHOULD NOT set this by any means."),
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index c0e0e293428..de0f931c797 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -293,6 +293,7 @@ extern bool enable_parallel;
 extern bool enable_parallel_semi_join;
 extern bool enable_parallel_dedup_semi_join;
 extern bool enable_parallel_dedup_semi_reverse_join;
+extern bool    parallel_query_use_streaming_hashagg;
 extern int  gp_appendonly_insert_files;
 extern int  gp_appendonly_insert_files_tuples_range;
 extern int  gp_random_insert_segments;
diff --git a/src/include/utils/unsync_guc_name.h b/src/include/utils/unsync_guc_name.h
index 2684fd7cb6e..a2a325bd9ed 100644
--- a/src/include/utils/unsync_guc_name.h
+++ b/src/include/utils/unsync_guc_name.h
@@ -498,6 +498,7 @@
                "optimizer_use_gpdb_allocators",
                "optimizer_xform_bind_threshold",
                "parallel_leader_participation",
+               "parallel_query_use_streaming_hashagg",
                "parallel_setup_cost",
                "parallel_tuple_cost",
                "password_encryption",
diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out
index b8639f9fb9c..6ed0b1b6bb0 100644
--- a/src/test/regress/expected/cbdb_parallel.out
+++ b/src/test/regress/expected/cbdb_parallel.out
@@ -3069,6 +3069,201 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant
 (4 rows)
 
 abort;
+--
+-- Test Parallel DISTINCT
+--
+drop table if exists t_distinct_0;
+NOTICE:  table "t_distinct_0" does not exist, skipping
+create table t_distinct_0(a int, b int) using ao_column distributed randomly;
+insert into t_distinct_0 select i, i+1 from generate_series(1, 1000) i;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+analyze t_distinct_0;
+explain(costs off)
+select distinct a from t_distinct_0;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: a
+         ->  Redistribute Motion 3:3  (slice2; segments: 3)
+               Hash Key: a
+               ->  HashAggregate
+                     Group Key: a
+                     ->  Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+set enable_parallel = on;
+-- first stage HashAgg, second stage GroupAgg
+explain(costs off)
+select distinct a from t_distinct_0;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   Merge Key: a
+   ->  GroupAggregate
+         Group Key: a
+         ->  Sort
+               Sort Key: a
+               ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                     Hash Key: a
+                     Hash Module: 3
+                     ->  Streaming HashAggregate
+                           Group Key: a
+                           ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(13 rows)
+
+set parallel_query_use_streaming_hashagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   Merge Key: a
+   ->  GroupAggregate
+         Group Key: a
+         ->  Sort
+               Sort Key: a
+               ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                     Hash Key: a
+                     Hash Module: 3
+                     ->  HashAggregate
+                           Group Key: a
+                           ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(13 rows)
+
+-- GroupAgg
+set enable_hashagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+                              QUERY PLAN                               
+-----------------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   Merge Key: a
+   ->  GroupAggregate
+         Group Key: a
+         ->  Sort
+               Sort Key: a
+               ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                     Hash Key: a
+                     Hash Module: 3
+                     ->  GroupAggregate
+                           Group Key: a
+                           ->  Sort
+                                 Sort Key: a
+                                 ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(15 rows)
+
+-- HashAgg
+set enable_hashagg = on;
+set enable_groupagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  HashAggregate
+         Group Key: a
+         ->  Redistribute Motion 6:6  (slice2; segments: 6)
+               Hash Key: a
+               Hash Module: 3
+               ->  HashAggregate
+                     Group Key: a
+                     ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+set parallel_query_use_streaming_hashagg = on;
+explain(costs off)
+select distinct a from t_distinct_0;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  HashAggregate
+         Group Key: a
+         ->  Redistribute Motion 6:6  (slice2; segments: 6)
+               Hash Key: a
+               Hash Module: 3
+               ->  Streaming HashAggregate
+                     Group Key: a
+                     ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+-- multi DISTINCT tlist
+explain(costs off)
+select distinct a, b from t_distinct_0;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  HashAggregate
+         Group Key: a, b
+         ->  Redistribute Motion 6:6  (slice2; segments: 6)
+               Hash Key: a, b
+               Hash Module: 3
+               ->  Streaming HashAggregate
+                     Group Key: a, b
+                     ->  Parallel Seq Scan on t_distinct_0
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+-- DISTINCT on distribution key 
+drop table if exists t_distinct_1;
+NOTICE:  table "t_distinct_1" does not exist, skipping
+create table t_distinct_1(a int, b int) using ao_column;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into t_distinct_1 select * from t_distinct_0;
+analyze t_distinct_1;
+set enable_parallel = off;
+explain(costs off)
+select distinct a from t_distinct_1;
+                QUERY PLAN                
+------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: a
+         ->  Seq Scan on t_distinct_1
+ Optimizer: Postgres query optimizer
+(5 rows)
+
+set enable_parallel = on;
+explain(costs off)
+select distinct a from t_distinct_1;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  HashAggregate
+         Group Key: a
+         ->  Redistribute Motion 6:6  (slice2; segments: 6)
+               Hash Key: a
+               Hash Module: 3
+               ->  Streaming HashAggregate
+                     Group Key: a
+                     ->  Parallel Seq Scan on t_distinct_1
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+--
+-- End of test Parallel DISTINCT
+--
 -- start_ignore
 drop schema test_parallel cascade;
 -- end_ignore
diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql
index b3fab79dd09..74a60e6ed2d 100644
--- a/src/test/regress/sql/cbdb_parallel.sql
+++ b/src/test/regress/sql/cbdb_parallel.sql
@@ -986,6 +986,69 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant
 select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_anti.a where t2_anti.a is null;
 abort;
 
+--
+-- Test Parallel DISTINCT
+--
+drop table if exists t_distinct_0;
+create table t_distinct_0(a int, b int) using ao_column distributed randomly;
+insert into t_distinct_0 select i, i+1 from generate_series(1, 1000) i;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+insert into t_distinct_0 select * from t_distinct_0;
+analyze t_distinct_0;
+explain(costs off)
+select distinct a from t_distinct_0;
+set enable_parallel = on;
+-- first stage HashAgg, second stage GroupAgg
+explain(costs off)
+select distinct a from t_distinct_0;
+set parallel_query_use_streaming_hashagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+-- GroupAgg
+set enable_hashagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+-- HashAgg
+set enable_hashagg = on;
+set enable_groupagg = off;
+explain(costs off)
+select distinct a from t_distinct_0;
+set parallel_query_use_streaming_hashagg = on;
+explain(costs off)
+select distinct a from t_distinct_0;
+-- multi DISTINCT tlist
+explain(costs off)
+select distinct a, b from t_distinct_0;
+
+-- DISTINCT on distribution key 
+drop table if exists t_distinct_1;
+create table t_distinct_1(a int, b int) using ao_column;
+insert into t_distinct_1 select * from t_distinct_0;
+analyze t_distinct_1;
+set enable_parallel = off;
+explain(costs off)
+select distinct a from t_distinct_1;
+set enable_parallel = on;
+explain(costs off)
+select distinct a from t_distinct_1;
+
+--
+-- End of test Parallel DISTINCT
+--
+
 -- start_ignore
 drop schema test_parallel cascade;
 -- end_ignore


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org
