This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 07de611941d3778dfcf37aed9a9f6ce3af72d2c8
Author: David Kimura <[email protected]>
AuthorDate: Wed Jan 18 00:27:04 2023 +0000

    [ORCA] Use catalog to determine replication-safe aggregate functions
    
    Previously ORCA hardcoded an allow-list of replication-safe aggregate
    functions. That information is now stored in the pg_aggregate catalog
    (the aggrepsafeexec column), so use it instead. One advantage of this
    approach is that new replication-safe aggregates (including user-defined
    aggregates) can be added and optimized without requiring code changes in
    ORCA.
---
 src/backend/gpopt/gpdbwrappers.cpp                 | 12 +++++++
 .../gpopt/translate/CTranslatorRelcacheToDXL.cpp   |  3 +-
 .../data/dxl/minidump/ReplicatedTableGroupBy.mdp   |  2 +-
 .../minidump/ReplicatedTableWithAggNoMotion.mdp    |  2 +-
 .../gporca/libgpopt/include/gpopt/base/CUtils.h    |  2 +-
 .../include/gpopt/operators/CScalarAggFunc.h       | 13 ++++++-
 src/backend/gporca/libgpopt/src/base/CUtils.cpp    | 18 +++++-----
 .../src/operators/COrderedAggPreprocessor.cpp      |  2 +-
 .../libgpopt/src/operators/CScalarAggFunc.cpp      |  5 +--
 .../libgpopt/src/operators/CScalarProjectList.cpp  |  8 +----
 .../src/translate/CTranslatorDXLToExpr.cpp         |  2 +-
 .../gporca/libgpopt/src/xforms/CXformEagerAgg.cpp  |  4 +--
 .../gporca/libgpopt/src/xforms/CXformSplitDQA.cpp  | 12 ++++---
 .../libgpopt/src/xforms/CXformSplitGbAgg.cpp       |  6 ++--
 .../gporca/libgpopt/src/xforms/CXformUtils.cpp     | 15 ++++----
 .../naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h  |  3 ++
 .../include/naucrates/dxl/xml/dxltokens.h          |  1 +
 .../include/naucrates/md/CMDAggregateGPDB.h        | 13 ++++++-
 .../include/naucrates/md/IMDAggregate.h            |  3 ++
 .../libnaucrates/src/md/CMDAggregateGPDB.cpp       | 11 ++++--
 .../src/parser/CParseHandlerMDGPDBAgg.cpp          | 15 ++++++--
 .../gporca/libnaucrates/src/xml/dxltokens.cpp      |  1 +
 src/backend/utils/cache/lsyscache.c                | 25 +++++++++++++
 src/include/gpopt/gpdbwrappers.h                   |  2 ++
 src/include/utils/lsyscache.h                      |  1 +
 src/test/regress/expected/gp_aggregates_costs.out  | 41 ++++++++++++++++++++++
 .../expected/gp_aggregates_costs_optimizer.out     | 39 ++++++++++++++++++++
 src/test/regress/sql/gp_aggregates_costs.sql       | 21 +++++++++++
 28 files changed, 237 insertions(+), 45 deletions(-)

diff --git a/src/backend/gpopt/gpdbwrappers.cpp 
b/src/backend/gpopt/gpdbwrappers.cpp
index 61c5f73b33..674becd1dc 100644
--- a/src/backend/gpopt/gpdbwrappers.cpp
+++ b/src/backend/gpopt/gpdbwrappers.cpp
@@ -689,6 +689,18 @@ gpdb::IsOrderedAgg(Oid aggid)
        return false;
 }
 
+bool
+gpdb::IsRepSafeAgg(Oid aggid)
+{
+       GP_WRAP_START;
+       {
+               /* catalog tables: pg_aggregate */
+               return is_agg_repsafe(aggid);
+       }
+       GP_WRAP_END;
+       return false;
+}
+
 bool
 gpdb::IsAggPartialCapable(Oid aggid)
 {
diff --git a/src/backend/gpopt/translate/CTranslatorRelcacheToDXL.cpp 
b/src/backend/gpopt/translate/CTranslatorRelcacheToDXL.cpp
index 4eecea5cea..9c47c2d78b 100644
--- a/src/backend/gpopt/translate/CTranslatorRelcacheToDXL.cpp
+++ b/src/backend/gpopt/translate/CTranslatorRelcacheToDXL.cpp
@@ -1521,6 +1521,7 @@ CTranslatorRelcacheToDXL::RetrieveAgg(CMemoryPool *mp, 
IMDId *mdid)
        mdid->AddRef();
 
        BOOL is_ordered = gpdb::IsOrderedAgg(agg_oid);
+       BOOL is_repsafe = gpdb::IsRepSafeAgg(agg_oid);
 
        // GPDB does not support splitting of ordered aggs and aggs without a
        // combine function
@@ -1533,7 +1534,7 @@ CTranslatorRelcacheToDXL::RetrieveAgg(CMemoryPool *mp, 
IMDId *mdid)
 
        CMDAggregateGPDB *pmdagg = GPOS_NEW(mp) CMDAggregateGPDB(
                mp, mdid, mdname, result_type_mdid, 
intermediate_result_type_mdid,
-               is_ordered, is_splittable, is_hash_agg_capable);
+               is_ordered, is_splittable, is_hash_agg_capable, is_repsafe);
        return pmdagg;
 }
 
