This is an automated email from the ASF dual-hosted git repository.

jiaqizho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/main by this push:
     new 60eb10dfdf8 ORCA: allow different strategy control the redistribute 
key below aggregate
60eb10dfdf8 is described below

commit 60eb10dfdf8f4384072269beef81d216150bc40a
Author: zhoujiaqi <zhouji...@hashdata.cn>
AuthorDate: Thu Jun 26 18:10:55 2025 +0800

    ORCA: allow different strategy control the redistribute key below aggregate
    
    In CBDB, if there is an AGG operator (one-step AGG or final AGG operator)
    that requires data redistribution, then the redistribution motion operator
    will use all `GROUP BY` keys as the redistribution keys. In fact,
    redistributing on a single `GROUP BY` key is sufficient, and the results of
    the AGG will be the same.
    
    Reducing the number of redistribution keys can effectively reduce the
    overhead of hash function calls in the motion operator. However, this may
    lead to data skew.
    
    Therefore, this commit provides several strategies for deciding how the
    redistribution keys are selected for the redistribution motion operator
    (the one under the AGG operator). Users can select a strategy with the GUC
    `optimizer_agg_pds_strategy`.
    
    - OPTIMIZER_AGG_PDS_ALL_KEY(value: 0): the default; select all `GROUP BY`
      keys as the redistribution keys.
    - OPTIMIZER_AGG_PDS_FIRST_KEY(value: 1): select the first `GROUP BY` key as
      the redistribution key.
    - OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY(value: 2): select the `GROUP BY` key
      which has the minimal positive typlen as the redistribution key. If only
      non-fixed-length types (such as text and varchar) exist, select the first
      `GROUP BY` key.
    - OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED(value: 3): select the `GROUP BY` keys
      which have a fixed typlen as the redistribution keys. If only
      non-fixed-length types (such as text and varchar) exist, select all
      `GROUP BY` keys.
---
 src/backend/gpopt/config/CConfigParamMapping.cpp   |  8 ++
 .../gporca/libgpopt/src/operators/CPhysicalAgg.cpp | 57 ++++++++++++-
 .../include/naucrates/traceflags/traceflags.h      |  9 ++
 src/backend/utils/misc/guc_gp.c                    | 12 +++
 src/include/utils/guc.h                            |  7 ++
 src/include/utils/unsync_guc_name.h                |  1 +
 src/test/regress/expected/aggregates.out           | 90 ++++++++++++++++++++
 src/test/regress/expected/aggregates_optimizer.out | 96 ++++++++++++++++++++++
 src/test/regress/sql/aggregates.sql                | 23 ++++++
 9 files changed, 301 insertions(+), 2 deletions(-)

diff --git a/src/backend/gpopt/config/CConfigParamMapping.cpp 
b/src/backend/gpopt/config/CConfigParamMapping.cpp
index ad58148b28b..2968a31829d 100644
--- a/src/backend/gpopt/config/CConfigParamMapping.cpp
+++ b/src/backend/gpopt/config/CConfigParamMapping.cpp
@@ -578,6 +578,14 @@ CConfigParamMapping::PackConfigParamInBitset(
                traceflag_bitset->ExchangeSet(EopttraceEnableWindowHashAgg);
        }
 
+       if (optimizer_agg_pds_strategy == OPTIMIZER_AGG_PDS_FIRST_KEY) {
+               traceflag_bitset->ExchangeSet(EopttraceAggRRSFirstKey);
+       } else if (optimizer_agg_pds_strategy == 
OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY) {
+               traceflag_bitset->ExchangeSet(EopttraceAggRRSMinimalLenKey);
+       } else if (optimizer_agg_pds_strategy == 
OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED) {
+               
traceflag_bitset->ExchangeSet(EopttraceAggRRSExcludeNonFixedKey);
+       }
+
        return traceflag_bitset;
 }
 
diff --git a/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp 
b/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp
index 0b35e399eb2..d4e9a72649d 100644
--- a/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp
+++ b/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp
@@ -8,7 +8,6 @@
 //     @doc:
 //             Implementation of basic aggregate operator
 //---------------------------------------------------------------------------
-
 #include "gpopt/operators/CPhysicalAgg.h"
 
 #include "gpos/base.h"
