This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit ff274c55a9b6aefacd41ff6e482b5b77fac75791
Author: David Kimura <[email protected]>
AuthorDate: Tue Jan 17 23:51:47 2023 +0000

    Store aggregate replication safety info in catalog
    
    In GPDB, certain aggregates (specifically order-agnostic functions) can
    be safely executed on replicated slices. The alternative is to execute
    the aggregate on a single segment and then broadcast the result to the
    others. The performance impact of that broadcast can be significant on
    large replicated datasets.
    
    This commit adds an entry in pg_aggregate to distinguish between
    aggregates that can and cannot be safely executed on replicated slices.
    By default, aggregates are not considered safe to execute on replicated
    slices. A user can create a safe aggregate by explicitly specifying the
    new optional parameter 'repsafe' in the CREATE AGGREGATE command.
---
 src/backend/catalog/pg_aggregate.c   |   2 +
 src/backend/commands/aggregatecmds.c |   4 +
 src/bin/pg_dump/pg_dump.c            |  12 ++
 src/include/catalog/catversion.h     |   2 +-
 src/include/catalog/pg_aggregate.dat | 213 ++++++++++++++++++-----------------
 src/include/catalog/pg_aggregate.h   |   4 +
 6 files changed, 132 insertions(+), 105 deletions(-)