diff --git a/src/backend/gporca/data/dxl/minidump/ReplicatedTableGroupBy.mdp 
b/src/backend/gporca/data/dxl/minidump/ReplicatedTableGroupBy.mdp
index 85db8ace76..291645ce63 100644
--- a/src/backend/gporca/data/dxl/minidump/ReplicatedTableGroupBy.mdp
+++ b/src/backend/gporca/data/dxl/minidump/ReplicatedTableGroupBy.mdp
@@ -956,7 +956,7 @@
           <dxl:UpperBound Closed="true" TypeMdid="0.23.1.0" Value="100"/>
         </dxl:StatsBucket>
       </dxl:ColumnStatistics>
-      <dxl:GPDBAgg Mdid="0.2147.1.0" Name="count" IsSplittable="true" 
HashAggCapable="true">
+      <dxl:GPDBAgg Mdid="0.2147.1.0" Name="count" IsRepSafe="true" 
IsSplittable="true" HashAggCapable="true">
         <dxl:ResultType Mdid="0.20.1.0"/>
         <dxl:IntermediateResultType Mdid="0.20.1.0"/>
       </dxl:GPDBAgg>
diff --git 
a/src/backend/gporca/data/dxl/minidump/ReplicatedTableWithAggNoMotion.mdp 
b/src/backend/gporca/data/dxl/minidump/ReplicatedTableWithAggNoMotion.mdp
index 6f5eac6f18..ffb2e3de9f 100644
--- a/src/backend/gporca/data/dxl/minidump/ReplicatedTableWithAggNoMotion.mdp
+++ b/src/backend/gporca/data/dxl/minidump/ReplicatedTableWithAggNoMotion.mdp
@@ -214,7 +214,7 @@
         <dxl:CountAgg Mdid="0.2147.1.0"/>
       </dxl:Type>
       <dxl:ColumnStatistics Mdid="1.222467.1.0.0" Name="c" Width="4.000000" 
NullFreq="0.000000" NdvRemain="0.000000" FreqRemain="0.000000" 
ColStatsMissing="true"/>
-      <dxl:GPDBAgg Mdid="0.2108.1.0" Name="sum" IsSplittable="true" 
HashAggCapable="true">
+      <dxl:GPDBAgg Mdid="0.2108.1.0" Name="sum" IsRepSafe="true" 
IsSplittable="true" HashAggCapable="true">
         <dxl:ResultType Mdid="0.20.1.0"/>
         <dxl:IntermediateResultType Mdid="0.20.1.0"/>
       </dxl:GPDBAgg>
diff --git a/src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h 
b/src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h
index d29b059b71..2028e720d4 100644
--- a/src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h
+++ b/src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h
@@ -244,7 +244,7 @@ public:
                BOOL is_distinct, EAggfuncStage eaggfuncstage, BOOL fSplit,
                IMDId *
                        pmdidResolvedReturnType,  // return type to be used if 
original return type is ambiguous
-               EAggfuncKind aggkind, ULongPtrArray *argtypes);
+               EAggfuncKind aggkind, ULongPtrArray *argtypes, BOOL fRepSafe);
 
        // generate an aggregate function
        static CExpression *PexprAggFunc(CMemoryPool *mp, IMDId *pmdidAggFunc,
diff --git 
a/src/backend/gporca/libgpopt/include/gpopt/operators/CScalarAggFunc.h 
b/src/backend/gporca/libgpopt/include/gpopt/operators/CScalarAggFunc.h
index f06512d2a0..5147b72614 100644
--- a/src/backend/gporca/libgpopt/include/gpopt/operators/CScalarAggFunc.h
+++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CScalarAggFunc.h
@@ -86,6 +86,9 @@ private:
        // is result of splitting aggregates
        BOOL m_fSplit;
 
+       // is aggregate replicate slice execution safe
+       BOOL m_fRepSafe;
+
 public:
        CScalarAggFunc(const CScalarAggFunc &) = delete;
 
@@ -93,7 +96,8 @@ public:
        CScalarAggFunc(CMemoryPool *mp, IMDId *pmdidAggFunc,
                                   IMDId *resolved_rettype, const CWStringConst 
*pstrAggFunc,
                                   BOOL is_distinct, EAggfuncStage 
eaggfuncstage, BOOL fSplit,
-                                  EAggfuncKind aggkind, ULongPtrArray 
*argtypes);
+                                  EAggfuncKind aggkind, ULongPtrArray 
*argtypes,
+                                  BOOL fRepSafe);
 
        // dtor
        ~CScalarAggFunc() override
@@ -207,6 +211,13 @@ public:
                return m_fSplit;
        }
 
+       // is aggregate replicate slice execution safe
+       BOOL
+       FRepSafe() const
+       {
+               return m_fRepSafe;
+       }
+
        // type of expression's result
        IMDId *
        MdidType() const override
diff --git a/src/backend/gporca/libgpopt/src/base/CUtils.cpp 
b/src/backend/gporca/libgpopt/src/base/CUtils.cpp
index 6b64c35307..481a81364d 100644
--- a/src/backend/gporca/libgpopt/src/base/CUtils.cpp
+++ b/src/backend/gporca/libgpopt/src/base/CUtils.cpp
@@ -1821,16 +1821,16 @@ CUtils::PopAggFunc(
        BOOL is_distinct, EAggfuncStage eaggfuncstage, BOOL fSplit,
        IMDId *
                pmdidResolvedReturnType,  // return type to be used if original 
return type is ambiguous
-       EAggfuncKind aggkind, ULongPtrArray *argtypes)
+       EAggfuncKind aggkind, ULongPtrArray *argtypes, BOOL fRepSafe)
 {
        GPOS_ASSERT(nullptr != pmdidAggFunc);
        GPOS_ASSERT(nullptr != pstrAggFunc);
        GPOS_ASSERT_IMP(nullptr != pmdidResolvedReturnType,
                                        pmdidResolvedReturnType->IsValid());
 
-       return GPOS_NEW(mp)
-               CScalarAggFunc(mp, pmdidAggFunc, pmdidResolvedReturnType, 
pstrAggFunc,
-                                          is_distinct, eaggfuncstage, fSplit, 
aggkind, argtypes);
+       return GPOS_NEW(mp) CScalarAggFunc(
+               mp, pmdidAggFunc, pmdidResolvedReturnType, pstrAggFunc, 
is_distinct,
+               eaggfuncstage, fSplit, aggkind, argtypes, fRepSafe);
 }
 
 // generate an aggregate function
