This is an automated email from the ASF dual-hosted git repository. maxyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit 07de611941d3778dfcf37aed9a9f6ce3af72d2c8 Author: David Kimura <[email protected]> AuthorDate: Wed Jan 18 00:27:04 2023 +0000 [ORCA] Use catalog to determine replication safe functions Previously ORCA hardcoded a list of replication safe functions. That info is now stored in pg_aggregate catalog table, so use that. One of the advantages of this approach is that new replication safe functions (even user defined functions) can be added and optimized without requiring code changes in ORCA. --- src/backend/gpopt/gpdbwrappers.cpp | 12 +++++++ .../gpopt/translate/CTranslatorRelcacheToDXL.cpp | 3 +- .../data/dxl/minidump/ReplicatedTableGroupBy.mdp | 2 +- .../minidump/ReplicatedTableWithAggNoMotion.mdp | 2 +- .../gporca/libgpopt/include/gpopt/base/CUtils.h | 2 +- .../include/gpopt/operators/CScalarAggFunc.h | 13 ++++++- src/backend/gporca/libgpopt/src/base/CUtils.cpp | 18 +++++----- .../src/operators/COrderedAggPreprocessor.cpp | 2 +- .../libgpopt/src/operators/CScalarAggFunc.cpp | 5 +-- .../libgpopt/src/operators/CScalarProjectList.cpp | 8 +---- .../src/translate/CTranslatorDXLToExpr.cpp | 2 +- .../gporca/libgpopt/src/xforms/CXformEagerAgg.cpp | 4 +-- .../gporca/libgpopt/src/xforms/CXformSplitDQA.cpp | 12 ++++--- .../libgpopt/src/xforms/CXformSplitGbAgg.cpp | 6 ++-- .../gporca/libgpopt/src/xforms/CXformUtils.cpp | 15 ++++---- .../naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h | 3 ++ .../include/naucrates/dxl/xml/dxltokens.h | 1 + .../include/naucrates/md/CMDAggregateGPDB.h | 13 ++++++- .../include/naucrates/md/IMDAggregate.h | 3 ++ .../libnaucrates/src/md/CMDAggregateGPDB.cpp | 11 ++++-- .../src/parser/CParseHandlerMDGPDBAgg.cpp | 15 ++++++-- .../gporca/libnaucrates/src/xml/dxltokens.cpp | 1 + src/backend/utils/cache/lsyscache.c | 25 +++++++++++++ src/include/gpopt/gpdbwrappers.h | 2 ++ src/include/utils/lsyscache.h | 1 + src/test/regress/expected/gp_aggregates_costs.out | 41 ++++++++++++++++++++++ .../expected/gp_aggregates_costs_optimizer.out | 39 
++++++++++++++++++++ src/test/regress/sql/gp_aggregates_costs.sql | 21 +++++++++++ 28 files changed, 237 insertions(+), 45 deletions(-) diff --git a/src/backend/gpopt/gpdbwrappers.cpp b/src/backend/gpopt/gpdbwrappers.cpp index 61c5f73b33..674becd1dc 100644 --- a/src/backend/gpopt/gpdbwrappers.cpp +++ b/src/backend/gpopt/gpdbwrappers.cpp @@ -689,6 +689,18 @@ gpdb::IsOrderedAgg(Oid aggid) return false; } +bool +gpdb::IsRepSafeAgg(Oid aggid) +{ + GP_WRAP_START; + { + /* catalog tables: pg_aggregate */ + return is_agg_repsafe(aggid); + } + GP_WRAP_END; + return false; +} + bool gpdb::IsAggPartialCapable(Oid aggid) { diff --git a/src/backend/gpopt/translate/CTranslatorRelcacheToDXL.cpp b/src/backend/gpopt/translate/CTranslatorRelcacheToDXL.cpp index 4eecea5cea..9c47c2d78b 100644 --- a/src/backend/gpopt/translate/CTranslatorRelcacheToDXL.cpp +++ b/src/backend/gpopt/translate/CTranslatorRelcacheToDXL.cpp @@ -1521,6 +1521,7 @@ CTranslatorRelcacheToDXL::RetrieveAgg(CMemoryPool *mp, IMDId *mdid) mdid->AddRef(); BOOL is_ordered = gpdb::IsOrderedAgg(agg_oid); + BOOL is_repsafe = gpdb::IsRepSafeAgg(agg_oid); // GPDB does not support splitting of ordered aggs and aggs without a // combine function @@ -1533,7 +1534,7 @@ CTranslatorRelcacheToDXL::RetrieveAgg(CMemoryPool *mp, IMDId *mdid) CMDAggregateGPDB *pmdagg = GPOS_NEW(mp) CMDAggregateGPDB( mp, mdid, mdname, result_type_mdid, intermediate_result_type_mdid, - is_ordered, is_splittable, is_hash_agg_capable); + is_ordered, is_splittable, is_hash_agg_capable, is_repsafe); return pmdagg; } diff --git a/src/backend/gporca/data/dxl/minidump/ReplicatedTableGroupBy.mdp b/src/backend/gporca/data/dxl/minidump/ReplicatedTableGroupBy.mdp index 85db8ace76..291645ce63 100644 --- a/src/backend/gporca/data/dxl/minidump/ReplicatedTableGroupBy.mdp +++ b/src/backend/gporca/data/dxl/minidump/ReplicatedTableGroupBy.mdp @@ -956,7 +956,7 @@ <dxl:UpperBound Closed="true" TypeMdid="0.23.1.0" Value="100"/> </dxl:StatsBucket> </dxl:ColumnStatistics> - 
<dxl:GPDBAgg Mdid="0.2147.1.0" Name="count" IsSplittable="true" HashAggCapable="true"> + <dxl:GPDBAgg Mdid="0.2147.1.0" Name="count" IsRepSafe="true" IsSplittable="true" HashAggCapable="true"> <dxl:ResultType Mdid="0.20.1.0"/> <dxl:IntermediateResultType Mdid="0.20.1.0"/> </dxl:GPDBAgg> diff --git a/src/backend/gporca/data/dxl/minidump/ReplicatedTableWithAggNoMotion.mdp b/src/backend/gporca/data/dxl/minidump/ReplicatedTableWithAggNoMotion.mdp index 6f5eac6f18..ffb2e3de9f 100644 --- a/src/backend/gporca/data/dxl/minidump/ReplicatedTableWithAggNoMotion.mdp +++ b/src/backend/gporca/data/dxl/minidump/ReplicatedTableWithAggNoMotion.mdp @@ -214,7 +214,7 @@ <dxl:CountAgg Mdid="0.2147.1.0"/> </dxl:Type> <dxl:ColumnStatistics Mdid="1.222467.1.0.0" Name="c" Width="4.000000" NullFreq="0.000000" NdvRemain="0.000000" FreqRemain="0.000000" ColStatsMissing="true"/> - <dxl:GPDBAgg Mdid="0.2108.1.0" Name="sum" IsSplittable="true" HashAggCapable="true"> + <dxl:GPDBAgg Mdid="0.2108.1.0" Name="sum" IsRepSafe="true" IsSplittable="true" HashAggCapable="true"> <dxl:ResultType Mdid="0.20.1.0"/> <dxl:IntermediateResultType Mdid="0.20.1.0"/> </dxl:GPDBAgg> diff --git a/src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h b/src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h index d29b059b71..2028e720d4 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h +++ b/src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h @@ -244,7 +244,7 @@ public: BOOL is_distinct, EAggfuncStage eaggfuncstage, BOOL fSplit, IMDId * pmdidResolvedReturnType, // return type to be used if original return type is ambiguous - EAggfuncKind aggkind, ULongPtrArray *argtypes); + EAggfuncKind aggkind, ULongPtrArray *argtypes, BOOL fRepSafe); // generate an aggregate function static CExpression *PexprAggFunc(CMemoryPool *mp, IMDId *pmdidAggFunc, diff --git a/src/backend/gporca/libgpopt/include/gpopt/operators/CScalarAggFunc.h b/src/backend/gporca/libgpopt/include/gpopt/operators/CScalarAggFunc.h index 
f06512d2a0..5147b72614 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/operators/CScalarAggFunc.h +++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CScalarAggFunc.h @@ -86,6 +86,9 @@ private: // is result of splitting aggregates BOOL m_fSplit; + // is aggregate replicate slice execution safe + BOOL m_fRepSafe; + public: CScalarAggFunc(const CScalarAggFunc &) = delete; @@ -93,7 +96,8 @@ public: CScalarAggFunc(CMemoryPool *mp, IMDId *pmdidAggFunc, IMDId *resolved_rettype, const CWStringConst *pstrAggFunc, BOOL is_distinct, EAggfuncStage eaggfuncstage, BOOL fSplit, - EAggfuncKind aggkind, ULongPtrArray *argtypes); + EAggfuncKind aggkind, ULongPtrArray *argtypes, + BOOL fRepSafe); // dtor ~CScalarAggFunc() override @@ -207,6 +211,13 @@ public: return m_fSplit; } + // is aggregate replicate slice execution safe + BOOL + FRepSafe() const + { + return m_fRepSafe; + } + // type of expression's result IMDId * MdidType() const override diff --git a/src/backend/gporca/libgpopt/src/base/CUtils.cpp b/src/backend/gporca/libgpopt/src/base/CUtils.cpp index 6b64c35307..481a81364d 100644 --- a/src/backend/gporca/libgpopt/src/base/CUtils.cpp +++ b/src/backend/gporca/libgpopt/src/base/CUtils.cpp @@ -1821,16 +1821,16 @@ CUtils::PopAggFunc( BOOL is_distinct, EAggfuncStage eaggfuncstage, BOOL fSplit, IMDId * pmdidResolvedReturnType, // return type to be used if original return type is ambiguous - EAggfuncKind aggkind, ULongPtrArray *argtypes) + EAggfuncKind aggkind, ULongPtrArray *argtypes, BOOL fRepSafe) { GPOS_ASSERT(nullptr != pmdidAggFunc); GPOS_ASSERT(nullptr != pstrAggFunc); GPOS_ASSERT_IMP(nullptr != pmdidResolvedReturnType, pmdidResolvedReturnType->IsValid()); - return GPOS_NEW(mp) - CScalarAggFunc(mp, pmdidAggFunc, pmdidResolvedReturnType, pstrAggFunc, - is_distinct, eaggfuncstage, fSplit, aggkind, argtypes); + return GPOS_NEW(mp) CScalarAggFunc( + mp, pmdidAggFunc, pmdidResolvedReturnType, pstrAggFunc, is_distinct, + eaggfuncstage, fSplit, aggkind, argtypes, 
fRepSafe); } // generate an aggregate function @@ -1850,7 +1850,7 @@ CUtils::PexprAggFunc(CMemoryPool *mp, IMDId *pmdidAggFunc, // generate aggregate function CScalarAggFunc *popScAggFunc = PopAggFunc(mp, pmdidAggFunc, pstrAggFunc, is_distinct, eaggfuncstage, - fSplit, nullptr, EaggfunckindNormal, argtypes); + fSplit, nullptr, EaggfunckindNormal, argtypes, false); // generate function arguments CExpressionArray *pdrgpexpr = GPOS_NEW(mp) CExpressionArray(mp); @@ -1907,10 +1907,10 @@ CUtils::PexprCountStar(CMemoryPool *mp) CExpression(mp, GPOS_NEW(mp) CScalarValuesList(mp), GPOS_NEW(mp) CExpressionArray(mp))); - CScalarAggFunc *popScAggFunc = - PopAggFunc(mp, mdid, str, false /*is_distinct*/, - EaggfuncstageGlobal /*eaggfuncstage*/, false /*fSplit*/, - nullptr, EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp)); + CScalarAggFunc *popScAggFunc = PopAggFunc( + mp, mdid, str, false /*is_distinct*/, + EaggfuncstageGlobal /*eaggfuncstage*/, false /*fSplit*/, nullptr, + EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp), false); CExpression *pexprCountStar = GPOS_NEW(mp) CExpression(mp, popScAggFunc, pdrgpexpr); diff --git a/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp b/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp index 563310c1de..7109f21bae 100644 --- a/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp +++ b/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp @@ -569,7 +569,7 @@ COrderedAggPreprocessor::PexprFinalAgg(CMemoryPool *mp, CScalarAggFunc *popNewAggFunc = CUtils::PopAggFunc( mp, mdid, arg_col_ref->Name().Pstr(), false /*is_distinct*/, EaggfuncstageGlobal /*eaggfuncstage*/, false /*fSplit*/, ret_type, - EaggfunckindNormal, argtypes); + EaggfunckindNormal, argtypes, popScAggFunc->FRepSafe()); return GPOS_NEW(mp) CExpression(mp, popNewAggFunc, pdrgpexpr); } diff --git a/src/backend/gporca/libgpopt/src/operators/CScalarAggFunc.cpp 
b/src/backend/gporca/libgpopt/src/operators/CScalarAggFunc.cpp index 2865bf3d1a..8f7bca3933 100644 --- a/src/backend/gporca/libgpopt/src/operators/CScalarAggFunc.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CScalarAggFunc.cpp @@ -37,7 +37,7 @@ CScalarAggFunc::CScalarAggFunc(CMemoryPool *mp, IMDId *pmdidAggFunc, const CWStringConst *pstrAggFunc, BOOL is_distinct, EAggfuncStage eaggfuncstage, BOOL fSplit, EAggfuncKind aggkind, - ULongPtrArray *argtypes) + ULongPtrArray *argtypes, BOOL fRepSafe) : CScalar(mp), m_pmdidAggFunc(pmdidAggFunc), m_pmdidResolvedRetType(resolved_rettype), @@ -47,7 +47,8 @@ CScalarAggFunc::CScalarAggFunc(CMemoryPool *mp, IMDId *pmdidAggFunc, m_aggkind(aggkind), m_argtypes(argtypes), m_eaggfuncstage(eaggfuncstage), - m_fSplit(fSplit) + m_fSplit(fSplit), + m_fRepSafe(fRepSafe) { GPOS_ASSERT(nullptr != pmdidAggFunc); GPOS_ASSERT(nullptr != pstrAggFunc); diff --git a/src/backend/gporca/libgpopt/src/operators/CScalarProjectList.cpp b/src/backend/gporca/libgpopt/src/operators/CScalarProjectList.cpp index 92cf701cf7..21fec9caf6 100644 --- a/src/backend/gporca/libgpopt/src/operators/CScalarProjectList.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CScalarProjectList.cpp @@ -259,13 +259,7 @@ CScalarProjectList::FContainsOnlyReplicationSafeAggFuncs( } CScalarAggFunc *popScAggFunc = CScalarAggFunc::PopConvert(pexprAggFunc->Pop()); - OID safe_oid = CMDIdGPDB::CastMdid(popScAggFunc->MDId())->Oid(); - - // We use an allow-list approach here. 
While there are other functions that can be - // safely replicated, users could create custom agg funcs that could lead to wrong results - if (!(safe_oid == GPDB_INT4_AGG_MIN || safe_oid == GPDB_INT4_AGG_MAX || - safe_oid == GPDB_INT4_AGG_AVG || safe_oid == GPDB_INT4_AGG_SUM || - safe_oid == GPDB_INT4_AGG_COUNT || safe_oid == GPDB_COUNT_STAR)) + if (!popScAggFunc->FRepSafe()) { return false; } diff --git a/src/backend/gporca/libgpopt/src/translate/CTranslatorDXLToExpr.cpp b/src/backend/gporca/libgpopt/src/translate/CTranslatorDXLToExpr.cpp index 92f446d285..7eb9c58f9f 100644 --- a/src/backend/gporca/libgpopt/src/translate/CTranslatorDXLToExpr.cpp +++ b/src/backend/gporca/libgpopt/src/translate/CTranslatorDXLToExpr.cpp @@ -3190,7 +3190,7 @@ CTranslatorDXLToExpr::PexprAggFunc(const CDXLNode *pdxlnAggref) GPOS_NEW(m_mp) CWStringConst(m_mp, (pmdagg->Mdname().GetMDName())->GetBuffer()), dxl_op->IsDistinct(), agg_func_stage, fSplit, resolved_return_type_mdid, - agg_func_kind, dxl_op->GetArgTypes()); + agg_func_kind, dxl_op->GetArgTypes(), pmdagg->IsAggRepSafe()); CExpression *pexprAggFunc = nullptr; diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformEagerAgg.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformEagerAgg.cpp index 8df950c7d1..f641e7478a 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformEagerAgg.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformEagerAgg.cpp @@ -320,7 +320,7 @@ CXformEagerAgg::PopulateLowerProjectElement( agg_mdid->AddRef(); CScalarAggFunc *lower_agg_func = CUtils::PopAggFunc( mp, agg_mdid, agg_name, is_distinct, EaggfuncstageLocal, true, nullptr, - EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp)); + EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp), false); // add the arguments for the lower aggregate function, which is // going to be the same as the original aggregate function agg_arg_array->AddRef(); @@ -357,7 +357,7 @@ CXformEagerAgg::PopulateUpperProjectElement( agg_mdid->AddRef(); CScalarAggFunc *upper_agg_func = 
CUtils::PopAggFunc( mp, agg_mdid, agg_name, is_distinct, EaggfuncstageGlobal, true, nullptr, - EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp)); + EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp), false); // populate the argument list for the upper aggregate function CExpressionArray *upper_agg_arg_array = GPOS_NEW(mp) CExpressionArray(mp); diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp index a9af5f9b10..3cf2f8ccde 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp @@ -307,7 +307,8 @@ CXformSplitDQA::PexprSplitIntoLocalDQAGlobalAgg( CWStringConst(mp, popScAggFunc->PstrAggFunc()->GetBuffer()), true /* is_distinct */, EaggfuncstageLocal /*eaggfuncstage*/, true /* fSplit */, nullptr /* pmdidResolvedReturnType */, - EaggfunckindNormal, popScAggFunc->GetArgTypes()); + EaggfunckindNormal, popScAggFunc->GetArgTypes(), + popScAggFunc->FRepSafe()); // CScalarValuesList CExpression *pexprArg = (*(*pexprAggFunc)[0])[0]; @@ -352,7 +353,8 @@ CXformSplitDQA::PexprSplitIntoLocalDQAGlobalAgg( CWStringConst(mp, popScAggFunc->PstrAggFunc()->GetBuffer()), false /* is_distinct */, EaggfuncstageGlobal /*eaggfuncstage*/, true /* fSplit */, nullptr /* pmdidResolvedReturnType */, - EaggfunckindNormal, popScAggFunc->GetArgTypes()); + EaggfunckindNormal, popScAggFunc->GetArgTypes(), + popScAggFunc->FRepSafe()); CExpressionArray *pdrgpexprArgsGlobal = GPOS_NEW(mp) CExpressionArray(mp); @@ -466,7 +468,8 @@ CXformSplitDQA::PexprSplitHelper(CMemoryPool *mp, CColumnFactory *col_factory, CWStringConst(mp, popScAggFunc->PstrAggFunc()->GetBuffer()), false /* is_distinct */, EaggfuncstageGlobal /*eaggfuncstage*/, false /* fSplit */, nullptr /* pmdidResolvedReturnType */, - EaggfunckindNormal, popScAggFunc->GetArgTypes()); + EaggfunckindNormal, popScAggFunc->GetArgTypes(), + popScAggFunc->FRepSafe()); CExpressionArray 
*pdrgpexprChildren = GPOS_NEW(mp) CExpressionArray(mp); @@ -605,7 +608,8 @@ CXformSplitDQA::PexprPrElAgg(CMemoryPool *mp, CExpression *pexprAggFunc, CWStringConst(mp, popScAggFunc->PstrAggFunc()->GetBuffer()), false, /*fdistinct */ eaggfuncstage, true /* fSplit */, nullptr /* pmdidResolvedReturnType */, - EaggfunckindNormal, popScAggFunc->GetArgTypes()); + EaggfunckindNormal, popScAggFunc->GetArgTypes(), + popScAggFunc->FRepSafe()); return CUtils::PexprScalarProjectElement( mp, pcrCurrStage, diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp index 85bb11cffb..990df3e6b4 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp @@ -208,7 +208,8 @@ CXformSplitGbAgg::PopulateLocalGlobalProjectList( CWStringConst(mp, popScAggFunc->PstrAggFunc()->GetBuffer()), popScAggFunc->IsDistinct(), EaggfuncstageLocal, /* fGlobal */ true /* fSplit */, nullptr /* pmdidResolvedReturnType */, - EaggfunckindNormal, popScAggFunc->GetArgTypes()); + EaggfunckindNormal, popScAggFunc->GetArgTypes(), + popScAggFunc->FRepSafe()); popScAggFunc->MDId()->AddRef(); popScAggFunc->GetArgTypes()->AddRef(); @@ -218,7 +219,8 @@ CXformSplitGbAgg::PopulateLocalGlobalProjectList( CWStringConst(mp, popScAggFunc->PstrAggFunc()->GetBuffer()), false /* is_distinct */, EaggfuncstageGlobal, /* fGlobal */ true /* fSplit */, nullptr /* pmdidResolvedReturnType */, - EaggfunckindNormal, popScAggFunc->GetArgTypes()); + EaggfunckindNormal, popScAggFunc->GetArgTypes(), + popScAggFunc->FRepSafe()); // determine column reference for the new project element const IMDAggregate *pmdagg = diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp index e869fd2c84..64bdbbfc18 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp +++ 
b/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp @@ -3604,13 +3604,14 @@ CXformUtils::PexprWinFuncAgg2ScalarAgg(CMemoryPool *mp, mdid_func->AddRef(); return GPOS_NEW(mp) CExpression( mp, - CUtils::PopAggFunc(mp, mdid_func, - GPOS_NEW(mp) CWStringConst( - mp, popScWinFunc->PstrFunc()->GetBuffer()), - popScWinFunc->IsDistinct(), EaggfuncstageGlobal, - false, // fSplit - nullptr, // pmdidResolvedReturnType - EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp)), + CUtils::PopAggFunc( + mp, mdid_func, + GPOS_NEW(mp) + CWStringConst(mp, popScWinFunc->PstrFunc()->GetBuffer()), + popScWinFunc->IsDistinct(), EaggfuncstageGlobal, + false, // fSplit + nullptr, // pmdidResolvedReturnType + EaggfunckindNormal, GPOS_NEW(mp) ULongPtrArray(mp), false), pdrgpexprFullWinFuncArgs); } diff --git a/src/backend/gporca/libnaucrates/include/naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h b/src/backend/gporca/libnaucrates/include/naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h index 766c5593d5..510f859988 100644 --- a/src/backend/gporca/libnaucrates/include/naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h +++ b/src/backend/gporca/libnaucrates/include/naucrates/dxl/parser/CParseHandlerMDGPDBAgg.h @@ -57,6 +57,9 @@ private: // can we use hash aggregation to compute agg function BOOL m_hash_agg_capable; + // is aggregate replicate slice safe for execution + BOOL m_is_repsafe; + // process the start of an element void StartElement( const XMLCh *const element_uri, // URI of element's namespace diff --git a/src/backend/gporca/libnaucrates/include/naucrates/dxl/xml/dxltokens.h b/src/backend/gporca/libnaucrates/include/naucrates/dxl/xml/dxltokens.h index 0c44b28548..a346a2e291 100644 --- a/src/backend/gporca/libnaucrates/include/naucrates/dxl/xml/dxltokens.h +++ b/src/backend/gporca/libnaucrates/include/naucrates/dxl/xml/dxltokens.h @@ -625,6 +625,7 @@ enum Edxltoken EdxltokenGPDBAgg, EdxltokenGPDBIsAggOrdered, + EdxltokenGPDBIsAggRepSafe, EdxltokenGPDBAggResultTypeId, 
EdxltokenGPDBAggIntermediateResultTypeId, EdxltokenGPDBAggSplittable, diff --git a/src/backend/gporca/libnaucrates/include/naucrates/md/CMDAggregateGPDB.h b/src/backend/gporca/libnaucrates/include/naucrates/md/CMDAggregateGPDB.h index b2b6b3a595..51378c1281 100644 --- a/src/backend/gporca/libnaucrates/include/naucrates/md/CMDAggregateGPDB.h +++ b/src/backend/gporca/libnaucrates/include/naucrates/md/CMDAggregateGPDB.h @@ -63,6 +63,9 @@ class CMDAggregateGPDB : public IMDAggregate // is aggregate hash capable BOOL m_hash_agg_capable; + // is aggregate replication slice safe for execution + BOOL m_is_repsafe; + public: CMDAggregateGPDB(const CMDAggregateGPDB &) = delete; @@ -70,7 +73,8 @@ public: CMDAggregateGPDB(CMemoryPool *mp, IMDId *mdid, CMDName *mdname, IMDId *result_type_mdid, IMDId *intermediate_result_type_mdid, BOOL is_ordered_agg, - BOOL is_splittable, BOOL is_hash_agg_capable); + BOOL is_splittable, BOOL is_hash_agg_capable, + bool is_repsafe); //dtor ~CMDAggregateGPDB() override; @@ -118,6 +122,13 @@ public: return m_hash_agg_capable; } + // is aggregate replicate slice execution safe + BOOL + IsAggRepSafe() const override + { + return m_is_repsafe; + } + #ifdef GPOS_DEBUG // debug print of the type in the provided stream void DebugPrint(IOstream &os) const override; diff --git a/src/backend/gporca/libnaucrates/include/naucrates/md/IMDAggregate.h b/src/backend/gporca/libnaucrates/include/naucrates/md/IMDAggregate.h index 1c861b3630..6e1249c2ef 100644 --- a/src/backend/gporca/libnaucrates/include/naucrates/md/IMDAggregate.h +++ b/src/backend/gporca/libnaucrates/include/naucrates/md/IMDAggregate.h @@ -58,6 +58,9 @@ public: // is aggregate hash capable BOOL IsHashAggCapable() const = 0; + + // is aggregate replicate slice execution safe + virtual BOOL IsAggRepSafe() const = 0; }; } // namespace gpmd diff --git a/src/backend/gporca/libnaucrates/src/md/CMDAggregateGPDB.cpp b/src/backend/gporca/libnaucrates/src/md/CMDAggregateGPDB.cpp index 
1152defc73..0089dfe41a 100644 --- a/src/backend/gporca/libnaucrates/src/md/CMDAggregateGPDB.cpp +++ b/src/backend/gporca/libnaucrates/src/md/CMDAggregateGPDB.cpp @@ -33,7 +33,7 @@ CMDAggregateGPDB::CMDAggregateGPDB(CMemoryPool *mp, IMDId *mdid, CMDName *mdname, IMDId *result_type_mdid, IMDId *intermediate_result_type_mdid, BOOL fOrdered, BOOL is_splittable, - BOOL is_hash_agg_capable) + BOOL is_hash_agg_capable, BOOL is_repsafe) : m_mp(mp), m_mdid(mdid), m_mdname(mdname), @@ -41,7 +41,8 @@ CMDAggregateGPDB::CMDAggregateGPDB(CMemoryPool *mp, IMDId *mdid, m_mdid_type_intermediate(intermediate_result_type_mdid), m_is_ordered(fOrdered), m_is_splittable(is_splittable), - m_hash_agg_capable(is_hash_agg_capable) + m_hash_agg_capable(is_hash_agg_capable), + m_is_repsafe(is_repsafe) { GPOS_ASSERT(mdid->IsValid()); @@ -149,6 +150,12 @@ CMDAggregateGPDB::Serialize(CXMLSerializer *xml_serializer) const CDXLTokens::GetDXLTokenStr(EdxltokenGPDBIsAggOrdered), m_is_ordered); } + if (m_is_repsafe) + { + xml_serializer->AddAttribute( + CDXLTokens::GetDXLTokenStr(EdxltokenGPDBIsAggRepSafe), + m_is_repsafe); + } xml_serializer->AddAttribute( CDXLTokens::GetDXLTokenStr(EdxltokenGPDBAggSplittable), diff --git a/src/backend/gporca/libnaucrates/src/parser/CParseHandlerMDGPDBAgg.cpp b/src/backend/gporca/libnaucrates/src/parser/CParseHandlerMDGPDBAgg.cpp index 783b036559..dbaec07042 100644 --- a/src/backend/gporca/libnaucrates/src/parser/CParseHandlerMDGPDBAgg.cpp +++ b/src/backend/gporca/libnaucrates/src/parser/CParseHandlerMDGPDBAgg.cpp @@ -39,7 +39,8 @@ CParseHandlerMDGPDBAgg::CParseHandlerMDGPDBAgg( m_mdid_type_intermediate(nullptr), m_is_ordered(false), m_is_splittable(true), - m_hash_agg_capable(true) + m_hash_agg_capable(true), + m_is_repsafe(false) { } @@ -88,6 +89,16 @@ CParseHandlerMDGPDBAgg::StartElement(const XMLCh *const, // element_uri, EdxltokenGPDBIsAggOrdered, EdxltokenGPDBAgg); } + // parse repsafe aggregate info + const XMLCh *xml_str_repsafe_agg = + 
 attrs.getValue(CDXLTokens::XmlstrToken(EdxltokenGPDBIsAggRepSafe)); + if (nullptr != xml_str_repsafe_agg) + { + m_is_repsafe = CDXLOperatorFactory::ConvertAttrValueToBool( + m_parse_handler_mgr->GetDXLMemoryManager(), xml_str_repsafe_agg, + EdxltokenGPDBIsAggRepSafe, EdxltokenGPDBAgg); + } + // parse splittable aggregate info const XMLCh *xml_str_splittable_agg = attrs.getValue(CDXLTokens::XmlstrToken(EdxltokenGPDBAggSplittable)); @@ -166,7 +177,7 @@ CParseHandlerMDGPDBAgg::EndElement(const XMLCh *const, // element_uri, m_imd_obj = GPOS_NEW(m_mp) CMDAggregateGPDB(m_mp, m_mdid, m_mdname, m_mdid_type_result, m_mdid_type_intermediate, m_is_ordered, - m_is_splittable, m_hash_agg_capable); + m_is_splittable, m_hash_agg_capable, m_is_repsafe); // deactivate handler m_parse_handler_mgr->DeactivateHandler(); diff --git a/src/backend/gporca/libnaucrates/src/xml/dxltokens.cpp b/src/backend/gporca/libnaucrates/src/xml/dxltokens.cpp index 0e8b12e332..94d7c064d2 100644 --- a/src/backend/gporca/libnaucrates/src/xml/dxltokens.cpp +++ b/src/backend/gporca/libnaucrates/src/xml/dxltokens.cpp @@ -674,6 +674,7 @@ CDXLTokens::Init(CMemoryPool *mp) {EdxltokenGPDBAgg, GPOS_WSZ_LIT("GPDBAgg")}, {EdxltokenGPDBIsAggOrdered, GPOS_WSZ_LIT("IsOrdered")}, + {EdxltokenGPDBIsAggRepSafe, GPOS_WSZ_LIT("IsRepSafe")}, {EdxltokenGPDBAggResultTypeId, GPOS_WSZ_LIT("ResultType")}, {EdxltokenGPDBAggIntermediateResultTypeId, GPOS_WSZ_LIT("IntermediateResultType")}, diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 40fe9c3bc7..66569cc0c8 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -2009,6 +2009,31 @@ is_agg_ordered(Oid aggid) return AGGKIND_IS_ORDERED_SET(aggkind); } +/* + * is_agg_repsafe + * Given aggregate id, check if it is safe to execute on replicated slices + */ +bool +is_agg_repsafe(Oid aggid) +{ + HeapTuple aggTuple; + bool aggrepsafe; + bool isnull = false; + + aggTuple = SearchSysCache1(AGGFNOID, 
ObjectIdGetDatum(aggid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", aggid); + + aggrepsafe = DatumGetBool(SysCacheGetAttr(AGGFNOID, aggTuple, + Anum_pg_aggregate_aggrepsafeexec, &isnull)); + Assert(!isnull); + + ReleaseSysCache(aggTuple); + + return aggrepsafe; +} + /* * is_agg_partial_capable * Given aggregate id, check if it can be used in 2-phase aggregation. diff --git a/src/include/gpopt/gpdbwrappers.h b/src/include/gpopt/gpdbwrappers.h index 36c3a39dd3..cfa7344aa2 100644 --- a/src/include/gpopt/gpdbwrappers.h +++ b/src/include/gpopt/gpdbwrappers.h @@ -189,6 +189,8 @@ Query *FlattenJoinAliasVar(Query *query, gpos::ULONG query_level); // is aggregate ordered bool IsOrderedAgg(Oid aggid); +bool IsRepSafeAgg(Oid aggid); + // does aggregate have a combine function (and serial/deserial functions, if needed) bool IsAggPartialCapable(Oid aggid); diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 6aec016f77..ca5db8fabb 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -163,6 +163,7 @@ extern char func_data_access(Oid funcid); extern char func_exec_location(Oid funcid); extern Oid get_agg_transtype(Oid aggid); extern bool is_agg_ordered(Oid aggid); +extern bool is_agg_repsafe(Oid aggid); extern bool is_agg_partial_capable(Oid aggid); extern RegProcedure get_func_support(Oid funcid); extern Oid get_relname_relid(const char *relname, Oid relnamespace); diff --git a/src/test/regress/expected/gp_aggregates_costs.out b/src/test/regress/expected/gp_aggregates_costs.out index 69b1b15eb9..112c0bc857 100644 --- a/src/test/regress/expected/gp_aggregates_costs.out +++ b/src/test/regress/expected/gp_aggregates_costs.out @@ -130,3 +130,44 @@ select count(*) from test_operator_mem; (5 rows) abort; + +-- Test user-defined aggregate marked safe to execute on replicated slices without motion +CREATE AGGREGATE my_unsafe_avg (float8) +( + sfunc = float8_accum, + stype = float8[], 
+ finalfunc = float8_avg, + initcond = '{0,0,0}' +); +CREATE AGGREGATE my_safe_avg (float8) +( + sfunc = float8_accum, + stype = float8[], + finalfunc = float8_avg, + initcond = '{0,0,0}', + repsafe = true +); +CREATE TABLE a_reptable (a int) DISTRIBUTED REPLICATED; +CREATE TABLE b_reptable (b int) DISTRIBUTED REPLICATED; +EXPLAIN INSERT INTO a_reptable(a) SELECT my_unsafe_avg(b) FROM b_reptable; + QUERY PLAN +------------------------------------------------------------------------------------------- + Insert on a_reptable (cost=1544.50..1544.55 rows=1 width=4) + -> Broadcast Motion 1:3 (slice1; segments: 1) (cost=1544.50..1544.55 rows=1 width=4) + -> Subquery Scan on "*SELECT*" (cost=1544.50..1544.53 rows=1 width=4) + -> Aggregate (cost=1544.50..1544.51 rows=1 width=8) + -> Seq Scan on b_reptable (cost=0.00..1063.00 rows=96300 width=4) + Optimizer: Postgres query optimizer +(6 rows) + +EXPLAIN INSERT INTO a_reptable(a) SELECT my_safe_avg(b) FROM b_reptable; + QUERY PLAN +------------------------------------------------------------------------------------------- + Insert on a_reptable (cost=1544.50..1544.55 rows=1 width=4) + -> Broadcast Motion 1:3 (slice1; segments: 1) (cost=1544.50..1544.55 rows=1 width=4) + -> Subquery Scan on "*SELECT*" (cost=1544.50..1544.53 rows=1 width=4) + -> Aggregate (cost=1544.50..1544.51 rows=1 width=8) + -> Seq Scan on b_reptable (cost=0.00..1063.00 rows=96300 width=4) + Optimizer: Postgres query optimizer +(6 rows) + diff --git a/src/test/regress/expected/gp_aggregates_costs_optimizer.out b/src/test/regress/expected/gp_aggregates_costs_optimizer.out index 96f319484f..5d3dd96a45 100644 --- a/src/test/regress/expected/gp_aggregates_costs_optimizer.out +++ b/src/test/regress/expected/gp_aggregates_costs_optimizer.out @@ -130,3 +130,42 @@ select count(*) from test_operator_mem; (5 rows) abort; + +-- Test user-defined aggregate marked safe to execute on replicated slices without motion +CREATE AGGREGATE my_unsafe_avg (float8) +( + sfunc = 
float8_accum, + stype = float8[], + finalfunc = float8_avg, + initcond = '{0,0,0}' +); +CREATE AGGREGATE my_safe_avg (float8) +( + sfunc = float8_accum, + stype = float8[], + finalfunc = float8_avg, + initcond = '{0,0,0}', + repsafe = true +); +CREATE TABLE a_reptable (a int) DISTRIBUTED REPLICATED; +CREATE TABLE b_reptable (b int) DISTRIBUTED REPLICATED; +EXPLAIN INSERT INTO a_reptable(a) SELECT my_unsafe_avg(b) FROM b_reptable; + QUERY PLAN +--------------------------------------------------------------------------------------------- + Insert on a_reptable (cost=0.00..431.03 rows=1 width=4) + -> Result (cost=0.00..431.00 rows=3 width=8) + -> Broadcast Motion 1:3 (slice1; segments: 1) (cost=0.00..431.00 rows=3 width=4) + -> Aggregate (cost=0.00..431.00 rows=3 width=8) + -> Seq Scan on b_reptable (cost=0.00..431.00 rows=3 width=4) + Optimizer: Pivotal Optimizer (GPORCA) +(6 rows) + +EXPLAIN INSERT INTO a_reptable(a) SELECT my_safe_avg(b) FROM b_reptable; + QUERY PLAN +------------------------------------------------------------------------ + Insert on a_reptable (cost=0.00..431.03 rows=1 width=4) + -> Aggregate (cost=0.00..431.00 rows=3 width=8) + -> Seq Scan on b_reptable (cost=0.00..431.00 rows=3 width=4) + Optimizer: Pivotal Optimizer (GPORCA) +(4 rows) + diff --git a/src/test/regress/sql/gp_aggregates_costs.sql b/src/test/regress/sql/gp_aggregates_costs.sql index c1337de608..839fb0bb1e 100644 --- a/src/test/regress/sql/gp_aggregates_costs.sql +++ b/src/test/regress/sql/gp_aggregates_costs.sql @@ -59,3 +59,24 @@ explain(costs off) select count(*) from test_operator_mem; abort; + +-- Test user-defined aggregate marked safe to execute on replicated slices without motion +CREATE AGGREGATE my_unsafe_avg (float8) +( + sfunc = float8_accum, + stype = float8[], + finalfunc = float8_avg, + initcond = '{0,0,0}' +); +CREATE AGGREGATE my_safe_avg (float8) +( + sfunc = float8_accum, + stype = float8[], + finalfunc = float8_avg, + initcond = '{0,0,0}', + repsafe = true +); 
+CREATE TABLE a_reptable (a int) DISTRIBUTED REPLICATED; +CREATE TABLE b_reptable (b int) DISTRIBUTED REPLICATED; +EXPLAIN INSERT INTO a_reptable(a) SELECT my_unsafe_avg(b) FROM b_reptable; +EXPLAIN INSERT INTO a_reptable(a) SELECT my_safe_avg(b) FROM b_reptable; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