diff --git a/src/backend/catalog/pg_aggregate.c 
b/src/backend/catalog/pg_aggregate.c
index 7e66bdbb8d..e19bc5d974 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -75,6 +75,7 @@ AggregateCreate(const char *aggName,
                                int32 aggmTransSpace,
                                const char *agginitval,
                                const char *aggminitval,
+                               bool aggrepsafeexec,
                                char proparallel)
 {
        Relation        aggdesc;
@@ -680,6 +681,7 @@ AggregateCreate(const char *aggName,
        values[Anum_pg_aggregate_aggtransspace - 1] = 
Int32GetDatum(aggTransSpace);
        values[Anum_pg_aggregate_aggmtranstype - 1] = 
ObjectIdGetDatum(aggmTransType);
        values[Anum_pg_aggregate_aggmtransspace - 1] = 
Int32GetDatum(aggmTransSpace);
+       values[Anum_pg_aggregate_aggrepsafeexec - 1] = 
BoolGetDatum(aggrepsafeexec);
        if (agginitval)
                values[Anum_pg_aggregate_agginitval - 1] = 
CStringGetTextDatum(agginitval);
        else
diff --git a/src/backend/commands/aggregatecmds.c 
b/src/backend/commands/aggregatecmds.c
index 1251965631..e64f3a6920 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -104,6 +104,7 @@ DefineAggregate(ParseState *pstate,
        char            proparallel = PROPARALLEL_UNSAFE;
        ListCell   *pl;
        List       *orig_args = args;
+       bool            repsafe = false;
 
        /* Convert list of names to a name and namespace */
        aggNamespace = QualifiedNameGetCreationNamespace(name, &aggName);
@@ -198,6 +199,8 @@ DefineAggregate(ParseState *pstate,
                        minitval = defGetString(defel);
                else if (strcmp(defel->defname, "parallel") == 0)
                        parallel = defGetString(defel);
+               else if (strcmp(defel->defname, "repsafe") == 0)
+                       repsafe = defGetBoolean(defel);
                else
                        ereport(WARNING,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -489,6 +492,7 @@ DefineAggregate(ParseState *pstate,
                                                   mtransSpace, /* transition 
space */
                                                   initval, /* initial 
condition */
                                                   minitval,    /* initial 
condition */
+                                                  repsafe, /* is replicate 
slice execution safe */
                                                   proparallel);        /* 
parallel safe? */
 
        if (Gp_role == GP_ROLE_DISPATCH)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index d737635f76..63c0d5f9f2 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -15734,6 +15734,7 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
        const char *aggmtransspace;
        const char *agginitval;
        const char *aggminitval;
+       bool            aggrepsafeexec;
        const char *proparallel;
        char            defaultfinalmodify;
 
@@ -15833,6 +15834,13 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
                                                         "'0' AS 
aggfinalmodify,\n"
                                                         "'0' AS 
aggmfinalmodify\n");
 
+       if (fout->remoteVersion >= 140000)
+               appendPQExpBufferStr(query,
+                                                               
"aggrepsafeexec,\n");
+       else
+               appendPQExpBufferStr(query,
+                                                                "false AS 
aggrepsafeexec,\n");
+
        appendPQExpBuffer(query,
                                          "FROM pg_catalog.pg_aggregate a, 
pg_catalog.pg_proc p "
                                          "WHERE a.aggfnoid = p.oid "
@@ -15864,6 +15872,7 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
        aggmtransspace = PQgetvalue(res, 0, PQfnumber(res, "aggmtransspace"));
        agginitval = PQgetvalue(res, 0, i_agginitval);
        aggminitval = PQgetvalue(res, 0, i_aggminitval);
+       aggrepsafeexec = (PQgetvalue(res, 0, PQfnumber(res, 
"aggrepsafeexec"))[0] == 't');
        proparallel = PQgetvalue(res, 0, PQfnumber(res, "proparallel"));
 
        if (fout->remoteVersion >= 80400)
@@ -15990,6 +15999,9 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
                }
        }
 
+       if (aggrepsafeexec)
+               appendPQExpBuffer(details, ",\n    REPSAFE = true");
+
        aggsortconvop = getFormattedOperatorName(aggsortop);
        if (aggsortconvop)
        {
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 799916398b..a06d8bea07 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -56,6 +56,6 @@
  */
 
 /*                                                     3yyymmddN */
-#define CATALOG_VERSION_NO     302501601
+#define CATALOG_VERSION_NO     302501131
 
 #endif
diff --git a/src/include/catalog/pg_aggregate.dat 
b/src/include/catalog/pg_aggregate.dat
index 0eece04a61..5c95dcc757 100644
--- a/src/include/catalog/pg_aggregate.dat
+++ b/src/include/catalog/pg_aggregate.dat
@@ -18,17 +18,18 @@
   aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
   aggmtransfn => 'int8_avg_accum', aggminvtransfn => 'int8_avg_accum_inv',
   aggmfinalfn => 'numeric_poly_avg', aggtranstype => 'internal',
-  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48' },
+  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48',
+  aggrepsafeexec => 't' },
 { aggfnoid => 'avg(int4)', aggtransfn => 'int4_avg_accum',
   aggfinalfn => 'int8_avg', aggcombinefn => 'int4_avg_combine',
   aggmtransfn => 'int4_avg_accum', aggminvtransfn => 'int4_avg_accum_inv',
   aggmfinalfn => 'int8_avg', aggtranstype => '_int8', aggmtranstype => '_int8',
-  agginitval => '{0,0}', aggminitval => '{0,0}' },
+  agginitval => '{0,0}', aggminitval => '{0,0}', aggrepsafeexec => 't' },
 { aggfnoid => 'avg(int2)', aggtransfn => 'int2_avg_accum',
   aggfinalfn => 'int8_avg', aggcombinefn => 'int4_avg_combine',
   aggmtransfn => 'int2_avg_accum', aggminvtransfn => 'int2_avg_accum_inv',
   aggmfinalfn => 'int8_avg', aggtranstype => '_int8', aggmtranstype => '_int8',
-  agginitval => '{0,0}', aggminitval => '{0,0}' },
+  agginitval => '{0,0}', aggminitval => '{0,0}', aggrepsafeexec => 't' },
 { aggfnoid => 'avg(numeric)', aggtransfn => 'numeric_avg_accum',
   aggfinalfn => 'numeric_avg', aggcombinefn => 'numeric_avg_combine',
   aggserialfn => 'numeric_avg_serialize',
@@ -56,11 +57,12 @@
   aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
   aggmtransfn => 'int8_avg_accum', aggminvtransfn => 'int8_avg_accum_inv',
   aggmfinalfn => 'numeric_poly_sum', aggtranstype => 'internal',
-  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48' },
+  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48',
+  aggrepsafeexec => 't' },
 { aggfnoid => 'sum(int4)', aggtransfn => 'int4_sum', aggcombinefn => 'int8pl',
   aggmtransfn => 'int4_avg_accum', aggminvtransfn => 'int4_avg_accum_inv',
   aggmfinalfn => 'int2int4_sum', aggtranstype => 'int8',
-  aggmtranstype => '_int8', aggminitval => '{0,0}' },
+  aggmtranstype => '_int8', aggminitval => '{0,0}', aggrepsafeexec => 't' },
 { aggfnoid => 'sum(int2)', aggtransfn => 'int2_sum', aggcombinefn => 'int8pl',
   aggmtransfn => 'int2_avg_accum', aggminvtransfn => 'int2_avg_accum_inv',
   aggmfinalfn => 'int2int4_sum', aggtranstype => 'int8',
@@ -88,64 +90,65 @@
 # max
 { aggfnoid => 'max(int8)', aggtransfn => 'int8larger',
   aggcombinefn => 'int8larger', aggsortop => '>(int8,int8)',
-  aggtranstype => 'int8' },
+  aggtranstype => 'int8', aggrepsafeexec => 't' },
 { aggfnoid => 'max(int4)', aggtransfn => 'int4larger',
   aggcombinefn => 'int4larger', aggsortop => '>(int4,int4)',
-  aggtranstype => 'int4' },
+  aggtranstype => 'int4', aggrepsafeexec => 't' },
 { aggfnoid => 'max(int2)', aggtransfn => 'int2larger',
   aggcombinefn => 'int2larger', aggsortop => '>(int2,int2)',
-  aggtranstype => 'int2' },
+  aggtranstype => 'int2', aggrepsafeexec => 't' },
 { aggfnoid => 'max(oid)', aggtransfn => 'oidlarger',
-  aggcombinefn => 'oidlarger', aggsortop => '>(oid,oid)',
-  aggtranstype => 'oid' },
+  aggcombinefn => 'oidlarger', aggsortop => '>(oid,oid)', aggtranstype => 
'oid',
+  aggrepsafeexec => 't' },
 { aggfnoid => 'max(float4)', aggtransfn => 'float4larger',
   aggcombinefn => 'float4larger', aggsortop => '>(float4,float4)',
-  aggtranstype => 'float4' },
+  aggtranstype => 'float4', aggrepsafeexec => 't' },
 { aggfnoid => 'max(float8)', aggtransfn => 'float8larger',
   aggcombinefn => 'float8larger', aggsortop => '>(float8,float8)',
-  aggtranstype => 'float8' },
+  aggtranstype => 'float8', aggrepsafeexec => 't' },
 { aggfnoid => 'max(date)', aggtransfn => 'date_larger',
   aggcombinefn => 'date_larger', aggsortop => '>(date,date)',
-  aggtranstype => 'date' },
+  aggtranstype => 'date', aggrepsafeexec => 't' },
 { aggfnoid => 'max(time)', aggtransfn => 'time_larger',
   aggcombinefn => 'time_larger', aggsortop => '>(time,time)',
-  aggtranstype => 'time' },
+  aggtranstype => 'time', aggrepsafeexec => 't' },
 { aggfnoid => 'max(timetz)', aggtransfn => 'timetz_larger',
   aggcombinefn => 'timetz_larger', aggsortop => '>(timetz,timetz)',
-  aggtranstype => 'timetz' },
+  aggtranstype => 'timetz', aggrepsafeexec => 't' },
 { aggfnoid => 'max(money)', aggtransfn => 'cashlarger',
   aggcombinefn => 'cashlarger', aggsortop => '>(money,money)',
-  aggtranstype => 'money' },
+  aggtranstype => 'money', aggrepsafeexec => 't' },
 { aggfnoid => 'max(timestamp)', aggtransfn => 'timestamp_larger',
   aggcombinefn => 'timestamp_larger', aggsortop => '>(timestamp,timestamp)',
-  aggtranstype => 'timestamp' },
+  aggtranstype => 'timestamp', aggrepsafeexec => 't' },
 { aggfnoid => 'max(timestamptz)', aggtransfn => 'timestamptz_larger',
   aggcombinefn => 'timestamptz_larger',
-  aggsortop => '>(timestamptz,timestamptz)', aggtranstype => 'timestamptz' },
+  aggsortop => '>(timestamptz,timestamptz)', aggtranstype => 'timestamptz',
+  aggrepsafeexec => 't' },
 { aggfnoid => 'max(interval)', aggtransfn => 'interval_larger',
   aggcombinefn => 'interval_larger', aggsortop => '>(interval,interval)',
-  aggtranstype => 'interval' },
+  aggtranstype => 'interval', aggrepsafeexec => 't' },
 { aggfnoid => 'max(text)', aggtransfn => 'text_larger',
   aggcombinefn => 'text_larger', aggsortop => '>(text,text)',
-  aggtranstype => 'text' },
+  aggtranstype => 'text', aggrepsafeexec => 't' },
 { aggfnoid => 'max(numeric)', aggtransfn => 'numeric_larger',
   aggcombinefn => 'numeric_larger', aggsortop => '>(numeric,numeric)',
-  aggtranstype => 'numeric' },
+  aggtranstype => 'numeric', aggrepsafeexec => 't' },
 { aggfnoid => 'max(anyarray)', aggtransfn => 'array_larger',
   aggcombinefn => 'array_larger', aggsortop => '>(anyarray,anyarray)',
-  aggtranstype => 'anyarray' },
+  aggtranstype => 'anyarray', aggrepsafeexec => 't' },
 { aggfnoid => 'max(bpchar)', aggtransfn => 'bpchar_larger',
   aggcombinefn => 'bpchar_larger', aggsortop => '>(bpchar,bpchar)',
-  aggtranstype => 'bpchar' },
+  aggtranstype => 'bpchar', aggrepsafeexec => 't' },
 { aggfnoid => 'max(tid)', aggtransfn => 'tidlarger',
-  aggcombinefn => 'tidlarger', aggsortop => '>(tid,tid)',
-  aggtranstype => 'tid' },
+  aggcombinefn => 'tidlarger', aggsortop => '>(tid,tid)', aggtranstype => 
'tid',
+  aggrepsafeexec => 't' },
 { aggfnoid => 'max(anyenum)', aggtransfn => 'enum_larger',
   aggcombinefn => 'enum_larger', aggsortop => '>(anyenum,anyenum)',
-  aggtranstype => 'anyenum' },
+  aggtranstype => 'anyenum', aggrepsafeexec => 't' },
 { aggfnoid => 'max(inet)', aggtransfn => 'network_larger',
   aggcombinefn => 'network_larger', aggsortop => '>(inet,inet)',
-  aggtranstype => 'inet' },
+  aggtranstype => 'inet', aggrepsafeexec => 't' },
 { aggfnoid => 'max(pg_lsn)', aggtransfn => 'pg_lsn_larger',
   aggcombinefn => 'pg_lsn_larger', aggsortop => '>(pg_lsn,pg_lsn)',
   aggtranstype => 'pg_lsn' },