@@ -1850,7 +1850,7 @@ CUtils::PexprAggFunc(CMemoryPool *mp, IMDId *pmdidAggFunc,
        // generate aggregate function
        CScalarAggFunc *popScAggFunc =
                PopAggFunc(mp, pmdidAggFunc, pstrAggFunc, is_distinct, 
eaggfuncstage,
-                                  fSplit, nullptr, EaggfunckindNormal, 
argtypes);
+                                  fSplit, nullptr, EaggfunckindNormal, 
argtypes, false);
 
        // generate function arguments
        CExpressionArray *pdrgpexpr = GPOS_NEW(mp) CExpressionArray(mp);
@@ -1907,10 +1907,10 @@ CUtils::PexprCountStar(CMemoryPool *mp)
                                                  CExpression(mp, GPOS_NEW(mp) 
CScalarValuesList(mp),
                                                                          
GPOS_NEW(mp) CExpressionArray(mp)));
 
-       CScalarAggFunc *popScAggFunc =
-               PopAggFunc(mp, mdid, str, false /*is_distinct*/,
-                                  EaggfuncstageGlobal /*eaggfuncstage*/, false 
/*fSplit*/,
-                                  nullptr, EaggfunckindNormal, GPOS_NEW(mp) 
ULongPtrArray(mp));
+       CScalarAggFunc *popScAggFunc = PopAggFunc(
+               mp, mdid, str, false /*is_distinct*/,
+               EaggfuncstageGlobal /*eaggfuncstage*/, false /*fSplit*/, 
nullptr,
+               EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp), false);
 
        CExpression *pexprCountStar =
                GPOS_NEW(mp) CExpression(mp, popScAggFunc, pdrgpexpr);
diff --git 
a/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp 
b/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp
index 563310c1de..7109f21bae 100644
--- a/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp
+++ b/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp
@@ -569,7 +569,7 @@ COrderedAggPreprocessor::PexprFinalAgg(CMemoryPool *mp,
        CScalarAggFunc *popNewAggFunc = CUtils::PopAggFunc(
                mp, mdid, arg_col_ref->Name().Pstr(), false /*is_distinct*/,
                EaggfuncstageGlobal /*eaggfuncstage*/, false /*fSplit*/, 
ret_type,
-               EaggfunckindNormal, argtypes);
+               EaggfunckindNormal, argtypes, popScAggFunc->FRepSafe());
 
        return GPOS_NEW(mp) CExpression(mp, popNewAggFunc, pdrgpexpr);
 }
diff --git a/src/backend/gporca/libgpopt/src/operators/CScalarAggFunc.cpp 
b/src/backend/gporca/libgpopt/src/operators/CScalarAggFunc.cpp
index 2865bf3d1a..8f7bca3933 100644
--- a/src/backend/gporca/libgpopt/src/operators/CScalarAggFunc.cpp
+++ b/src/backend/gporca/libgpopt/src/operators/CScalarAggFunc.cpp
@@ -37,7 +37,7 @@ CScalarAggFunc::CScalarAggFunc(CMemoryPool *mp, IMDId 
*pmdidAggFunc,
                                                           const CWStringConst 
*pstrAggFunc,
                                                           BOOL is_distinct, 
EAggfuncStage eaggfuncstage,
                                                           BOOL fSplit, 
EAggfuncKind aggkind,
-                                                          ULongPtrArray 
*argtypes)
+                                                          ULongPtrArray 
*argtypes, BOOL fRepSafe)
        : CScalar(mp),
          m_pmdidAggFunc(pmdidAggFunc),
          m_pmdidResolvedRetType(resolved_rettype),