@@ -295,11 +294,65 @@ CDistributionSpec *
 CPhysicalAgg::PdsMaximalHashed(CMemoryPool *mp, CColRefArray *colref_array)
 {
        GPOS_ASSERT(nullptr != colref_array);
+       CColRefArray *pcraResHashs = nullptr;
+
+       if (GPOS_FTRACE(EopttraceAggRRSFirstKey)) {
+               pcraResHashs = GPOS_NEW(mp) CColRefArray(mp);
+               if (colref_array->Size() > 0) {
+                       CColRef *colref = (*colref_array)[0];
+                       pcraResHashs->Append(colref);
+               }
+       } else if (GPOS_FTRACE(EopttraceAggRRSMinimalLenKey)) {
+               ULONG ulsz = colref_array->Size();
+               pcraResHashs = GPOS_NEW(mp) CColRefArray(mp);
+
+               LINT minimal_typlen = gpos::lint_max; // less than minimal 
typlen
+               LINT minimal_typlen_ul = -1;
+               for (ULONG ul = 0; ul < ulsz; ul++) {
+                       CColRef *pcr =(*colref_array)[ul];
+                       const gpmd::IMDType *pmdtyp = pcr->RetrieveType();
+                       LINT typlen = pmdtyp->IsFixedLength() ? 
pmdtyp->Length() : (gpos::lint_max - 1);
+
+                       if (typlen < minimal_typlen) {
+                               minimal_typlen = typlen;
+                               minimal_typlen_ul = ul;
+                       }
+               }
+
+               if (minimal_typlen_ul != -1) {
+                       
pcraResHashs->Append((*colref_array)[minimal_typlen_ul]);
+               }
+       } else if (GPOS_FTRACE(EopttraceAggRRSExcludeNonFixedKey)) {
+               ULONG ulsz = colref_array->Size();
+               pcraResHashs = GPOS_NEW(mp) CColRefArray(mp);
+
+               for (ULONG ul = 0; ul < ulsz; ul++) {
+                       CColRef *pcr =(*colref_array)[ul];
+                       const gpmd::IMDType *pmdtyp = pcr->RetrieveType();
+                       if (pmdtyp->IsFixedLength()) {
+                               pcraResHashs->Append(pcr);
+                       }
+               }
+
+               // no key in result
+               if (pcraResHashs->Size() == 0) {
+                       colref_array->AddRef();
+                       pcraResHashs = colref_array;
+               }
+       } else {
+               colref_array->AddRef();
+               pcraResHashs = colref_array;
+       }
+
+       GPOS_ASSERT(nullptr != pcraResHashs);
+       GPOS_ASSERT_IMP(colref_array->Size() > 0, pcraResHashs->Size() > 0);
 
        CDistributionSpecHashed *pdshashedMaximal =
                CDistributionSpecHashed::PdshashedMaximal(
-                       mp, colref_array, true /*fNullsColocated*/
+                       mp, pcraResHashs, true /*fNullsColocated*/
                );
+
+       pcraResHashs->Release();
        if (nullptr != pdshashedMaximal)
        {
                return pdshashedMaximal;
diff --git 
a/src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h 
b/src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h
index 473806f4a37..0693a374fbf 100644
--- a/src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h
+++ b/src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h
@@ -244,6 +244,15 @@ enum EOptTraceFlag
        // Enable window hash agg
        EopttraceEnableWindowHashAgg = 103050,
 
+       // Use the first key in AGG Pds
+       EopttraceAggRRSFirstKey = 103051,
+
+       // Use the minimal length key in AGG Pds
+       EopttraceAggRRSMinimalLenKey = 103052,
+
+       // Use the all key exclude the non-fixed key in AGG pds
+       EopttraceAggRRSExcludeNonFixedKey = 103053,
+
        ///////////////////////////////////////////////////////
        ///////////////////// statistics flags ////////////////
        //////////////////////////////////////////////////////
diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c
index 66c204c1075..c7bb596cb61 100644
--- a/src/backend/utils/misc/guc_gp.c
+++ b/src/backend/utils/misc/guc_gp.c
@@ -365,6 +365,7 @@ bool                optimizer_enable_foreign_table;
 bool           optimizer_enable_right_outer_join;
 bool           optimizer_enable_query_parameter;
 bool           optimizer_force_window_hash_agg;
+int                    optimizer_agg_pds_strategy;
 
 /* Optimizer plan enumeration related GUCs */
 bool           optimizer_enumerate_plans;
@@ -4490,6 +4491,17 @@ struct config_int ConfigureNamesInt_gp[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"optimizer_agg_pds_strategy", PGC_USERSET, DEVELOPER_OPTIONS,
+                       gettext_noop("Set the strategy of agg required 
distribution."),
+                       NULL,
+                       GUC_NOT_IN_SAMPLE
+               },
+               &optimizer_agg_pds_strategy,
+               OPTIMIZER_AGG_PDS_ALL_KEY, OPTIMIZER_AGG_PDS_ALL_KEY, 
OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED,
+               NULL, NULL, NULL
+       },
+
        {
                {"memory_profiler_dataset_size", PGC_USERSET, DEVELOPER_OPTIONS,
                        gettext_noop("Set the size in GB"),
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 05ff02dd8f1..b12d29f1e16 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -466,6 +466,12 @@ extern bool create_restartpoint_on_ckpt_record_replay;
 #define OPTIMIZER_GPDB_CALIBRATED       1       /* GPDB's calibrated cost 
model */
 #define OPTIMIZER_GPDB_EXPERIMENTAL     2       /* GPDB's experimental cost 
model */
 
+/* optimizer agg required distribution (PDS) strategy */
+#define OPTIMIZER_AGG_PDS_ALL_KEY             0
+#define OPTIMIZER_AGG_PDS_FIRST_KEY           1
+#define OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY     2
+#define OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED   3
+
 
 /* Optimizer related gucs */
 extern bool    optimizer;
@@ -546,6 +552,7 @@ extern bool optimizer_enable_foreign_table;
 extern bool optimizer_enable_right_outer_join;
 extern bool optimizer_enable_query_parameter;
 extern bool optimizer_force_window_hash_agg;
+extern int optimizer_agg_pds_strategy;
 
 /* Optimizer plan enumeration related GUCs */
 extern bool optimizer_enumerate_plans;
diff --git a/src/include/utils/unsync_guc_name.h 
b/src/include/utils/unsync_guc_name.h
index 065eca54d6e..b26c5b43c7b 100644
--- a/src/include/utils/unsync_guc_name.h
+++ b/src/include/utils/unsync_guc_name.h
@@ -440,6 +440,7 @@
                "optimizer_enable_right_outer_join",
                "optimizer_enable_query_parameter",
                "optimizer_force_window_hash_agg",
+               "optimizer_agg_pds_strategy",
                "optimizer_enforce_subplans",
                "optimizer_enumerate_plans",
                "optimizer_expand_fulljoin",
diff --git a/src/test/regress/expected/aggregates.out 
b/src/test/regress/expected/aggregates.out
index 91dba471eeb..e2105a3af40 100644
--- a/src/test/regress/expected/aggregates.out
+++ b/src/test/regress/expected/aggregates.out
@@ -3404,3 +3404,93 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and
 
 reset debug_print_aggref_in_explain;
 reset optimizer_force_multistage_agg;
+-- test the optimizer_agg_pds_strategy
+DROP TABLE IF EXISTS pds_t1;
+NOTICE:  table "pds_t1" does not exist, skipping
+create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) 
DISTRIBUTED BY (v1);
+set optimizer_agg_pds_strategy to 0;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: v2, v3
+         ->  Redistribute Motion 3:3  (slice2; segments: 3)
+               Hash Key: v2, v3
+               ->  HashAggregate
+                     Group Key: v2, v3
+                     ->  Seq Scan on pds_t1
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+set optimizer_agg_pds_strategy to 1;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: v2, v3
+         ->  Redistribute Motion 3:3  (slice2; segments: 3)
+               Hash Key: v2, v3
+               ->  HashAggregate
+                     Group Key: v2, v3
+                     ->  Seq Scan on pds_t1
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+set optimizer_agg_pds_strategy to 2;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: v2, v3
+         ->  Redistribute Motion 3:3  (slice2; segments: 3)
+               Hash Key: v2, v3
+               ->  HashAggregate
+                     Group Key: v2, v3
+                     ->  Seq Scan on pds_t1
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+set optimizer_agg_pds_strategy to 3;
+explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: v2, v3, v4, v5, v6
+         ->  Redistribute Motion 3:3  (slice2; segments: 3)
+               Hash Key: v2, v3, v4, v5, v6
+               ->  HashAggregate
+                     Group Key: v2, v3, v4, v5, v6
+                     ->  Seq Scan on pds_t1
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+-- We can't dedup the "Redistribute Motion", cause in this step we can't know 
the 
+-- distribution of output column which from the underlying operators.
+-- So you need to be cautious when opening this guc.
+set optimizer_agg_pds_strategy to 1;
+explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
+                QUERY PLAN                
+------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: v3, v2, v1
+         ->  Seq Scan on pds_t1
+ Optimizer: Postgres query optimizer
+(5 rows)
+
+set optimizer_agg_pds_strategy to 0;
+explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
+                QUERY PLAN                
+------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  HashAggregate
+         Group Key: v3, v2, v1
+         ->  Seq Scan on pds_t1
+ Optimizer: Postgres query optimizer
+(5 rows)
+
+reset optimizer_agg_pds_strategy;
diff --git a/src/test/regress/expected/aggregates_optimizer.out 
b/src/test/regress/expected/aggregates_optimizer.out
index a832e48e940..73fcea90841 100644
--- a/src/test/regress/expected/aggregates_optimizer.out
+++ b/src/test/regress/expected/aggregates_optimizer.out
@@ -3573,3 +3573,99 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and
 
 reset debug_print_aggref_in_explain;
 reset optimizer_force_multistage_agg;
+-- test the optimizer_agg_pds_strategy
+DROP TABLE IF EXISTS pds_t1;
+NOTICE:  table "pds_t1" does not exist, skipping
+create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) 
DISTRIBUTED BY (v1);
+set optimizer_agg_pds_strategy to 0;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  GroupAggregate
+         Group Key: v2, v3
+         ->  Sort
+               Sort Key: v2, v3
+               ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                     Hash Key: v2, v3
+                     ->  Seq Scan on pds_t1
+ Optimizer: GPORCA
+(9 rows)
+
+set optimizer_agg_pds_strategy to 1;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  GroupAggregate
+         Group Key: v2, v3
+         ->  Sort
+               Sort Key: v2, v3
+               ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                     Hash Key: v2
+                     ->  Seq Scan on pds_t1
+ Optimizer: GPORCA
+(9 rows)
+
+set optimizer_agg_pds_strategy to 2;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  GroupAggregate
+         Group Key: v2, v3
+         ->  Sort
+               Sort Key: v2, v3
+               ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                     Hash Key: v3
+                     ->  Seq Scan on pds_t1
+ Optimizer: GPORCA
+(9 rows)
+
+set optimizer_agg_pds_strategy to 3;
+explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  GroupAggregate
+         Group Key: v2, v3, v4, v5, v6
+         ->  Sort
+               Sort Key: v2, v3, v4, v5, v6
+               ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                     Hash Key: v3, v5
+                     ->  Seq Scan on pds_t1
+ Optimizer: GPORCA
+(9 rows)
+
+-- We can't dedup the "Redistribute Motion", cause in this step we can't know 
the 
+-- distribution of output column which from the underlying operators.
+-- So you need to be cautious when opening this guc.
+set optimizer_agg_pds_strategy to 1;
+explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  GroupAggregate
+         Group Key: v3, v2, v1
+         ->  Sort
+               Sort Key: v3, v2, v1
+               ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                     Hash Key: v3
+                     ->  Seq Scan on pds_t1
+ Optimizer: GPORCA
+(9 rows)
+
+set optimizer_agg_pds_strategy to 0;
+explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
+                QUERY PLAN                
+------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)
+   ->  GroupAggregate
+         Group Key: v3, v2, v1
+         ->  Sort
+               Sort Key: v3, v2, v1
+               ->  Seq Scan on pds_t1
+ Optimizer: GPORCA
+(7 rows)
+
+reset optimizer_agg_pds_strategy;
diff --git a/src/test/regress/sql/aggregates.sql 
b/src/test/regress/sql/aggregates.sql
index 600ff0282e0..a96b976fdcd 100644
--- a/src/test/regress/sql/aggregates.sql
+++ b/src/test/regress/sql/aggregates.sql
@@ -1528,3 +1528,26 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and
 
 reset debug_print_aggref_in_explain;
 reset optimizer_force_multistage_agg;
+
+-- test the optimizer_agg_pds_strategy
+DROP TABLE IF EXISTS pds_t1;
+create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) 
DISTRIBUTED BY (v1);
+
+set optimizer_agg_pds_strategy to 0;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+set optimizer_agg_pds_strategy to 1;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+set optimizer_agg_pds_strategy to 2;
+explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
+set optimizer_agg_pds_strategy to 3;
+explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6;
+
+-- We can't dedup the "Redistribute Motion", cause in this step we can't know 
the 
+-- distribution of output column which from the underlying operators.
+-- So you need to be cautious when opening this guc.
+set optimizer_agg_pds_strategy to 1;
+explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
+set optimizer_agg_pds_strategy to 0;
+explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
+
+reset optimizer_agg_pds_strategy;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org

Reply via email to