@@ -153,64 +156,65 @@
 # min
 { aggfnoid => 'min(int8)', aggtransfn => 'int8smaller',
   aggcombinefn => 'int8smaller', aggsortop => '<(int8,int8)',
-  aggtranstype => 'int8' },
+  aggtranstype => 'int8', aggrepsafeexec => 't' },
 { aggfnoid => 'min(int4)', aggtransfn => 'int4smaller',
   aggcombinefn => 'int4smaller', aggsortop => '<(int4,int4)',
-  aggtranstype => 'int4' },
+  aggtranstype => 'int4', aggrepsafeexec => 't' },
 { aggfnoid => 'min(int2)', aggtransfn => 'int2smaller',
   aggcombinefn => 'int2smaller', aggsortop => '<(int2,int2)',
-  aggtranstype => 'int2' },
+  aggtranstype => 'int2', aggrepsafeexec => 't' },
 { aggfnoid => 'min(oid)', aggtransfn => 'oidsmaller',
   aggcombinefn => 'oidsmaller', aggsortop => '<(oid,oid)',
-  aggtranstype => 'oid' },
+  aggtranstype => 'oid', aggrepsafeexec => 't' },
 { aggfnoid => 'min(float4)', aggtransfn => 'float4smaller',
   aggcombinefn => 'float4smaller', aggsortop => '<(float4,float4)',
-  aggtranstype => 'float4' },
+  aggtranstype => 'float4', aggrepsafeexec => 't' },
 { aggfnoid => 'min(float8)', aggtransfn => 'float8smaller',
   aggcombinefn => 'float8smaller', aggsortop => '<(float8,float8)',
-  aggtranstype => 'float8' },
+  aggtranstype => 'float8', aggrepsafeexec => 't' },
 { aggfnoid => 'min(date)', aggtransfn => 'date_smaller',
   aggcombinefn => 'date_smaller', aggsortop => '<(date,date)',
-  aggtranstype => 'date' },
+  aggtranstype => 'date', aggrepsafeexec => 't' },
 { aggfnoid => 'min(time)', aggtransfn => 'time_smaller',
   aggcombinefn => 'time_smaller', aggsortop => '<(time,time)',
-  aggtranstype => 'time' },
+  aggtranstype => 'time', aggrepsafeexec => 't' },
 { aggfnoid => 'min(timetz)', aggtransfn => 'timetz_smaller',
   aggcombinefn => 'timetz_smaller', aggsortop => '<(timetz,timetz)',
-  aggtranstype => 'timetz' },
+  aggtranstype => 'timetz', aggrepsafeexec => 't' },
 { aggfnoid => 'min(money)', aggtransfn => 'cashsmaller',
   aggcombinefn => 'cashsmaller', aggsortop => '<(money,money)',
-  aggtranstype => 'money' },
+  aggtranstype => 'money', aggrepsafeexec => 't' },
 { aggfnoid => 'min(timestamp)', aggtransfn => 'timestamp_smaller',
   aggcombinefn => 'timestamp_smaller', aggsortop => '<(timestamp,timestamp)',
-  aggtranstype => 'timestamp' },
+  aggtranstype => 'timestamp', aggrepsafeexec => 't' },
 { aggfnoid => 'min(timestamptz)', aggtransfn => 'timestamptz_smaller',
   aggcombinefn => 'timestamptz_smaller',
-  aggsortop => '<(timestamptz,timestamptz)', aggtranstype => 'timestamptz' },
+  aggsortop => '<(timestamptz,timestamptz)', aggtranstype => 'timestamptz',
+  aggrepsafeexec => 't' },
 { aggfnoid => 'min(interval)', aggtransfn => 'interval_smaller',
   aggcombinefn => 'interval_smaller', aggsortop => '<(interval,interval)',
-  aggtranstype => 'interval' },
+  aggtranstype => 'interval', aggrepsafeexec => 't' },
 { aggfnoid => 'min(text)', aggtransfn => 'text_smaller',
   aggcombinefn => 'text_smaller', aggsortop => '<(text,text)',
-  aggtranstype => 'text' },
+  aggtranstype => 'text', aggrepsafeexec => 't' },
 { aggfnoid => 'min(numeric)', aggtransfn => 'numeric_smaller',
   aggcombinefn => 'numeric_smaller', aggsortop => '<(numeric,numeric)',
-  aggtranstype => 'numeric' },
+  aggtranstype => 'numeric', aggrepsafeexec => 't' },
 { aggfnoid => 'min(anyarray)', aggtransfn => 'array_smaller',
   aggcombinefn => 'array_smaller', aggsortop => '<(anyarray,anyarray)',
-  aggtranstype => 'anyarray' },
+  aggtranstype => 'anyarray', aggrepsafeexec => 't' },
 { aggfnoid => 'min(bpchar)', aggtransfn => 'bpchar_smaller',
   aggcombinefn => 'bpchar_smaller', aggsortop => '<(bpchar,bpchar)',
-  aggtranstype => 'bpchar' },
+  aggtranstype => 'bpchar', aggrepsafeexec => 't' },
 { aggfnoid => 'min(tid)', aggtransfn => 'tidsmaller',
   aggcombinefn => 'tidsmaller', aggsortop => '<(tid,tid)',
-  aggtranstype => 'tid' },
+  aggtranstype => 'tid', aggrepsafeexec => 't' },
 { aggfnoid => 'min(anyenum)', aggtransfn => 'enum_smaller',
   aggcombinefn => 'enum_smaller', aggsortop => '<(anyenum,anyenum)',
-  aggtranstype => 'anyenum' },
+  aggtranstype => 'anyenum', aggrepsafeexec => 't' },
 { aggfnoid => 'min(inet)', aggtransfn => 'network_smaller',
   aggcombinefn => 'network_smaller', aggsortop => '<(inet,inet)',
-  aggtranstype => 'inet' },
+  aggtranstype => 'inet', aggrepsafeexec => 't' },
 { aggfnoid => 'min(pg_lsn)', aggtransfn => 'pg_lsn_smaller',
   aggcombinefn => 'pg_lsn_smaller', aggsortop => '<(pg_lsn,pg_lsn)',
   aggtranstype => 'pg_lsn' },