@@ -47,7 +47,8 @@ CScalarAggFunc::CScalarAggFunc(CMemoryPool *mp, IMDId 
*pmdidAggFunc,
          m_aggkind(aggkind),
          m_argtypes(argtypes),
          m_eaggfuncstage(eaggfuncstage),
-         m_fSplit(fSplit)
+         m_fSplit(fSplit),
+         m_fRepSafe(fRepSafe)
 {
        GPOS_ASSERT(nullptr != pmdidAggFunc);
        GPOS_ASSERT(nullptr != pstrAggFunc);
diff --git a/src/backend/gporca/libgpopt/src/operators/CScalarProjectList.cpp 
b/src/backend/gporca/libgpopt/src/operators/CScalarProjectList.cpp
index 92cf701cf7..21fec9caf6 100644
--- a/src/backend/gporca/libgpopt/src/operators/CScalarProjectList.cpp
+++ b/src/backend/gporca/libgpopt/src/operators/CScalarProjectList.cpp
@@ -259,13 +259,7 @@ CScalarProjectList::FContainsOnlyReplicationSafeAggFuncs(
                }
                CScalarAggFunc *popScAggFunc =
                        CScalarAggFunc::PopConvert(pexprAggFunc->Pop());
-               OID safe_oid = CMDIdGPDB::CastMdid(popScAggFunc->MDId())->Oid();
-
-               // We use an allow-list approach here. While there are other 
functions that can be
-               // safely replicated, users could create custom agg funcs that 
could lead to wrong results
-               if (!(safe_oid == GPDB_INT4_AGG_MIN || safe_oid == 
GPDB_INT4_AGG_MAX ||
-                         safe_oid == GPDB_INT4_AGG_AVG || safe_oid == 
GPDB_INT4_AGG_SUM ||
-                         safe_oid == GPDB_INT4_AGG_COUNT || safe_oid == 
GPDB_COUNT_STAR))
+               if (!popScAggFunc->FRepSafe())
                {
                        return false;
                }
diff --git a/src/backend/gporca/libgpopt/src/translate/CTranslatorDXLToExpr.cpp 
b/src/backend/gporca/libgpopt/src/translate/CTranslatorDXLToExpr.cpp
index 92f446d285..7eb9c58f9f 100644
--- a/src/backend/gporca/libgpopt/src/translate/CTranslatorDXLToExpr.cpp
+++ b/src/backend/gporca/libgpopt/src/translate/CTranslatorDXLToExpr.cpp
@@ -3190,7 +3190,7 @@ CTranslatorDXLToExpr::PexprAggFunc(const CDXLNode 
*pdxlnAggref)
                GPOS_NEW(m_mp)
                        CWStringConst(m_mp, 
(pmdagg->Mdname().GetMDName())->GetBuffer()),
                dxl_op->IsDistinct(), agg_func_stage, fSplit, 
resolved_return_type_mdid,
-               agg_func_kind, dxl_op->GetArgTypes());
+               agg_func_kind, dxl_op->GetArgTypes(), pmdagg->IsAggRepSafe());
 
        CExpression *pexprAggFunc = nullptr;
 
diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformEagerAgg.cpp 
b/src/backend/gporca/libgpopt/src/xforms/CXformEagerAgg.cpp
index 8df950c7d1..f641e7478a 100644
--- a/src/backend/gporca/libgpopt/src/xforms/CXformEagerAgg.cpp
+++ b/src/backend/gporca/libgpopt/src/xforms/CXformEagerAgg.cpp
@@ -320,7 +320,7 @@ CXformEagerAgg::PopulateLowerProjectElement(
        agg_mdid->AddRef();
        CScalarAggFunc *lower_agg_func = CUtils::PopAggFunc(
                mp, agg_mdid, agg_name, is_distinct, EaggfuncstageLocal, true, 
nullptr,
-               EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp));
+               EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp), false);
        // add the arguments for the lower aggregate function, which is
        // going to be the same as the original aggregate function
        agg_arg_array->AddRef();
@@ -357,7 +357,7 @@ CXformEagerAgg::PopulateUpperProjectElement(
        agg_mdid->AddRef();
        CScalarAggFunc *upper_agg_func = CUtils::PopAggFunc(
                mp, agg_mdid, agg_name, is_distinct, EaggfuncstageGlobal, true, 
nullptr,
-               EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp));
+               EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp), false);
 
        // populate the argument list for the upper aggregate function
        CExpressionArray *upper_agg_arg_array = GPOS_NEW(mp) 
CExpressionArray(mp);
diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp 
b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp
index a9af5f9b10..3cf2f8ccde 100644
--- a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp
+++ b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp
@@ -307,7 +307,8 @@ CXformSplitDQA::PexprSplitIntoLocalDQAGlobalAgg(
                                        CWStringConst(mp, 
popScAggFunc->PstrAggFunc()->GetBuffer()),
                                true /* is_distinct */, EaggfuncstageLocal 
/*eaggfuncstage*/,
                                true /* fSplit */, nullptr /* 
pmdidResolvedReturnType */,
-                               EaggfunckindNormal, 
popScAggFunc->GetArgTypes());
+                               EaggfunckindNormal, popScAggFunc->GetArgTypes(),
+                               popScAggFunc->FRepSafe());
 
                        // CScalarValuesList
                        CExpression *pexprArg = (*(*pexprAggFunc)[0])[0];
@@ -352,7 +353,8 @@ CXformSplitDQA::PexprSplitIntoLocalDQAGlobalAgg(
                                        CWStringConst(mp, 
popScAggFunc->PstrAggFunc()->GetBuffer()),
                                false /* is_distinct */, EaggfuncstageGlobal 
/*eaggfuncstage*/,
                                true /* fSplit */, nullptr /* 
pmdidResolvedReturnType */,
-                               EaggfunckindNormal, 
popScAggFunc->GetArgTypes());
+                               EaggfunckindNormal, popScAggFunc->GetArgTypes(),
+                               popScAggFunc->FRepSafe());
 
                        CExpressionArray *pdrgpexprArgsGlobal =
                                GPOS_NEW(mp) CExpressionArray(mp);
@@ -466,7 +468,8 @@ CXformSplitDQA::PexprSplitHelper(CMemoryPool *mp, 
CColumnFactory *col_factory,
                                        CWStringConst(mp, 
popScAggFunc->PstrAggFunc()->GetBuffer()),
                                false /* is_distinct */, EaggfuncstageGlobal 
/*eaggfuncstage*/,
                                false /* fSplit */, nullptr /* 
pmdidResolvedReturnType */,
-                               EaggfunckindNormal, 
popScAggFunc->GetArgTypes());
+                               EaggfunckindNormal, popScAggFunc->GetArgTypes(),
+                               popScAggFunc->FRepSafe());
 
                        CExpressionArray *pdrgpexprChildren =
                                GPOS_NEW(mp) CExpressionArray(mp);
