This is an automated email from the ASF dual-hosted git repository. jiaqizho pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push: new 60eb10dfdf8 ORCA: allow different strategy control the redistribute key below aggregate 60eb10dfdf8 is described below commit 60eb10dfdf8f4384072269beef81d216150bc40a Author: zhoujiaqi <zhouji...@hashdata.cn> AuthorDate: Thu Jun 26 18:10:55 2025 +0800 ORCA: allow different strategy control the redistribute key below aggregate In CBDB, if there is an AGG operator (one-step AGG or final AGG operator) that requires data redistribution, then the redistribution motion operator will use all `GROUP BY` keys as the redistribution keys. In fact, redistributing on a single key is sufficient, and the results of AGG will be the same. Reducing the number of redistribution keys can effectively reduce the overhead of hash function calls in the motion operator. However, this may lead to data skew. Therefore, this commit provides several different strategies for deciding how the redistribution keys should be selected for the redistribution motion operator (which sits under the AGG operator). Users can use the GUC `optimizer_agg_pds_strategy` to select the strategy. - OPTIMIZER_AGG_PDS_ALL_KEY(value: 0): the default; select all `GROUP BY` keys as the redistribution keys. - OPTIMIZER_AGG_PDS_FIRST_KEY(value: 1): select the first `GROUP BY` key as the redistribution key. - OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY(value: 2): select the first `GROUP BY` key that has the minimal (positive, fixed) typlen as the redistribution key. If only non-fixed-length types (such as text and varchar) exist, select the first `GROUP BY` key. - OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED(value: 3): select all `GROUP BY` keys that have a fixed typlen as the redistribution keys. If only non-fixed-length types (such as text and varchar) exist, select all `GROUP BY` keys. 
--- src/backend/gpopt/config/CConfigParamMapping.cpp | 8 ++ .../gporca/libgpopt/src/operators/CPhysicalAgg.cpp | 57 ++++++++++++- .../include/naucrates/traceflags/traceflags.h | 9 ++ src/backend/utils/misc/guc_gp.c | 12 +++ src/include/utils/guc.h | 7 ++ src/include/utils/unsync_guc_name.h | 1 + src/test/regress/expected/aggregates.out | 90 ++++++++++++++++++++ src/test/regress/expected/aggregates_optimizer.out | 96 ++++++++++++++++++++++ src/test/regress/sql/aggregates.sql | 23 ++++++ 9 files changed, 301 insertions(+), 2 deletions(-) diff --git a/src/backend/gpopt/config/CConfigParamMapping.cpp b/src/backend/gpopt/config/CConfigParamMapping.cpp index ad58148b28b..2968a31829d 100644 --- a/src/backend/gpopt/config/CConfigParamMapping.cpp +++ b/src/backend/gpopt/config/CConfigParamMapping.cpp @@ -578,6 +578,14 @@ CConfigParamMapping::PackConfigParamInBitset( traceflag_bitset->ExchangeSet(EopttraceEnableWindowHashAgg); } + if (optimizer_agg_pds_strategy == OPTIMIZER_AGG_PDS_FIRST_KEY) { + traceflag_bitset->ExchangeSet(EopttraceAggRRSFirstKey); + } else if (optimizer_agg_pds_strategy == OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY) { + traceflag_bitset->ExchangeSet(EopttraceAggRRSMinimalLenKey); + } else if (optimizer_agg_pds_strategy == OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED) { + traceflag_bitset->ExchangeSet(EopttraceAggRRSExcludeNonFixedKey); + } + return traceflag_bitset; } diff --git a/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp b/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp index 0b35e399eb2..d4e9a72649d 100644 --- a/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp @@ -8,7 +8,6 @@ // @doc: // Implementation of basic aggregate operator //--------------------------------------------------------------------------- - #include "gpopt/operators/CPhysicalAgg.h" #include "gpos/base.h" @@ -295,11 +294,65 @@ CDistributionSpec * CPhysicalAgg::PdsMaximalHashed(CMemoryPool *mp, 
CColRefArray *colref_array) { GPOS_ASSERT(nullptr != colref_array); + CColRefArray *pcraResHashs = nullptr; + + if (GPOS_FTRACE(EopttraceAggRRSFirstKey)) { + pcraResHashs = GPOS_NEW(mp) CColRefArray(mp); + if (colref_array->Size() > 0) { + CColRef *colref = (*colref_array)[0]; + pcraResHashs->Append(colref); + } + } else if (GPOS_FTRACE(EopttraceAggRRSMinimalLenKey)) { + ULONG ulsz = colref_array->Size(); + pcraResHashs = GPOS_NEW(mp) CColRefArray(mp); + + LINT minimal_typlen = gpos::lint_max; // less than minimal typlen + LINT minimal_typlen_ul = -1; + for (ULONG ul = 0; ul < ulsz; ul++) { + CColRef *pcr =(*colref_array)[ul]; + const gpmd::IMDType *pmdtyp = pcr->RetrieveType(); + LINT typlen = pmdtyp->IsFixedLength() ? pmdtyp->Length() : (gpos::lint_max - 1); + + if (typlen < minimal_typlen) { + minimal_typlen = typlen; + minimal_typlen_ul = ul; + } + } + + if (minimal_typlen_ul != -1) { + pcraResHashs->Append((*colref_array)[minimal_typlen_ul]); + } + } else if (GPOS_FTRACE(EopttraceAggRRSExcludeNonFixedKey)) { + ULONG ulsz = colref_array->Size(); + pcraResHashs = GPOS_NEW(mp) CColRefArray(mp); + + for (ULONG ul = 0; ul < ulsz; ul++) { + CColRef *pcr =(*colref_array)[ul]; + const gpmd::IMDType *pmdtyp = pcr->RetrieveType(); + if (pmdtyp->IsFixedLength()) { + pcraResHashs->Append(pcr); + } + } + + // no key in result + if (pcraResHashs->Size() == 0) { + colref_array->AddRef(); + pcraResHashs = colref_array; + } + } else { + colref_array->AddRef(); + pcraResHashs = colref_array; + } + + GPOS_ASSERT(nullptr != pcraResHashs); + GPOS_ASSERT_IMP(colref_array->Size() > 0, pcraResHashs->Size() > 0); CDistributionSpecHashed *pdshashedMaximal = CDistributionSpecHashed::PdshashedMaximal( - mp, colref_array, true /*fNullsColocated*/ + mp, pcraResHashs, true /*fNullsColocated*/ ); + + pcraResHashs->Release(); if (nullptr != pdshashedMaximal) { return pdshashedMaximal; diff --git a/src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h 
b/src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h index 473806f4a37..0693a374fbf 100644 --- a/src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h +++ b/src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h @@ -244,6 +244,15 @@ enum EOptTraceFlag // Enable window hash agg EopttraceEnableWindowHashAgg = 103050, + // Use the first key in AGG Pds + EopttraceAggRRSFirstKey = 103051, + + // Use the minimal length key in AGG Pds + EopttraceAggRRSMinimalLenKey = 103052, + + // Use the all key exclude the non-fixed key in AGG pds + EopttraceAggRRSExcludeNonFixedKey = 103053, + /////////////////////////////////////////////////////// ///////////////////// statistics flags //////////////// ////////////////////////////////////////////////////// diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 66c204c1075..c7bb596cb61 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -365,6 +365,7 @@ bool optimizer_enable_foreign_table; bool optimizer_enable_right_outer_join; bool optimizer_enable_query_parameter; bool optimizer_force_window_hash_agg; +int optimizer_agg_pds_strategy; /* Optimizer plan enumeration related GUCs */ bool optimizer_enumerate_plans; @@ -4490,6 +4491,17 @@ struct config_int ConfigureNamesInt_gp[] = NULL, NULL, NULL }, + { + {"optimizer_agg_pds_strategy", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Set the strategy of agg required distribution."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &optimizer_agg_pds_strategy, + OPTIMIZER_AGG_PDS_ALL_KEY, OPTIMIZER_AGG_PDS_ALL_KEY, OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED, + NULL, NULL, NULL + }, + { {"memory_profiler_dataset_size", PGC_USERSET, DEVELOPER_OPTIONS, gettext_noop("Set the size in GB"), diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 05ff02dd8f1..b12d29f1e16 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -466,6 +466,12 @@ extern bool 
create_restartpoint_on_ckpt_record_replay; #define OPTIMIZER_GPDB_CALIBRATED 1 /* GPDB's calibrated cost model */ #define OPTIMIZER_GPDB_EXPERIMENTAL 2 /* GPDB's experimental cost model */ +/* values of optimizer_agg_pds_strategy (agg redistribution key selection) */ +#define OPTIMIZER_AGG_PDS_ALL_KEY 0 +#define OPTIMIZER_AGG_PDS_FIRST_KEY 1 +#define OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY 2 +#define OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED 3 + /* Optimizer related gucs */ extern bool optimizer; @@ -546,6 +552,7 @@ extern bool optimizer_enable_foreign_table; extern bool optimizer_enable_right_outer_join; extern bool optimizer_enable_query_parameter; extern bool optimizer_force_window_hash_agg; +extern int optimizer_agg_pds_strategy; /* Optimizer plan enumeration related GUCs */ extern bool optimizer_enumerate_plans; diff --git a/src/include/utils/unsync_guc_name.h b/src/include/utils/unsync_guc_name.h index 065eca54d6e..b26c5b43c7b 100644 --- a/src/include/utils/unsync_guc_name.h +++ b/src/include/utils/unsync_guc_name.h @@ -440,6 +440,7 @@ "optimizer_enable_right_outer_join", "optimizer_enable_query_parameter", "optimizer_force_window_hash_agg", + "optimizer_agg_pds_strategy", "optimizer_enforce_subplans", "optimizer_enumerate_plans", "optimizer_expand_fulljoin", diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out index 91dba471eeb..e2105a3af40 100644 --- a/src/test/regress/expected/aggregates.out +++ b/src/test/regress/expected/aggregates.out @@ -3404,3 +3404,93 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and reset debug_print_aggref_in_explain; reset optimizer_force_multistage_agg; +-- test the optimizer_agg_pds_strategy +DROP TABLE IF EXISTS pds_t1; +NOTICE: table "pds_t1" does not exist, skipping +create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) DISTRIBUTED BY (v1); +set optimizer_agg_pds_strategy to 0; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 3:1
(slice1; segments: 3) + -> HashAggregate + Group Key: v2, v3 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v2, v3 + -> HashAggregate + Group Key: v2, v3 + -> Seq Scan on pds_t1 + Optimizer: Postgres query optimizer +(9 rows) + +set optimizer_agg_pds_strategy to 1; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> HashAggregate + Group Key: v2, v3 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v2, v3 + -> HashAggregate + Group Key: v2, v3 + -> Seq Scan on pds_t1 + Optimizer: Postgres query optimizer +(9 rows) + +set optimizer_agg_pds_strategy to 2; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> HashAggregate + Group Key: v2, v3 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v2, v3 + -> HashAggregate + Group Key: v2, v3 + -> Seq Scan on pds_t1 + Optimizer: Postgres query optimizer +(9 rows) + +set optimizer_agg_pds_strategy to 3; +explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> HashAggregate + Group Key: v2, v3, v4, v5, v6 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v2, v3, v4, v5, v6 + -> HashAggregate + Group Key: v2, v3, v4, v5, v6 + -> Seq Scan on pds_t1 + Optimizer: Postgres query optimizer +(9 rows) + +-- We can't dedup the "Redistribute Motion", cause in this step we can't know the +-- distribution of output column which from the underlying operators. +-- So you need to be cautious when opening this guc. 
+set optimizer_agg_pds_strategy to 1; +explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1; + QUERY PLAN +------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> HashAggregate + Group Key: v3, v2, v1 + -> Seq Scan on pds_t1 + Optimizer: Postgres query optimizer +(5 rows) + +set optimizer_agg_pds_strategy to 0; +explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1; + QUERY PLAN +------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> HashAggregate + Group Key: v3, v2, v1 + -> Seq Scan on pds_t1 + Optimizer: Postgres query optimizer +(5 rows) + +reset optimizer_agg_pds_strategy; diff --git a/src/test/regress/expected/aggregates_optimizer.out b/src/test/regress/expected/aggregates_optimizer.out index a832e48e940..73fcea90841 100644 --- a/src/test/regress/expected/aggregates_optimizer.out +++ b/src/test/regress/expected/aggregates_optimizer.out @@ -3573,3 +3573,99 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and reset debug_print_aggref_in_explain; reset optimizer_force_multistage_agg; +-- test the optimizer_agg_pds_strategy +DROP TABLE IF EXISTS pds_t1; +NOTICE: table "pds_t1" does not exist, skipping +create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) DISTRIBUTED BY (v1); +set optimizer_agg_pds_strategy to 0; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> GroupAggregate + Group Key: v2, v3 + -> Sort + Sort Key: v2, v3 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v2, v3 + -> Seq Scan on pds_t1 + Optimizer: GPORCA +(9 rows) + +set optimizer_agg_pds_strategy to 1; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> GroupAggregate + Group Key: v2, v3 + -> Sort + Sort 
Key: v2, v3 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v2 + -> Seq Scan on pds_t1 + Optimizer: GPORCA +(9 rows) + +set optimizer_agg_pds_strategy to 2; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> GroupAggregate + Group Key: v2, v3 + -> Sort + Sort Key: v2, v3 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v3 + -> Seq Scan on pds_t1 + Optimizer: GPORCA +(9 rows) + +set optimizer_agg_pds_strategy to 3; +explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> GroupAggregate + Group Key: v2, v3, v4, v5, v6 + -> Sort + Sort Key: v2, v3, v4, v5, v6 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v3, v5 + -> Seq Scan on pds_t1 + Optimizer: GPORCA +(9 rows) + +-- We can't dedup the "Redistribute Motion", cause in this step we can't know the +-- distribution of output column which from the underlying operators. +-- So you need to be cautious when opening this guc. 
+set optimizer_agg_pds_strategy to 1; +explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> GroupAggregate + Group Key: v3, v2, v1 + -> Sort + Sort Key: v3, v2, v1 + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: v3 + -> Seq Scan on pds_t1 + Optimizer: GPORCA +(9 rows) + +set optimizer_agg_pds_strategy to 0; +explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1; + QUERY PLAN +------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> GroupAggregate + Group Key: v3, v2, v1 + -> Sort + Sort Key: v3, v2, v1 + -> Seq Scan on pds_t1 + Optimizer: GPORCA +(7 rows) + +reset optimizer_agg_pds_strategy; diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql index 600ff0282e0..a96b976fdcd 100644 --- a/src/test/regress/sql/aggregates.sql +++ b/src/test/regress/sql/aggregates.sql @@ -1528,3 +1528,26 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and reset debug_print_aggref_in_explain; reset optimizer_force_multistage_agg; + +-- test the optimizer_agg_pds_strategy +DROP TABLE IF EXISTS pds_t1; +create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) DISTRIBUTED BY (v1); + +set optimizer_agg_pds_strategy to 0; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; +set optimizer_agg_pds_strategy to 1; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; +set optimizer_agg_pds_strategy to 2; +explain (costs off) select v2,v3 from pds_t1 group by v2,v3; +set optimizer_agg_pds_strategy to 3; +explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6; + +-- We can't dedup the "Redistribute Motion", cause in this step we can't know the +-- distribution of output column which from the underlying operators. +-- So you need to be cautious when opening this guc. 
+set optimizer_agg_pds_strategy to 1; +explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1; +set optimizer_agg_pds_strategy to 0; +explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1; + +reset optimizer_agg_pds_strategy; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For additional commands, e-mail: commits-h...@cloudberry.apache.org