@@ -219,10 +223,12 @@
 { aggfnoid => 'count(any)', aggtransfn => 'int8inc_any',
   aggcombinefn => 'int8pl', aggmtransfn => 'int8inc_any',
   aggminvtransfn => 'int8dec_any', aggtranstype => 'int8',
-  aggmtranstype => 'int8', agginitval => '0', aggminitval => '0' },
+  aggmtranstype => 'int8', agginitval => '0', aggminitval => '0',
+  aggrepsafeexec => 't' },
 { aggfnoid => 'count()', aggtransfn => 'int8inc', aggcombinefn => 'int8pl',
   aggmtransfn => 'int8inc', aggminvtransfn => 'int8dec', aggtranstype => 
'int8',
-  aggmtranstype => 'int8', agginitval => '0', aggminitval => '0' },
+  aggmtranstype => 'int8', agginitval => '0', aggminitval => '0',
+  aggrepsafeexec => 't' },
 
 # var_pop
 { aggfnoid => 'var_pop(int8)', aggtransfn => 'int8_accum',
@@ -530,17 +536,27 @@
 { aggfnoid => 'xmlagg', aggtransfn => 'xmlconcat2', aggtranstype => 'xml' },
 
 # array
-{ aggfnoid => 'array_agg(anynonarray)', aggtransfn => 'array_agg_transfn', 
aggcombinefn => 'array_agg_combine',
-  aggserialfn => 'array_agg_serialize', aggdeserialfn => 
'array_agg_deserialize', aggfinalfn => 'array_agg_finalfn', aggfinalextra => 
't',
+{ aggfnoid => 'array_agg(anynonarray)', aggtransfn => 'array_agg_transfn',
+  aggfinalfn => 'array_agg_finalfn', aggcombinefn => 'array_agg_combine',
+  aggserialfn => 'array_agg_serialize',
+  aggdeserialfn => 'array_agg_deserialize', aggfinalextra => 't',
   aggtranstype => 'internal' },
-{ aggfnoid => 'array_agg(anyarray)', aggtransfn => 'array_agg_array_transfn', 
aggcombinefn => 'array_agg_array_combine',
-  aggserialfn => 'array_agg_array_serialize', aggdeserialfn => 
'array_agg_array_deserialize', aggfinalfn => 'array_agg_array_finalfn', 
aggfinalextra => 't',
+{ aggfnoid => 'array_agg(anyarray)', aggtransfn => 'array_agg_array_transfn',
+  aggfinalfn => 'array_agg_array_finalfn',
+  aggcombinefn => 'array_agg_array_combine',
+  aggserialfn => 'array_agg_array_serialize',
+  aggdeserialfn => 'array_agg_array_deserialize', aggfinalextra => 't',
   aggtranstype => 'internal' },
-{ aggfnoid => 'gp_array_agg(anynonarray)', aggtransfn => 'array_agg_transfn', 
aggcombinefn => 'array_agg_combine',
-  aggserialfn => 'array_agg_serialize', aggdeserialfn => 
'array_agg_deserialize', aggfinalfn => 'array_agg_finalfn', aggfinalextra => 
't',
+{ aggfnoid => 'gp_array_agg(anynonarray)', aggtransfn => 'array_agg_transfn',
+  aggfinalfn => 'array_agg_finalfn', aggcombinefn => 'array_agg_combine',
+  aggserialfn => 'array_agg_serialize',
+  aggdeserialfn => 'array_agg_deserialize', aggfinalextra => 't',
   aggtranstype => 'internal' },
-{ aggfnoid => 'gp_array_agg(anyarray)', aggtransfn => 
'array_agg_array_transfn', aggcombinefn => 'array_agg_array_combine',
-  aggserialfn => 'array_agg_array_serialize', aggdeserialfn => 
'array_agg_array_deserialize', aggfinalfn => 'array_agg_array_finalfn', 
aggfinalextra => 't',
+{ aggfnoid => 'gp_array_agg(anyarray)', aggtransfn => 
'array_agg_array_transfn',
+  aggfinalfn => 'array_agg_array_finalfn',
+  aggcombinefn => 'array_agg_array_combine',
+  aggserialfn => 'array_agg_array_serialize',
+  aggdeserialfn => 'array_agg_array_deserialize', aggfinalextra => 't',
   aggtranstype => 'internal' },
 
 # text
@@ -622,39 +638,37 @@
   aggmfinalmodify => 'w', aggtranstype => 'internal' },
 
 # MPP Aggregate -- array_sum -- special for prospective customer.
-{ aggfnoid => 'array_sum(_int4)', aggkind => 'n',
-  aggtransfn => 'array_add(_int4,_int4)', aggtranstype => '_int4', agginitval 
=> '{}' },
+{ aggfnoid => 'array_sum(_int4)', aggtransfn => 'array_add(_int4,_int4)',
+  aggtranstype => '_int4', agginitval => '{}' },
 
 # sum(array[])
-{ aggfnoid => 'sum(_int2)', aggtransfn => 'int2_matrix_accum(_int8,_int2)', 
aggkind => 'n',
-  aggcombinefn => 'int8_matrix_accum(_int8,_int8)',
-  aggtranstype => '_int8' },
-{ aggfnoid => 'sum(_int4)', aggtransfn => 'int4_matrix_accum(_int8,_int4)', 
aggkind => 'n',
-  aggcombinefn => 'int8_matrix_accum(_int8,_int8)',
-  aggtranstype => '_int8' },
-{ aggfnoid => 'sum(_int8)', aggtransfn => 'int8_matrix_accum(_int8,_int8)', 
aggkind => 'n',
-  aggcombinefn => 'int8_matrix_accum(_int8,_int8)',
-  aggtranstype => '_int8' },
-{ aggfnoid => 'sum(_float8)', aggtransfn => 
'float8_matrix_accum(_float8,_float8)', aggkind => 'n',
+{ aggfnoid => 'sum(_int2)', aggtransfn => 'int2_matrix_accum(_int8,_int2)',
+  aggcombinefn => 'int8_matrix_accum(_int8,_int8)', aggtranstype => '_int8' },
+{ aggfnoid => 'sum(_int4)', aggtransfn => 'int4_matrix_accum(_int8,_int4)',
+  aggcombinefn => 'int8_matrix_accum(_int8,_int8)', aggtranstype => '_int8' },
+{ aggfnoid => 'sum(_int8)', aggtransfn => 'int8_matrix_accum(_int8,_int8)',
+  aggcombinefn => 'int8_matrix_accum(_int8,_int8)', aggtranstype => '_int8' },
+{ aggfnoid => 'sum(_float8)',
+  aggtransfn => 'float8_matrix_accum(_float8,_float8)',
   aggcombinefn => 'float8_matrix_accum(_float8,_float8)',
   aggtranstype => '_float8' },
 
-{ aggfnoid => 'pivot_sum(_text,text,int4)', aggtransfn => 
'int4_pivot_accum(_int8,_text,text,int4)', aggkind => 'n',
-  aggcombinefn => 'int8_matrix_accum(_int8,_int8)',
-  aggtranstype => '_int8' },
+{ aggfnoid => 'pivot_sum(_text,text,int4)',
+  aggtransfn => 'int4_pivot_accum(_int8,_text,text,int4)',
+  aggcombinefn => 'int8_matrix_accum(_int8,_int8)', aggtranstype => '_int8' },
 
-{ aggfnoid => 'pivot_sum(_text,text,int8)', aggtransfn => 
'int8_pivot_accum(_int8,_text,text,int8)', aggkind => 'n',
-  aggcombinefn => 'int8_matrix_accum(_int8,_int8)',
-  aggtranstype => '_int8' },
-{ aggfnoid => 'pivot_sum(_text,text,float8)', aggtransfn => 
'float8_pivot_accum(_float8,_text,text,float8)', aggkind => 'n',
+{ aggfnoid => 'pivot_sum(_text,text,int8)',
+  aggtransfn => 'int8_pivot_accum(_int8,_text,text,int8)',
+  aggcombinefn => 'int8_matrix_accum(_int8,_int8)', aggtranstype => '_int8' },
+{ aggfnoid => 'pivot_sum(_text,text,float8)',
+  aggtransfn => 'float8_pivot_accum(_float8,_text,text,float8)',
   aggcombinefn => 'float8_matrix_accum(_float8,_float8)',
   aggtranstype => '_float8' },
 
 # GPDB: additional variants of percentile_cont, for timestamps
 { aggfnoid => 'percentile_cont(float8,timestamp)', aggkind => 'o',
   aggnumdirectargs => '1', aggtransfn => 'ordered_set_transition',
-  aggfinalfn => 'percentile_cont_timestamp_final',
-  aggtranstype => 'internal' },
+  aggfinalfn => 'percentile_cont_timestamp_final', aggtranstype => 'internal' 
},
 { aggfnoid => 'percentile_cont(_float8,timestamp)', aggkind => 'o',
   aggnumdirectargs => '1', aggtransfn => 'ordered_set_transition',
   aggfinalfn => 'percentile_cont_timestamp_multi_final',
@@ -669,51 +683,42 @@
   aggtranstype => 'internal' },
 
 # median
-{ aggfnoid => 'median(float8,float8)', aggkind => 'o',
-  aggnumdirectargs => '1', aggtransfn => 'ordered_set_transition',
-  aggfinalfn => 'percentile_cont_float8_final',
-  aggtranstype => 'internal' },
+{ aggfnoid => 'median(float8,float8)', aggkind => 'o', aggnumdirectargs => '1',
+  aggtransfn => 'ordered_set_transition',
+  aggfinalfn => 'percentile_cont_float8_final', aggtranstype => 'internal' },
 { aggfnoid => 'median(float8,interval)', aggkind => 'o',
   aggnumdirectargs => '1', aggtransfn => 'ordered_set_transition',
-  aggfinalfn => 'percentile_cont_interval_final',
-  aggtranstype => 'internal' },
+  aggfinalfn => 'percentile_cont_interval_final', aggtranstype => 'internal' },
 { aggfnoid => 'median(float8,timestamp)', aggkind => 'o',
   aggnumdirectargs => '1', aggtransfn => 'ordered_set_transition',
-  aggfinalfn => 'percentile_cont_timestamp_final',
-  aggtranstype => 'internal' },
+  aggfinalfn => 'percentile_cont_timestamp_final', aggtranstype => 'internal' 
},
 { aggfnoid => 'median(float8,timestamptz)', aggkind => 'o',
   aggnumdirectargs => '1', aggtransfn => 'ordered_set_transition',
   aggfinalfn => 'percentile_cont_timestamptz_final',
   aggtranstype => 'internal' },
 
 # hyperloglog
-{ aggfnoid => 'gp_hyperloglog_accum(anyelement)', aggkind => 'n',
+{ aggfnoid => 'gp_hyperloglog_accum(anyelement)',
   aggtransfn => 'gp_hyperloglog_add_item_agg_default',
-  aggfinalfn => 'gp_hyperloglog_comp',
-  aggcombinefn => 'gp_hyperloglog_merge',
+  aggfinalfn => 'gp_hyperloglog_comp', aggcombinefn => 'gp_hyperloglog_merge',
   aggtranstype => 'gp_hyperloglog_estimator' },
 
 # GPDB: additional variants of percentile_cont and percentile_disc
-{ aggfnoid => 'gp_percentile_cont(float8,float8,int8,int8)', aggkind => 'n',
+{ aggfnoid => 'gp_percentile_cont(float8,float8,int8,int8)',
   aggtransfn => 'gp_percentile_cont_float8_transition',
-  aggfinalfn => 'gp_percentile_final',
-  aggtranstype => 'float8' },
-{ aggfnoid => 'gp_percentile_cont(interval,float8,int8,int8)', aggkind => 'n',
+  aggfinalfn => 'gp_percentile_final', aggtranstype => 'float8' },
+{ aggfnoid => 'gp_percentile_cont(interval,float8,int8,int8)',
   aggtransfn => 'gp_percentile_cont_interval_transition',
-  aggfinalfn => 'gp_percentile_final',
-  aggtranstype => 'interval' },
-{ aggfnoid => 'gp_percentile_cont(timestamp,float8,int8,int8)', aggkind => 'n',
+  aggfinalfn => 'gp_percentile_final', aggtranstype => 'interval' },
+{ aggfnoid => 'gp_percentile_cont(timestamp,float8,int8,int8)',
   aggtransfn => 'gp_percentile_cont_timestamp_transition',
-  aggfinalfn => 'gp_percentile_final',
-  aggtranstype => 'timestamp' },
-{ aggfnoid => 'gp_percentile_cont(timestamptz,float8,int8,int8)', aggkind => 
'n',
+  aggfinalfn => 'gp_percentile_final', aggtranstype => 'timestamp' },
+{ aggfnoid => 'gp_percentile_cont(timestamptz,float8,int8,int8)',
   aggtransfn => 'gp_percentile_cont_timestamptz_transition',
-  aggfinalfn => 'gp_percentile_final',
-  aggtranstype => 'timestamptz' },
-{ aggfnoid => 'gp_percentile_disc(anyelement,float8,int8,int8)', aggkind => 
'n',
+  aggfinalfn => 'gp_percentile_final', aggtranstype => 'timestamptz' },
+{ aggfnoid => 'gp_percentile_disc(anyelement,float8,int8,int8)',
   aggtransfn => 'gp_percentile_disc_transition',
-  aggfinalfn => 'gp_percentile_final',
-  aggtranstype => 'anyelement' },
+  aggfinalfn => 'gp_percentile_final', aggtranstype => 'anyelement' },
 
 
 ]
diff --git a/src/include/catalog/pg_aggregate.h 
b/src/include/catalog/pg_aggregate.h
index 9bc41be353..7d3a791386 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -98,6 +98,9 @@ CATALOG(pg_aggregate,2600,AggregateRelationId)
 
        /* initial value for moving-agg state (can be NULL) */
        text            aggminitval BKI_DEFAULT(_null_);
+
+       /* is agg func safe to execute on replicated slices (GPDB specific) */
+       bool            aggrepsafeexec BKI_DEFAULT(f);
 #endif
 } FormData_pg_aggregate;
 
@@ -190,6 +193,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
                                                                         int32 
aggmTransSpace,
                                                                         const 
char *agginitval,
                                                                         const 
char *aggminitval,
+                                                                        bool 
aggrepsafeexec,
                                                                         char 
proparallel);
 
 #endif                                                 /* PG_AGGREGATE_H */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to