@@ -605,7 +608,8 @@ CXformSplitDQA::PexprPrElAgg(CMemoryPool *mp, CExpression 
*pexprAggFunc,
                        CWStringConst(mp, 
popScAggFunc->PstrAggFunc()->GetBuffer()),
                false, /*fdistinct */
                eaggfuncstage, true /* fSplit */, nullptr /* 
pmdidResolvedReturnType */,
-               EaggfunckindNormal, popScAggFunc->GetArgTypes());
+               EaggfunckindNormal, popScAggFunc->GetArgTypes(),
+               popScAggFunc->FRepSafe());
 
        return CUtils::PexprScalarProjectElement(
                mp, pcrCurrStage,
diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp 
b/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp
index 85bb11cffb..990df3e6b4 100644
--- a/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp
+++ b/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp
@@ -208,7 +208,8 @@ CXformSplitGbAgg::PopulateLocalGlobalProjectList(
                                CWStringConst(mp, 
popScAggFunc->PstrAggFunc()->GetBuffer()),
                        popScAggFunc->IsDistinct(), EaggfuncstageLocal, /* 
fGlobal */
                        true /* fSplit */, nullptr /* pmdidResolvedReturnType 
*/,
-                       EaggfunckindNormal, popScAggFunc->GetArgTypes());
+                       EaggfunckindNormal, popScAggFunc->GetArgTypes(),
+                       popScAggFunc->FRepSafe());
 
                popScAggFunc->MDId()->AddRef();
                popScAggFunc->GetArgTypes()->AddRef();
@@ -218,7 +219,8 @@ CXformSplitGbAgg::PopulateLocalGlobalProjectList(
                                CWStringConst(mp, 
popScAggFunc->PstrAggFunc()->GetBuffer()),
                        false /* is_distinct */, EaggfuncstageGlobal, /* 
fGlobal */
                        true /* fSplit */, nullptr /* pmdidResolvedReturnType 
*/,
-                       EaggfunckindNormal, popScAggFunc->GetArgTypes());
+                       EaggfunckindNormal, popScAggFunc->GetArgTypes(),
+                       popScAggFunc->FRepSafe());
 
                // determine column reference for the new project element
                const IMDAggregate *pmdagg =
diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp 
b/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp
index e869fd2c84..64bdbbfc18 100644
--- a/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp
+++ b/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp
@@ -3604,13 +3604,14 @@ CXformUtils::PexprWinFuncAgg2ScalarAgg(CMemoryPool *mp,
        mdid_func->AddRef();
        return GPOS_NEW(mp) CExpression(
                mp,
-               CUtils::PopAggFunc(mp, mdid_func,
-                                                  GPOS_NEW(mp) CWStringConst(
-                                                          mp, 
popScWinFunc->PstrFunc()->GetBuffer()),
-                                                  popScWinFunc->IsDistinct(), 
EaggfuncstageGlobal,
-                                                  false,        // fSplit
-                                                  nullptr,      // 
pmdidResolvedReturnType
-                                                  EaggfunckindNormal, 
GPOS_NEW(mp) ULongPtrArray(mp)),
+               CUtils::PopAggFunc(
+                       mp, mdid_func,
+                       GPOS_NEW(mp)
+                               CWStringConst(mp, 
popScWinFunc->PstrFunc()->GetBuffer()),
+                       popScWinFunc->IsDistinct(), EaggfuncstageGlobal,
+                       false,    // fSplit
+                       nullptr,  // pmdidResolvedReturnType
+                       EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp), 
false),
                pdrgpexprFullWinFuncArgs);
 }
 
diff --git 
a/src/backend/gporca/libnaucrates/include/naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h
 
b/src/backend/gporca/libnaucrates/include/naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h
index 766c5593d5..510f859988 100644
--- 
a/src/backend/gporca/libnaucrates/include/naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h
+++ 
b/src/backend/gporca/libnaucrates/include/naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h
@@ -57,6 +57,9 @@ private:
        // can we use hash aggregation to compute agg function
        BOOL m_hash_agg_capable;
 
+       // is aggregate replicate slice safe for execution
+       BOOL m_is_repsafe;
+
        // process the start of an element
        void StartElement(
                const XMLCh *const element_uri,                 // URI of 
element's namespace
diff --git 
a/src/backend/gporca/libnaucrates/include/naucrates/dxl/xml/dxltokens.h 
b/src/backend/gporca/libnaucrates/include/naucrates/dxl/xml/dxltokens.h
index 0c44b28548..a346a2e291 100644
--- a/src/backend/gporca/libnaucrates/include/naucrates/dxl/xml/dxltokens.h
+++ b/src/backend/gporca/libnaucrates/include/naucrates/dxl/xml/dxltokens.h
@@ -625,6 +625,7 @@ enum Edxltoken
 
        EdxltokenGPDBAgg,
        EdxltokenGPDBIsAggOrdered,
+       EdxltokenGPDBIsAggRepSafe,
        EdxltokenGPDBAggResultTypeId,
        EdxltokenGPDBAggIntermediateResultTypeId,
        EdxltokenGPDBAggSplittable,
diff --git 
a/src/backend/gporca/libnaucrates/include/naucrates/md/CMDAggregateGPDB.h 
b/src/backend/gporca/libnaucrates/include/naucrates/md/CMDAggregateGPDB.h
index b2b6b3a595..51378c1281 100644
--- a/src/backend/gporca/libnaucrates/include/naucrates/md/CMDAggregateGPDB.h
+++ b/src/backend/gporca/libnaucrates/include/naucrates/md/CMDAggregateGPDB.h
@@ -63,6 +63,9 @@ class CMDAggregateGPDB : public IMDAggregate
        // is aggregate hash capable
        BOOL m_hash_agg_capable;
 
+       // is aggregate replication slice safe for execution
+       BOOL m_is_repsafe;
+
 public:
        CMDAggregateGPDB(const CMDAggregateGPDB &) = delete;
 
@@ -70,7 +73,8 @@ public:
        CMDAggregateGPDB(CMemoryPool *mp, IMDId *mdid, CMDName *mdname,
                                         IMDId *result_type_mdid,
                                         IMDId *intermediate_result_type_mdid, 
BOOL is_ordered_agg,
-                                        BOOL is_splittable, BOOL 
is_hash_agg_capable);
+                                        BOOL is_splittable, BOOL 
is_hash_agg_capable,
+                                        BOOL is_repsafe);
 
        //dtor
        ~CMDAggregateGPDB() override;
@@ -118,6 +122,13 @@ public:
                return m_hash_agg_capable;
        }
 
+       // is aggregate replicate slice execution safe
+       BOOL
+       IsAggRepSafe() const override
+       {
+               return m_is_repsafe;
+       }
+
 #ifdef GPOS_DEBUG
        // debug print of the type in the provided stream
        void DebugPrint(IOstream &os) const override;
diff --git 
a/src/backend/gporca/libnaucrates/include/naucrates/md/IMDAggregate.h 
b/src/backend/gporca/libnaucrates/include/naucrates/md/IMDAggregate.h
index 1c861b3630..6e1249c2ef 100644
--- a/src/backend/gporca/libnaucrates/include/naucrates/md/IMDAggregate.h
+++ b/src/backend/gporca/libnaucrates/include/naucrates/md/IMDAggregate.h
@@ -58,6 +58,9 @@ public:
                // is aggregate hash capable
                BOOL
                IsHashAggCapable() const = 0;
+
+       // is aggregate replicate slice execution safe
+       virtual BOOL IsAggRepSafe() const = 0;
 };
 }  // namespace gpmd
 
diff --git a/src/backend/gporca/libnaucrates/src/md/CMDAggregateGPDB.cpp 
b/src/backend/gporca/libnaucrates/src/md/CMDAggregateGPDB.cpp
index 1152defc73..0089dfe41a 100644
--- a/src/backend/gporca/libnaucrates/src/md/CMDAggregateGPDB.cpp
+++ b/src/backend/gporca/libnaucrates/src/md/CMDAggregateGPDB.cpp
@@ -33,7 +33,7 @@ CMDAggregateGPDB::CMDAggregateGPDB(CMemoryPool *mp, IMDId 
*mdid,
                                                                   CMDName 
*mdname, IMDId *result_type_mdid,
                                                                   IMDId 
*intermediate_result_type_mdid,
                                                                   BOOL 
fOrdered, BOOL is_splittable,
-                                                                  BOOL 
is_hash_agg_capable)
+                                                                  BOOL 
is_hash_agg_capable, BOOL is_repsafe)
        : m_mp(mp),
          m_mdid(mdid),
          m_mdname(mdname),
@@ -41,7 +41,8 @@ CMDAggregateGPDB::CMDAggregateGPDB(CMemoryPool *mp, IMDId 
*mdid,
          m_mdid_type_intermediate(intermediate_result_type_mdid),
          m_is_ordered(fOrdered),
          m_is_splittable(is_splittable),
-         m_hash_agg_capable(is_hash_agg_capable)
+         m_hash_agg_capable(is_hash_agg_capable),
+         m_is_repsafe(is_repsafe)
 {
        GPOS_ASSERT(mdid->IsValid());
 
@@ -149,6 +150,12 @@ CMDAggregateGPDB::Serialize(CXMLSerializer 
*xml_serializer) const
                        CDXLTokens::GetDXLTokenStr(EdxltokenGPDBIsAggOrdered),
                        m_is_ordered);
        }
+       if (m_is_repsafe)
+       {
+               xml_serializer->AddAttribute(
+                       CDXLTokens::GetDXLTokenStr(EdxltokenGPDBIsAggRepSafe),
+                       m_is_repsafe);
+       }
 
        xml_serializer->AddAttribute(
                CDXLTokens::GetDXLTokenStr(EdxltokenGPDBAggSplittable),
diff --git 
a/src/backend/gporca/libnaucrates/src/parser/CParseHandlerMDGPDBAgg.cpp 
b/src/backend/gporca/libnaucrates/src/parser/CParseHandlerMDGPDBAgg.cpp
index 783b036559..dbaec07042 100644
--- a/src/backend/gporca/libnaucrates/src/parser/CParseHandlerMDGPDBAgg.cpp
+++ b/src/backend/gporca/libnaucrates/src/parser/CParseHandlerMDGPDBAgg.cpp
@@ -39,7 +39,8 @@ CParseHandlerMDGPDBAgg::CParseHandlerMDGPDBAgg(
          m_mdid_type_intermediate(nullptr),
          m_is_ordered(false),
          m_is_splittable(true),
-         m_hash_agg_capable(true)
+         m_hash_agg_capable(true),
+         m_is_repsafe(false)
 {
 }
 
@@ -88,6 +89,16 @@ CParseHandlerMDGPDBAgg::StartElement(const XMLCh *const,  // 
element_uri,
                                EdxltokenGPDBIsAggOrdered, EdxltokenGPDBAgg);
                }
 
+               // parse repsafe aggregate info
+               const XMLCh *xml_str_repsafe_agg =
+                       
attrs.getValue(CDXLTokens::XmlstrToken(EdxltokenGPDBIsAggRepSafe));
+               if (nullptr != xml_str_repsafe_agg)
+               {
+                       m_is_repsafe = 
CDXLOperatorFactory::ConvertAttrValueToBool(
+                               m_parse_handler_mgr->GetDXLMemoryManager(), 
xml_str_repsafe_agg,
+                               EdxltokenGPDBIsAggRepSafe, EdxltokenGPDBAgg);
+               }
+
                // parse splittable aggregate info
                const XMLCh *xml_str_splittable_agg =
                        
attrs.getValue(CDXLTokens::XmlstrToken(EdxltokenGPDBAggSplittable));
@@ -166,7 +177,7 @@ CParseHandlerMDGPDBAgg::EndElement(const XMLCh *const,      
// element_uri,
                m_imd_obj = GPOS_NEW(m_mp)
                        CMDAggregateGPDB(m_mp, m_mdid, m_mdname, 
m_mdid_type_result,
                                                         
m_mdid_type_intermediate, m_is_ordered,
-                                                        m_is_splittable, 
m_hash_agg_capable);
+                                                        m_is_splittable, 
m_hash_agg_capable, m_is_repsafe);
 
                // deactivate handler
                m_parse_handler_mgr->DeactivateHandler();
diff --git a/src/backend/gporca/libnaucrates/src/xml/dxltokens.cpp 
b/src/backend/gporca/libnaucrates/src/xml/dxltokens.cpp
index 0e8b12e332..94d7c064d2 100644
--- a/src/backend/gporca/libnaucrates/src/xml/dxltokens.cpp
+++ b/src/backend/gporca/libnaucrates/src/xml/dxltokens.cpp
@@ -674,6 +674,7 @@ CDXLTokens::Init(CMemoryPool *mp)
 
                {EdxltokenGPDBAgg, GPOS_WSZ_LIT("GPDBAgg")},
                {EdxltokenGPDBIsAggOrdered, GPOS_WSZ_LIT("IsOrdered")},
+               {EdxltokenGPDBIsAggRepSafe, GPOS_WSZ_LIT("IsRepSafe")},
                {EdxltokenGPDBAggResultTypeId, GPOS_WSZ_LIT("ResultType")},
                {EdxltokenGPDBAggIntermediateResultTypeId,
                 GPOS_WSZ_LIT("IntermediateResultType")},
diff --git a/src/backend/utils/cache/lsyscache.c 
b/src/backend/utils/cache/lsyscache.c
index 40fe9c3bc7..66569cc0c8 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -2009,6 +2009,31 @@ is_agg_ordered(Oid aggid)
        return AGGKIND_IS_ORDERED_SET(aggkind);
 }
 
+/*
+ * is_agg_repsafe
+ *             Given aggregate id, check whether it is marked safe to execute on a replicated slice (pg_aggregate.aggrepsafeexec).
+ */
+bool
+is_agg_repsafe(Oid aggid)
+{
+       HeapTuple       aggTuple;
+       bool            aggrepsafe;
+       bool            isnull = false;
+
+       aggTuple = SearchSysCache1(AGGFNOID,
+                                                          
ObjectIdGetDatum(aggid));
+       if (!HeapTupleIsValid(aggTuple))
+               elog(ERROR, "cache lookup failed for aggregate %u", aggid);
+
+       aggrepsafe = DatumGetBool(SysCacheGetAttr(AGGFNOID, aggTuple,
+                                                                               
   Anum_pg_aggregate_aggrepsafeexec, &isnull));
+       Assert(!isnull);
+
+       ReleaseSysCache(aggTuple);
+
+       return aggrepsafe;
+}
+
 /*
  * is_agg_partial_capable
  *             Given aggregate id, check if it can be used in 2-phase 
aggregation.
diff --git a/src/include/gpopt/gpdbwrappers.h b/src/include/gpopt/gpdbwrappers.h
index 36c3a39dd3..cfa7344aa2 100644
--- a/src/include/gpopt/gpdbwrappers.h
+++ b/src/include/gpopt/gpdbwrappers.h
@@ -189,6 +189,8 @@ Query *FlattenJoinAliasVar(Query *query, gpos::ULONG query_level);
 // is aggregate ordered
 bool IsOrderedAgg(Oid aggid);
 
+bool IsRepSafeAgg(Oid aggid);
+
 // does aggregate have a combine function (and serial/deserial functions, if needed)
 bool IsAggPartialCapable(Oid aggid);
 
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 6aec016f77..ca5db8fabb 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -163,6 +163,7 @@ extern char func_data_access(Oid funcid);
 extern char func_exec_location(Oid funcid);
 extern Oid get_agg_transtype(Oid aggid);
 extern bool is_agg_ordered(Oid aggid);
+extern bool is_agg_repsafe(Oid aggid);
 extern bool is_agg_partial_capable(Oid aggid);
 extern RegProcedure get_func_support(Oid funcid);
 extern Oid     get_relname_relid(const char *relname, Oid relnamespace);
diff --git a/src/test/regress/expected/gp_aggregates_costs.out b/src/test/regress/expected/gp_aggregates_costs.out
index 69b1b15eb9..112c0bc857 100644
--- a/src/test/regress/expected/gp_aggregates_costs.out
+++ b/src/test/regress/expected/gp_aggregates_costs.out
@@ -130,3 +130,44 @@ select count(*) from test_operator_mem;
 (5 rows)
 
 abort;
+
+-- Test user-defined aggregate marked safe to execute on replicated slices without motion
+CREATE AGGREGATE my_unsafe_avg (float8)
+(
+    sfunc = float8_accum,
+    stype = float8[],
+    finalfunc = float8_avg,
+    initcond = '{0,0,0}'
+);
+CREATE AGGREGATE my_safe_avg (float8)
+(
+    sfunc = float8_accum,
+    stype = float8[],
+    finalfunc = float8_avg,
+    initcond = '{0,0,0}',
+    repsafe = true
+);
+CREATE TABLE a_reptable (a int) DISTRIBUTED REPLICATED;
+CREATE TABLE b_reptable (b int) DISTRIBUTED REPLICATED;
+EXPLAIN INSERT INTO a_reptable(a) SELECT my_unsafe_avg(b) FROM b_reptable;
+                                        QUERY PLAN                             
            
+-------------------------------------------------------------------------------------------
+ Insert on a_reptable  (cost=1544.50..1544.55 rows=1 width=4)
+   ->  Broadcast Motion 1:3  (slice1; segments: 1)  (cost=1544.50..1544.55 rows=1 width=4)
+         ->  Subquery Scan on "*SELECT*"  (cost=1544.50..1544.53 rows=1 width=4)
+               ->  Aggregate  (cost=1544.50..1544.51 rows=1 width=8)
+                     ->  Seq Scan on b_reptable  (cost=0.00..1063.00 rows=96300 width=4)
+ Optimizer: Postgres query optimizer
+(6 rows)
+
+EXPLAIN INSERT INTO a_reptable(a) SELECT my_safe_avg(b) FROM b_reptable;
+                                        QUERY PLAN                             
            
+-------------------------------------------------------------------------------------------
+ Insert on a_reptable  (cost=1544.50..1544.55 rows=1 width=4)
+   ->  Broadcast Motion 1:3  (slice1; segments: 1)  (cost=1544.50..1544.55 rows=1 width=4)
+         ->  Subquery Scan on "*SELECT*"  (cost=1544.50..1544.53 rows=1 width=4)
+               ->  Aggregate  (cost=1544.50..1544.51 rows=1 width=8)
+                     ->  Seq Scan on b_reptable  (cost=0.00..1063.00 rows=96300 width=4)
+ Optimizer: Postgres query optimizer
+(6 rows)
+
diff --git a/src/test/regress/expected/gp_aggregates_costs_optimizer.out b/src/test/regress/expected/gp_aggregates_costs_optimizer.out
index 96f319484f..5d3dd96a45 100644
--- a/src/test/regress/expected/gp_aggregates_costs_optimizer.out
+++ b/src/test/regress/expected/gp_aggregates_costs_optimizer.out
@@ -130,3 +130,42 @@ select count(*) from test_operator_mem;
 (5 rows)
 
 abort;
+
+-- Test user-defined aggregate marked safe to execute on replicated slices without motion
+CREATE AGGREGATE my_unsafe_avg (float8)
+(
+    sfunc = float8_accum,
+    stype = float8[],
+    finalfunc = float8_avg,
+    initcond = '{0,0,0}'
+);
+CREATE AGGREGATE my_safe_avg (float8)
+(
+    sfunc = float8_accum,
+    stype = float8[],
+    finalfunc = float8_avg,
+    initcond = '{0,0,0}',
+    repsafe = true
+);
+CREATE TABLE a_reptable (a int) DISTRIBUTED REPLICATED;
+CREATE TABLE b_reptable (b int) DISTRIBUTED REPLICATED;
+EXPLAIN INSERT INTO a_reptable(a) SELECT my_unsafe_avg(b) FROM b_reptable;
+                                         QUERY PLAN                            
              
+---------------------------------------------------------------------------------------------
+ Insert on a_reptable  (cost=0.00..431.03 rows=1 width=4)
+   ->  Result  (cost=0.00..431.00 rows=3 width=8)
+         ->  Broadcast Motion 1:3  (slice1; segments: 1)  (cost=0.00..431.00 rows=3 width=4)
+               ->  Aggregate  (cost=0.00..431.00 rows=3 width=8)
+                     ->  Seq Scan on b_reptable  (cost=0.00..431.00 rows=3 width=4)
+ Optimizer: Pivotal Optimizer (GPORCA)
+(6 rows)
+
+EXPLAIN INSERT INTO a_reptable(a) SELECT my_safe_avg(b) FROM b_reptable;
+                               QUERY PLAN                               
+------------------------------------------------------------------------
+ Insert on a_reptable  (cost=0.00..431.03 rows=1 width=4)
+   ->  Aggregate  (cost=0.00..431.00 rows=3 width=8)
+         ->  Seq Scan on b_reptable  (cost=0.00..431.00 rows=3 width=4)
+ Optimizer: Pivotal Optimizer (GPORCA)
+(4 rows)
+
diff --git a/src/test/regress/sql/gp_aggregates_costs.sql b/src/test/regress/sql/gp_aggregates_costs.sql
index c1337de608..839fb0bb1e 100644
--- a/src/test/regress/sql/gp_aggregates_costs.sql
+++ b/src/test/regress/sql/gp_aggregates_costs.sql
@@ -59,3 +59,24 @@ explain(costs off)
 select count(*) from test_operator_mem;
 
 abort;
+
+-- Test user-defined aggregate marked safe to execute on replicated slices without motion
+CREATE AGGREGATE my_unsafe_avg (float8)
+(
+    sfunc = float8_accum,
+    stype = float8[],
+    finalfunc = float8_avg,
+    initcond = '{0,0,0}'
+);
+CREATE AGGREGATE my_safe_avg (float8)
+(
+    sfunc = float8_accum,
+    stype = float8[],
+    finalfunc = float8_avg,
+    initcond = '{0,0,0}',
+    repsafe = true
+);
+CREATE TABLE a_reptable (a int) DISTRIBUTED REPLICATED;
+CREATE TABLE b_reptable (b int) DISTRIBUTED REPLICATED;
+EXPLAIN INSERT INTO a_reptable(a) SELECT my_unsafe_avg(b) FROM b_reptable;
+EXPLAIN INSERT INTO a_reptable(a) SELECT my_safe_avg(b) FROM b_reptable;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to