On 11/17/20 10:11 AM, tsunakawa.ta...@fujitsu.com wrote:
> Hello,
>
>
> Modified the patch as I talked with Tomas-san. The performance
> results of loading one million records into a hash-partitioned table
> with 8 partitions are as follows:
>
> unpatched, local: 8.6 seconds
> unpatched, fdw: 113.7 seconds
> patched, fdw: 12.5 seconds (9x improvement)
>
> The test scripts are also attached. Run prepare.sql once to set up
> tables and source data. Run local_part.sql and fdw_part.sql to load
> source data into a partitioned table with local partitions and a
> partitioned table with foreign tables respectively.
>

Unfortunately, this does not compile for me, because nodeModifyTable
calls ExecGetTouchedPartitions, which is not defined anywhere. Not sure
what that's about, so I simply commented it out. That probably breaks
the partitioned cases, but it allowed me to do some review and testing.

As for the patch, I have a couple of comments:

1) As I mentioned before, I really don't think we should be doing
deparsing in execute_foreign_modify - that's something that should
happen earlier, and should be in a deparse.c function.

2) I think the GUC should be replaced with a server/table option,
similar to fetch_size.

The attached patch tries to address both of these points.

Firstly, it adds a new deparseBulkInsertSql function that builds a
query for the "full" batch, and then uses the two queries (bulk and
single-row) - when we get a full batch we use the bulk query, otherwise
we use the single-row query in a loop. IMO this is cleaner than
deparsing queries ad hoc in execute_foreign_modify.

Of course, this might be worse when we don't have a full batch, e.g. for
a query that inserts only 50 rows with batch_size=100. If this case is
common, one option would be lowering the batch_size accordingly. If we
really want to improve this case too, I suggest we pass more info than
just the position of the VALUES clause - that seems a bit too hackish.

Secondly, it adds the batch_size option to server/foreign table, and
uses that. This is not complete, though. postgresPlanForeignModify
currently passes a hard-coded value; it needs to look up the correct
value for the server/table from RelOptInfo or something. And I suppose
the ModifyTable infrastructure will need to determine the value in order
to pass the correct number of slots to the FDW API.

There are a couple of other smaller changes. E.g. it undoes the changes
to finish_foreign_modify, and instead calls separate functions to
prepare the bulk statement.
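
To make the two-query idea concrete, here is a rough standalone sketch
in plain C. It is not the code from the attached patch and does not use
the real deparseBulkInsertSql signature: build_insert_sql,
count_remote_statements and append_str are hypothetical names, and the
sketch leaves out the column list, ON CONFLICT and RETURNING. It only
shows how the placeholders of a multi-row VALUES clause are numbered,
and how many remote statements the "bulk query for full batches,
single-row query for the rest" scheme ends up executing.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Append s to buf, tracking the used length; -1 if it would overflow. */
static int
append_str(char *buf, size_t buflen, size_t *used, const char *s)
{
    size_t      len = strlen(s);

    if (*used + len + 1 > buflen)
        return -1;
    memcpy(buf + *used, s, len + 1);    /* copies the terminating NUL too */
    *used += len;
    return 0;
}

/*
 * Hypothetical stand-in for the deparse step: write
 *   INSERT INTO <table> VALUES ($1, $2), ($3, $4), ...
 * with nrows groups of ncols placeholders into buf.  nrows = 1 gives the
 * single-row query, nrows = batch_size the bulk one.
 * Returns the string length, or -1 if buf is too small.
 */
static int
build_insert_sql(char *buf, size_t buflen, const char *table,
                 int ncols, int nrows)
{
    size_t      used = 0;
    int         pindex = 1;
    char        param[16];      /* ", $" + up to 10 digits + NUL */

    assert(ncols > 0 && nrows > 0);

    if (append_str(buf, buflen, &used, "INSERT INTO ") < 0 ||
        append_str(buf, buflen, &used, table) < 0 ||
        append_str(buf, buflen, &used, " VALUES ") < 0)
        return -1;

    for (int row = 0; row < nrows; row++)
    {
        if (append_str(buf, buflen, &used, row > 0 ? ", (" : "(") < 0)
            return -1;
        for (int col = 0; col < ncols; col++)
        {
            snprintf(param, sizeof(param), "%s$%d",
                     col > 0 ? ", " : "", pindex++);
            if (append_str(buf, buflen, &used, param) < 0)
                return -1;
        }
        if (append_str(buf, buflen, &used, ")") < 0)
            return -1;
    }
    return (int) used;
}

/*
 * Remote statements executed for nrows rows: each full batch goes
 * through the bulk query once, each leftover row through the
 * single-row query.
 */
static int
count_remote_statements(int nrows, int batch_size)
{
    assert(nrows >= 0 && batch_size >= 1);
    return nrows / batch_size + nrows % batch_size;
}
```

With these definitions, count_remote_statements(50, 100) is 50 (no full
batch, so every row takes the single-row path), while
count_remote_statements(50, 50) is 1, which is the trade-off behind the
"lower the batch_size" suggestion above.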

The patch also adds list_make5/list_make6 macros, so as to not have to
do strange stuff with the parameter lists.

Finally, this should probably add a bunch of regression tests.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
>From 6a7031c800dff8fff9e1e64e0278494f3acd686f Mon Sep 17 00:00:00 2001 From: Takayuki Tsunakawa <tsunakawa.ta...@fujitsu.com> Date: Tue, 10 Nov 2020 09:27:56 +0900 Subject: [PATCH 1/3] Add bulk insert for foreign tables --- contrib/postgres_fdw/deparse.c | 3 +- contrib/postgres_fdw/postgres_fdw.c | 233 ++++++++++++++---- contrib/postgres_fdw/postgres_fdw.h | 2 +- doc/src/sgml/config.sgml | 21 ++ doc/src/sgml/fdwhandler.sgml | 64 ++++- src/backend/executor/nodeModifyTable.c | 151 ++++++++++++ src/backend/utils/misc/guc.c | 12 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/executor/nodeModifyTable.h | 2 + src/include/foreign/fdwapi.h | 7 + src/include/nodes/execnodes.h | 5 + 11 files changed, 446 insertions(+), 55 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 2d44df19fe..5aa81db08e 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1706,7 +1706,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, - List **retrieved_attrs) + List **retrieved_attrs, int *values_end_len) { AttrNumber pindex; bool first; @@ -1749,6 +1749,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, } else appendStringInfoString(buf, " DEFAULT VALUES"); + *values_end_len = buf->len; if (doNothing) appendStringInfoString(buf, " ON CONFLICT DO NOTHING"); diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9c5aaacc51..f7be4bec17 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -86,8 +86,10 @@ enum FdwScanPrivateIndex * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server * 2) Integer list of target attribute numbers for INSERT/UPDATE * (NIL for a DELETE) - * 3) Boolean flag showing if the remote query has a RETURNING clause - * 4) Integer list of attribute numbers retrieved by 
RETURNING, if any + * 3) Length till the end of VALUES clause for INSERT + * (-1 for a DELETE/UPDATE) + * 4) Boolean flag showing if the remote query has a RETURNING clause + * 5) Integer list of attribute numbers retrieved by RETURNING, if any */ enum FdwModifyPrivateIndex { @@ -95,6 +97,8 @@ enum FdwModifyPrivateIndex FdwModifyPrivateUpdateSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, + /* Length till the end of VALUES clause (as an integer Value node) */ + FdwModifyPrivateLen, /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ @@ -175,7 +179,9 @@ typedef struct PgFdwModifyState /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ + char *orig_query; /* original text of INSERT command */ List *target_attrs; /* list of target attribute numbers */ + int len; /* length of some part of query */ bool has_returning; /* is there a RETURNING clause? 
*/ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ @@ -184,6 +190,9 @@ typedef struct PgFdwModifyState int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ + /* bulk operation stuff */ + int num_slots; /* number of slots to insert */ + /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -342,6 +351,11 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static TupleTableSlot **postgresExecForeignBulkInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); static TupleTableSlot *postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, @@ -428,20 +442,23 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int len, bool has_returning, List *retrieved_attrs); -static TupleTableSlot *execute_foreign_modify(EState *estate, +static TupleTableSlot **execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot); + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot); + TupleTableSlot **slots, + int numSlots); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); -static void finish_foreign_modify(PgFdwModifyState *fmstate); +static void finish_foreign_modify(PgFdwModifyState *fmstate, bool release_conn); static List *build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); @@ -529,6 
+546,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->PlanForeignModify = postgresPlanForeignModify; routine->BeginForeignModify = postgresBeginForeignModify; routine->ExecForeignInsert = postgresExecForeignInsert; + routine->ExecForeignBulkInsert = postgresExecForeignBulkInsert; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; @@ -1663,7 +1681,9 @@ postgresPlanForeignModify(PlannerInfo *root, List *withCheckOptionList = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; + List *retvalList; bool doNothing = false; + int values_end_len = -1; initStringInfo(&sql); @@ -1751,7 +1771,7 @@ postgresPlanForeignModify(PlannerInfo *root, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, withCheckOptionList, returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, @@ -1775,10 +1795,12 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. 
*/ - return list_make4(makeString(sql.data), + retvalList = list_make4(makeString(sql.data), targetAttrs, - makeInteger((retrieved_attrs != NIL)), - retrieved_attrs); + makeInteger(values_end_len), + makeInteger((retrieved_attrs != NIL))); + retvalList = lappend(retvalList, retrieved_attrs); + return retvalList; } /* @@ -1796,6 +1818,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, char *query; List *target_attrs; bool has_returning; + int values_end_len; List *retrieved_attrs; RangeTblEntry *rte; @@ -1811,6 +1834,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, FdwModifyPrivateUpdateSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); + values_end_len = intVal(list_nth(fdw_private, + FdwModifyPrivateLen)); has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); retrieved_attrs = (List *) list_nth(fdw_private, @@ -1828,6 +1853,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, mtstate->mt_plans[subplan_index]->plan, query, target_attrs, + values_end_len, has_returning, retrieved_attrs); @@ -1845,7 +1871,37 @@ postgresExecForeignInsert(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - TupleTableSlot *rslot; + TupleTableSlot **rslot; + int numSlots = 1; + + /* + * If the fmstate has aux_fmstate set, use the aux_fmstate (see + * postgresBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate->aux_fmstate; + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, + &slot, &planSlot, &numSlots); + /* Revert that change */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate; + + return rslot ? 
*rslot : NULL; +} + +/* + * postgresExecForeignBulkInsert + * Insert multiple rows into a foreign table + */ +static TupleTableSlot ** +postgresExecForeignBulkInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + TupleTableSlot **rslot; /* * If the fmstate has aux_fmstate set, use the aux_fmstate (see @@ -1854,7 +1910,7 @@ postgresExecForeignInsert(EState *estate, if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate->aux_fmstate; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, - slot, planSlot); + slots, planSlots, numSlots); /* Revert that change */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate; @@ -1872,8 +1928,13 @@ postgresExecForeignUpdate(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, + &slot, &planSlot, &numSlots); + + return rslot ? *rslot : NULL; } /* @@ -1886,8 +1947,13 @@ postgresExecForeignDelete(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, + &slot, &planSlot, &numSlots); + + return rslot ? 
*rslot : NULL; } /* @@ -1905,7 +1971,7 @@ postgresEndForeignModify(EState *estate, return; /* Destroy the execution state */ - finish_foreign_modify(fmstate); + finish_foreign_modify(fmstate, true); } /* @@ -1924,6 +1990,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; + int values_end_len; StringInfoData sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; @@ -2000,7 +2067,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, resultRelInfo->ri_WithCheckOptions, resultRelInfo->ri_returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2010,6 +2077,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, NULL, sql.data, targetAttrs, + values_end_len, retrieved_attrs != NIL, retrieved_attrs); @@ -2048,7 +2116,7 @@ postgresEndForeignInsert(EState *estate, fmstate = fmstate->aux_fmstate; /* Destroy the execution state */ - finish_foreign_modify(fmstate); + finish_foreign_modify(fmstate, true); } /* @@ -3538,6 +3606,7 @@ create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int len, bool has_returning, List *retrieved_attrs) { @@ -3572,7 +3641,10 @@ create_foreign_modify(EState *estate, /* Set up remote query information. 
*/ fmstate->query = query; + if (operation == CMD_INSERT) + fmstate->orig_query = pstrdup(fmstate->query); fmstate->target_attrs = target_attrs; + fmstate->len = len; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; @@ -3624,6 +3696,8 @@ create_foreign_modify(EState *estate, Assert(fmstate->p_nums <= n_params); + fmstate->num_slots = 1; + /* Initialize auxiliary state */ fmstate->aux_fmstate = NULL; @@ -3634,26 +3708,75 @@ create_foreign_modify(EState *estate, * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING * result if any. (This is the shared guts of postgresExecForeignInsert, - * postgresExecForeignUpdate, and postgresExecForeignDelete.) + * postgresExecForeignBulkInsert, postgresExecForeignUpdate, and + * postgresExecForeignDelete.) */ -static TupleTableSlot * +static TupleTableSlot ** execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot) + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; ItemPointer ctid = NULL; const char **p_values; PGresult *res; int n_rows; + int i, j; + int pindex; + bool first; + StringInfoData sql; /* The operation should be INSERT, UPDATE, or DELETE */ Assert(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE); + if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) + { + /* Destroy the prepared statement created previously */ + if (fmstate->p_name) + finish_foreign_modify(fmstate, false); + + /* + * Recreate INSERT command string with numSlots records in its + * VALUES clause + */ + + /* Copy up to the end of the first record from the original query */ + initStringInfo(&sql); + appendBinaryStringInfo(&sql, fmstate->orig_query, fmstate->len); + + /* Add records to VALUES clause */ + pindex = fmstate->p_nums + 1; + for (i = 0; i < 
*numSlots - 1; i++) + { + appendStringInfoString(&sql, ", ("); + + first = true; + for (j = 0; j < fmstate->p_nums; j++) + { + if (!first) + appendStringInfoString(&sql, ", "); + first = false; + + appendStringInfo(&sql, "$%d", pindex); + pindex++; + } + + appendStringInfoChar(&sql, ')'); + } + + /* Copy stuff after VALUES clause from the original query */ + appendStringInfoString(&sql, fmstate->orig_query + fmstate->len); + + pfree(fmstate->query); + fmstate->query = sql.data; + fmstate->num_slots = *numSlots; + } + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -3666,7 +3789,7 @@ execute_foreign_modify(EState *estate, Datum datum; bool isNull; - datum = ExecGetJunkAttribute(planSlot, + datum = ExecGetJunkAttribute(planSlots[0], fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ @@ -3676,14 +3799,14 @@ execute_foreign_modify(EState *estate, } /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, ctid, slot); + p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); /* * Execute the prepared statement. */ if (!PQsendQueryPrepared(fmstate->conn, fmstate->p_name, - fmstate->p_nums, + fmstate->p_nums * (*numSlots), p_values, NULL, NULL, @@ -3704,9 +3827,10 @@ execute_foreign_modify(EState *estate, /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { + Assert(*numSlots == 1); n_rows = PQntuples(res); if (n_rows > 0) - store_returning_result(fmstate, slot, res); + store_returning_result(fmstate, slots[0], res); } else n_rows = atoi(PQcmdTuples(res)); @@ -3716,10 +3840,12 @@ execute_foreign_modify(EState *estate, MemoryContextReset(fmstate->temp_cxt); + *numSlots = n_rows; + /* * Return NULL if nothing was inserted/updated/deleted on the remote end */ - return (n_rows > 0) ? slot : NULL; + return (n_rows > 0) ? 
slots : NULL; } /* @@ -3779,19 +3905,23 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot) + TupleTableSlot **slots, + int numSlots) { const char **p_values; + int i; + int j; int pindex = 0; MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); - p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); + p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { + Assert(numSlots == 1); /* don't need set_transmission_modes for TID output */ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); @@ -3799,32 +3929,37 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate, } /* get following parameters from slot */ - if (slot != NULL && fmstate->target_attrs != NIL) + if (slots != NULL && fmstate->target_attrs != NIL) { int nestlevel; ListCell *lc; nestlevel = set_transmission_modes(); - foreach(lc, fmstate->target_attrs) + for (i = 0; i < numSlots; i++) { - int attnum = lfirst_int(lc); - Datum value; - bool isnull; + j = (tupleid != NULL) ? 
1 : 0; + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Datum value; + bool isnull; - value = slot_getattr(slot, attnum, &isnull); - if (isnull) - p_values[pindex] = NULL; - else - p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], - value); - pindex++; + value = slot_getattr(slots[i], attnum, &isnull); + if (isnull) + p_values[pindex] = NULL; + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j], + value); + pindex++; + j++; + } } reset_transmission_modes(nestlevel); } - Assert(pindex == fmstate->p_nums); + Assert(pindex == fmstate->p_nums * numSlots); MemoryContextSwitchTo(oldcontext); @@ -3873,7 +4008,8 @@ store_returning_result(PgFdwModifyState *fmstate, * Release resources for a foreign insert/update/delete operation */ static void -finish_foreign_modify(PgFdwModifyState *fmstate) +finish_foreign_modify(PgFdwModifyState *fmstate, + bool release_conn) { Assert(fmstate != NULL); @@ -3897,8 +4033,11 @@ finish_foreign_modify(PgFdwModifyState *fmstate) } /* Release remote connection */ - ReleaseConnection(fmstate->conn); - fmstate->conn = NULL; + if (release_conn) + { + ReleaseConnection(fmstate->conn); + fmstate->conn = NULL; + } } /* diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..459a9ca6ab 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -161,7 +161,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, - List **retrieved_attrs); + List **retrieved_attrs, int *values_end_len); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a632cf98ba..51bfe445b0 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -8533,6 +8533,27 @@ SET XML 
OPTION { DOCUMENT | CONTENT }; </listitem> </varlistentry> + <varlistentry id="guc-max-bulk-insert-tuples" xreflabel="max_bulk_insert_tuples"> + <term><varname>max_bulk_insert_tuples</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_bulk_insert_tuples</varname></primary> + <secondary>configuration parameter</secondary> + </indexterm> + </term> + <listitem> + <para> + Sets the maximum number of tuples to accumulate and insert in bulk + into a foreign table. This applies to each partition when the insert + target is a partitioned table. + The valid range is <literal>1</literal>, which disables bulk insert, + to <literal>1000</literal>. + This takes effect only if the foreign data wrapper supports + bulk insert. + The default is <literal>100</literal>. + </para> + </listitem> + </varlistentry> + </variablelist> </sect2> <sect2 id="runtime-config-client-format"> diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 9c9293414c..cdf29595d7 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -523,8 +523,9 @@ BeginForeignModify(ModifyTableState *mtstate, Begin executing a foreign table modification operation. This routine is called during executor startup. It should perform any initialization needed prior to the actual table modifications. Subsequently, - <function>ExecForeignInsert</function>, <function>ExecForeignUpdate</function> or - <function>ExecForeignDelete</function> will be called for each tuple to be + <function>ExecForeignInsert/ExecForeignBulkInsert</function>, + <function>ExecForeignUpdate</function> or + <function>ExecForeignDelete</function> will be called for tuple(s) to be inserted, updated, or deleted. 
</para> @@ -614,6 +615,56 @@ ExecForeignInsert(EState *estate, <para> <programlisting> +TupleTableSlot ** +ExecForeignBulkInsert(EState *estate, + ResultRelInfo *rinfo, + TupleTableSlot **slots, + TupleTableSlot *planSlots, + int *numSlots); +</programlisting> + + Insert multiple tuples in bulk into the foreign table. + The parameters are the same for <function>ExecForeignInsert</function> + except <literal>slots</literal> and <literal>planSlots</literal> contain + multiple tuples and <literal>*numSlots></literal> specifies the number of + tuples in those arrays. + </para> + + <para> + The return value is an array of slots containing the data that was + actually inserted (this might differ from the data supplied, for + example as a result of trigger actions.) + The passed-in <literal>slots</literal> can be re-used for this purpose. + The number of successfully inserted tuples is returned in + <literal>*numSlots</literal>. + </para> + + <para> + The data in the returned slot is used only if the <command>INSERT</command> + statement involves a view + <literal>WITH CHECK OPTION</literal>; or if the foreign table has + an <literal>AFTER ROW</literal> trigger. Triggers require all columns, + but the FDW could choose to optimize away returning some or all columns + depending on the contents of the + <literal>WITH CHECK OPTION</literal> constraints. + </para> + + <para> + If the <function>ExecForeignBulkInsert</function> pointer is set to + <literal>NULL</literal>, attempts to insert into the foreign table will + use <function>ExecForeignInsert</function>. + This function is not used if the <command>INSERT</command> has the + <literal>RETURNING></literal> clause. + </para> + + <para> + Note that this function is also called when inserting routed tuples into + a foreign-table partition. See the callback functions + described below that allow the FDW to support that. 
+ </para> + + <para> +<programlisting> TupleTableSlot * ExecForeignUpdate(EState *estate, ResultRelInfo *rinfo, @@ -741,8 +792,9 @@ BeginForeignInsert(ModifyTableState *mtstate, in both cases when it is the partition chosen for tuple routing and the target specified in a <command>COPY FROM</command> command. It should perform any initialization needed prior to the actual insertion. - Subsequently, <function>ExecForeignInsert</function> will be called for - each tuple to be inserted into the foreign table. + Subsequently, <function>ExecForeignInsert</function> or + <function>ExecForeignBulkInsert</function> will be called for + tuple(s) to be inserted into the foreign table. </para> <para> @@ -773,8 +825,8 @@ BeginForeignInsert(ModifyTableState *mtstate, <para> Note that if the FDW does not support routable foreign-table partitions and/or executing <command>COPY FROM</command> on foreign tables, this - function or <function>ExecForeignInsert</function> subsequently called - must throw error as needed. + function or <function>ExecForeignInsert/ExecForeignBulkInsert</function> + subsequently called must throw error as needed. 
</para> <para> diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 29e07b7228..9c46036127 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -58,6 +58,15 @@ #include "utils/rel.h" +int max_bulk_insert_tuples; + +static void ExecBulkInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag); static bool ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer conflictTid, @@ -389,6 +398,7 @@ ExecInsert(ModifyTableState *mtstate, ModifyTable *node = (ModifyTable *) mtstate->ps.plan; OnConflictAction onconflict = node->onConflictAction; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; + MemoryContext oldContext; /* * If the input result relation is a partitioned table, find the leaf @@ -441,6 +451,56 @@ ExecInsert(ModifyTableState *mtstate, ExecComputeStoredGenerated(resultRelInfo, estate, slot, CMD_INSERT); + /* + * If the FDW supports bulk insert, accumulate tuples and insert them + * in bulk + */ + if (max_bulk_insert_tuples > 1 && + resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert && + resultRelInfo->ri_projectReturning == NULL) + { + /* + * If a certain number of tuples have already been accumulated, + * or a tuple has come for a different relation than that for + * the accumulated tuples, perform the bulk insert + */ + if (resultRelInfo->ri_NumSlots == max_bulk_insert_tuples) + { + ExecBulkInsert(mtstate, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, canSetTag); + resultRelInfo->ri_NumSlots = 0; + } + + oldContext = MemoryContextSwitchTo(estate->es_query_cxt); + + if (resultRelInfo->ri_Slots == NULL) + { + resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) * + max_bulk_insert_tuples); + resultRelInfo->ri_PlanSlots = 
palloc(sizeof(TupleTableSlot *) * + max_bulk_insert_tuples); + } + + resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] = + MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, + slot->tts_ops); + ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots], + slot); + resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] = + MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor, + planSlot->tts_ops); + ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots], + planSlot); + + resultRelInfo->ri_NumSlots++; + + MemoryContextSwitchTo(oldContext); + return NULL; + } + /* * insert into foreign table: let the FDW do it */ @@ -701,6 +761,73 @@ ExecInsert(ModifyTableState *mtstate, return result; } +/* ---------------------------------------------------------------- + * ExecBulkInsert + * + * Insert multiple tuples in an efficient way. + * Currently, this handles inserting into a foreign table without + * RETURNING clause. + * ---------------------------------------------------------------- + */ +static void +ExecBulkInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag) +{ + int i; + int numInserted = numSlots; + TupleTableSlot *slot = NULL; + TupleTableSlot **rslots; + + /* + * insert into foreign table: let the FDW do it + */ + rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert(estate, + resultRelInfo, + slots, + planSlots, + &numInserted); + + for (i = 0; i < numInserted; i++) + { + slot = rslots[i]; + + /* + * AFTER ROW Triggers or RETURNING expressions might reference the + * tableoid column, so (re-)initialize tts_tableOid before evaluating + * them. 
+ */ + slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, slot, NIL, + mtstate->mt_transition_capture); + + /* + * Check any WITH CHECK OPTION constraints from parent views. See the + * comment in ExecInsert. + */ + if (resultRelInfo->ri_WithCheckOptions != NIL) + ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate); + } + + if (canSetTag && numInserted > 0) + { + estate->es_processed += numInserted; + setLastTid(&slot->tts_tid); + } + + for (i = 0; i < numSlots; i++) + { + ExecDropSingleTupleTableSlot(slots[i]); + ExecDropSingleTupleTableSlot(planSlots[i]); + } +} + /* ---------------------------------------------------------------- * ExecDelete * @@ -1940,6 +2067,9 @@ ExecModifyTable(PlanState *pstate) ItemPointerData tuple_ctid; HeapTupleData oldtupdata; HeapTuple oldtuple; + PartitionTupleRouting *proute = node->mt_partition_tuple_routing; + ResultRelInfo **resultRelInfos; + int num_partitions; CHECK_FOR_INTERRUPTS(); @@ -2155,6 +2285,27 @@ ExecModifyTable(PlanState *pstate) return slot; } + /* + * Insert remaining tuples for bulk insert. + */ + if (proute) + resultRelInfos = ExecGetTouchedPartitions(proute, &num_partitions); + else + { + resultRelInfos = &resultRelInfo; + num_partitions = 1; + } + for (int i = 0; i < num_partitions; i++) + { + resultRelInfo = resultRelInfos[i]; + if (resultRelInfo->ri_NumSlots > 0) + ExecBulkInsert(node, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, node->canSetTag); + } + /* * We're done, but fire AFTER STATEMENT triggers before exiting. 
*/ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index bb34630e8e..fdffbf97c0 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -47,6 +47,7 @@ #include "commands/vacuum.h" #include "commands/variable.h" #include "common/string.h" +#include "executor/nodeModifyTable.h" #include "funcapi.h" #include "jit/jit.h" #include "libpq/auth.h" @@ -3377,6 +3378,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_bulk_insert_tuples", PGC_USERSET, CLIENT_CONN_STATEMENT, + gettext_noop("Sets the maximum number of tuples to insert in bulk into a foreign table."), + NULL, + 0 + }, + &max_bulk_insert_tuples, + 100, 1, 1000, + NULL, NULL, NULL + }, + { {"tcp_user_timeout", PGC_USERSET, CLIENT_CONN_OTHER, gettext_noop("TCP user timeout."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9cb571f7cc..9016f1f7bd 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -676,6 +676,7 @@ #xmloption = 'content' #gin_fuzzy_search_limit = 0 #gin_pending_list_limit = 4MB +#max_bulk_insert_tuples = 100 # - Locale and Formatting - diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 46a2dc9511..f082792a47 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -15,6 +15,8 @@ #include "nodes/execnodes.h" +extern int max_bulk_insert_tuples; + extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556dfb15..c7eeff2257 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -85,6 +85,12 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot); +typedef TupleTableSlot 
**(*ExecForeignBulkInsert_function) (EState *estate, + ResultRelInfo *rinfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); + typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, @@ -209,6 +215,7 @@ typedef struct FdwRoutine PlanForeignModify_function PlanForeignModify; BeginForeignModify_function BeginForeignModify; ExecForeignInsert_function ExecForeignInsert; + ExecForeignBulkInsert_function ExecForeignBulkInsert; ExecForeignUpdate_function ExecForeignUpdate; ExecForeignDelete_function ExecForeignDelete; EndForeignModify_function EndForeignModify; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 6c0a7d68d6..3d67ded2ca 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -446,6 +446,11 @@ typedef struct ResultRelInfo /* true when modifying foreign table directly */ bool ri_usesFdwDirectModify; + /* bulk insert stuff */ + int ri_NumSlots; /* number of slots in the array */ + TupleTableSlot **ri_Slots; /* input tuples for bulk insert */ + TupleTableSlot **ri_PlanSlots; + /* list of WithCheckOption's to be checked */ List *ri_WithCheckOptions; -- 2.26.2
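The buffering scheme in the patch above can be pictured with a small standalone sketch. It assumes tuples are accumulated per result relation until the limit is reached (the accumulation code is not part of this excerpt) and that the remainder is flushed at the end, as the "Insert remaining tuples for bulk insert" hunk does. MAX_BATCH, BatchBuffer, batch_add and batch_flush are hypothetical names for illustration only, and plain ints stand in for tuple slots; none of this is PostgreSQL code.

```c
#include <assert.h>

#define MAX_BATCH 100			/* hypothetical stand-in for max_bulk_insert_tuples */

/* Hypothetical per-relation buffer, loosely modelled on ri_Slots / ri_NumSlots. */
typedef struct BatchBuffer
{
	int			rows[MAX_BATCH];	/* buffered rows (ints stand in for slots) */
	int			nrows;			/* number of rows currently buffered */
	int			flushes;		/* how many batches were sent */
	long		inserted;		/* total rows sent so far */
} BatchBuffer;

/* Send whatever is buffered; a no-op when the buffer is empty. */
static void
batch_flush(BatchBuffer *buf)
{
	if (buf->nrows == 0)
		return;

	/* a real FDW would ship rows[0 .. nrows-1] to the remote server here */
	buf->inserted += buf->nrows;
	buf->flushes++;
	buf->nrows = 0;
}

/* Buffer one row, flushing as soon as the batch is full. */
static void
batch_add(BatchBuffer *buf, int row)
{
	assert(buf->nrows < MAX_BATCH);
	buf->rows[buf->nrows++] = row;

	if (buf->nrows == MAX_BATCH)
		batch_flush(buf);
}
```

The caller has to invoke batch_flush once more after the last row, otherwise a partially filled batch is silently lost; that is the role of the final loop over the result relations in ExecModifyTable.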
>From 845844a60936f750d77b02fce696fbb7179a1984 Mon Sep 17 00:00:00 2001 From: Tomas Vondra <to...@2ndquadrant.com> Date: Wed, 18 Nov 2020 01:15:49 +0100 Subject: [PATCH 2/3] make it compile --- src/backend/executor/nodeModifyTable.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 9c46036127..e20c613fed 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -2289,7 +2289,9 @@ ExecModifyTable(PlanState *pstate) * Insert remaining tuples for bulk insert. */ if (proute) - resultRelInfos = ExecGetTouchedPartitions(proute, &num_partitions); + { + // resultRelInfos = ExecGetTouchedPartitions(proute, &num_partitions); + } else { resultRelInfos = &resultRelInfo; -- 2.26.2
>From 306c05fdc7ab7181a63b583095b42f1ddc9e6b05 Mon Sep 17 00:00:00 2001 From: Tomas Vondra <to...@2ndquadrant.com> Date: Wed, 18 Nov 2020 01:08:44 +0100 Subject: [PATCH 3/3] reworks --- contrib/postgres_fdw/deparse.c | 78 ++++- contrib/postgres_fdw/option.c | 14 + contrib/postgres_fdw/postgres_fdw.c | 389 ++++++++++++++++--------- contrib/postgres_fdw/postgres_fdw.h | 8 +- doc/src/sgml/config.sgml | 21 -- src/backend/executor/nodeModifyTable.c | 5 +- src/backend/nodes/list.c | 32 ++ src/include/nodes/pg_list.h | 30 ++ 8 files changed, 415 insertions(+), 162 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 5aa81db08e..3d08aae987 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1706,7 +1706,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, - List **retrieved_attrs, int *values_end_len) + List **retrieved_attrs) { AttrNumber pindex; bool first; @@ -1749,7 +1749,81 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, } else appendStringInfoString(buf, " DEFAULT VALUES"); - *values_end_len = buf->len; + + if (doNothing) + appendStringInfoString(buf, " ON CONFLICT DO NOTHING"); + + deparseReturningList(buf, rte, rtindex, rel, + rel->trigdesc && rel->trigdesc->trig_insert_after_row, + withCheckOptionList, returningList, retrieved_attrs); +} + +/* + * deparse remote bulk INSERT statement + * + * The statement text is appended to buf, and we also create an integer List + * of the columns being retrieved by WITH CHECK OPTION or RETURNING (if any), + * which is returned to *retrieved_attrs. 
+ */ +void +deparseBulkInsertSql(StringInfo buf, RangeTblEntry *rte, + Index rtindex, Relation rel, + List *targetAttrs, bool doNothing, + List *withCheckOptionList, List *returningList, + List **retrieved_attrs, int batchSize) +{ + AttrNumber pindex; + bool first; + ListCell *lc; + int i; + + appendStringInfoString(buf, "INSERT INTO "); + deparseRelation(buf, rel); + + + if (targetAttrs) + { + appendStringInfoChar(buf, '('); + + first = true; + foreach(lc, targetAttrs) + { + int attnum = lfirst_int(lc); + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + deparseColumnRef(buf, rtindex, attnum, rte, false); + } + + appendStringInfoString(buf, ") VALUES"); + + pindex = 1; + + for (i = 0; i < batchSize; i++) + { + if (i > 0) + appendStringInfoString(buf, ", "); + + appendStringInfoString(buf, "("); + + first = true; + foreach(lc, targetAttrs) + { + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + appendStringInfo(buf, "$%d", pindex); + pindex++; + } + + appendStringInfoChar(buf, ')'); + } + } + else + appendStringInfoString(buf, " DEFAULT VALUES"); if (doNothing) appendStringInfoString(buf, " ON CONFLICT DO NOTHING"); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 1a03e02263..32bd4194eb 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -142,6 +142,17 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) errmsg("%s requires a non-negative integer value", def->defname))); } + else if (strcmp(def->defname, "batch_size") == 0) + { + int batch_size; + + batch_size = strtol(defGetString(def), NULL, 10); + if (batch_size <= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s requires a non-negative integer value", + def->defname))); + } else if (strcmp(def->defname, "password_required") == 0) { bool pw_required = defGetBoolean(def); @@ -203,6 +214,9 @@ InitPgFdwOptions(void) /* fetch_size is available on both server and table */ {"fetch_size", 
ForeignServerRelationId, false}, {"fetch_size", ForeignTableRelationId, false}, + /* batch_size is available on both server and table */ + {"batch_size", ForeignServerRelationId, false}, + {"batch_size", ForeignTableRelationId, false}, {"password_required", UserMappingRelationId, false}, /* diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index f7be4bec17..4a78b21634 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -84,10 +84,9 @@ enum FdwScanPrivateIndex * a ModifyTable node referencing a postgres_fdw foreign table. We store: * * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server - * 2) Integer list of target attribute numbers for INSERT/UPDATE + * 2) bulk INSERT statement text to be sent to the remote server (or NIL) + * 3) Integer list of target attribute numbers for INSERT/UPDATE * (NIL for a DELETE) - * 3) Length till the end of VALUES clause for INSERT - * (-1 for a DELETE/UPDATE) * 4) Boolean flag showing if the remote query has a RETURNING clause * 5) Integer list of attribute numbers retrieved by RETURNING, if any */ @@ -95,14 +94,16 @@ enum FdwModifyPrivateIndex { /* SQL statement to execute remotely (as a String node) */ FdwModifyPrivateUpdateSql, + /* bulk SQL statement to execute remotely (as a String node) */ + FdwModifyPrivateBulkUpdateSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, - /* Length till the end of VALUES clause (as an integer Value node) */ - FdwModifyPrivateLen, /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ - FdwModifyPrivateRetrievedAttrs + FdwModifyPrivateRetrievedAttrs, + /* INSERT batch size (number of tuples sent at once) */ + FdwModifyPrivateBatchSize }; /* @@ -176,12 +177,12 @@ typedef struct PgFdwModifyState /* for remote query execution */ PGconn *conn; /* connection for the 
scan */ char *p_name; /* name of prepared statement, if created */ + char *p_name_bulk; /* name of prepared statement, if created */ /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ - char *orig_query; /* original text of INSERT command */ + char *query_bulk; /* text of bulk INSERT command */ List *target_attrs; /* list of target attribute numbers */ - int len; /* length of some part of query */ bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ @@ -191,7 +192,7 @@ typedef struct PgFdwModifyState FmgrInfo *p_flinfo; /* output conversion functions for them */ /* bulk operation stuff */ - int num_slots; /* number of slots to insert */ + int batch_size; /* maximum number of rows to insert */ /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -441,10 +442,11 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, CmdType operation, Plan *subplan, char *query, + char *query_bulk, List *target_attrs, - int len, bool has_returning, - List *retrieved_attrs); + List *retrieved_attrs, + int batch_size); static TupleTableSlot **execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, @@ -452,13 +454,15 @@ static TupleTableSlot **execute_foreign_modify(EState *estate, TupleTableSlot **planSlots, int *numSlots); static void prepare_foreign_modify(PgFdwModifyState *fmstate); +static void prepare_foreign_modify_bulk(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot **slots, int numSlots); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); -static void finish_foreign_modify(PgFdwModifyState *fmstate, bool release_conn); +static void finish_foreign_modify(PgFdwModifyState *fmstate); +static void deallocate_query(PgFdwModifyState *fmstate); static List 
*build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); @@ -619,6 +623,7 @@ postgresGetForeignRelSize(PlannerInfo *root, fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; fpinfo->fetch_size = 100; + fpinfo->batch_size = 100; apply_server_options(fpinfo); apply_table_options(fpinfo); @@ -1677,15 +1682,15 @@ postgresPlanForeignModify(PlannerInfo *root, RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); Relation rel; StringInfoData sql; + StringInfoData bulksql; List *targetAttrs = NIL; List *withCheckOptionList = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; - List *retvalList; bool doNothing = false; - int values_end_len = -1; initStringInfo(&sql); + initStringInfo(&bulksql); /* * Core code already has some lock on each rel being planned, so we can @@ -1771,7 +1776,11 @@ postgresPlanForeignModify(PlannerInfo *root, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, withCheckOptionList, returningList, - &retrieved_attrs, &values_end_len); + &retrieved_attrs); + deparseBulkInsertSql(&bulksql, rte, resultRelation, rel, + targetAttrs, doNothing, + withCheckOptionList, returningList, + &retrieved_attrs, 100); /* FIXME pass the batch size */ break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, @@ -1795,12 +1804,12 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. 
*/ - retvalList = list_make4(makeString(sql.data), + return list_make6(makeString(sql.data), + makeString(bulksql.data), targetAttrs, - makeInteger(values_end_len), - makeInteger((retrieved_attrs != NIL))); - retvalList = lappend(retvalList, retrieved_attrs); - return retvalList; + makeInteger((retrieved_attrs != NIL)), + retrieved_attrs, + makeInteger(100)); } /* @@ -1816,10 +1825,11 @@ postgresBeginForeignModify(ModifyTableState *mtstate, { PgFdwModifyState *fmstate; char *query; + char *query_bulk; List *target_attrs; bool has_returning; - int values_end_len; List *retrieved_attrs; + int batch_size; RangeTblEntry *rte; /* @@ -1832,14 +1842,16 @@ postgresBeginForeignModify(ModifyTableState *mtstate, /* Deconstruct fdw_private data. */ query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); + query_bulk = strVal(list_nth(fdw_private, + FdwModifyPrivateBulkUpdateSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); - values_end_len = intVal(list_nth(fdw_private, - FdwModifyPrivateLen)); has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); retrieved_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateRetrievedAttrs); + batch_size = intVal(list_nth(fdw_private, + FdwModifyPrivateBatchSize)); /* Find RTE. 
*/ rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, @@ -1852,10 +1864,11 @@ postgresBeginForeignModify(ModifyTableState *mtstate, mtstate->operation, mtstate->mt_plans[subplan_index]->plan, query, + query_bulk, target_attrs, - values_end_len, has_returning, - retrieved_attrs); + retrieved_attrs, + batch_size); resultRelInfo->ri_FdwState = fmstate; } @@ -1971,7 +1984,7 @@ postgresEndForeignModify(EState *estate, return; /* Destroy the execution state */ - finish_foreign_modify(fmstate, true); + finish_foreign_modify(fmstate); } /* @@ -1990,8 +2003,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; - int values_end_len; StringInfoData sql; + StringInfoData bulksql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; bool doNothing = false; @@ -2013,6 +2026,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RelationGetRelationName(rel)))); initStringInfo(&sql); + initStringInfo(&bulksql); /* We transmit all columns that are defined in the foreign table. */ for (attnum = 1; attnum <= tupdesc->natts; attnum++) @@ -2067,7 +2081,13 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, resultRelInfo->ri_WithCheckOptions, resultRelInfo->ri_returningList, - &retrieved_attrs, &values_end_len); + &retrieved_attrs); + + /* FIXME only do this when batch size != 1 */ + deparseBulkInsertSql(&bulksql, rte, resultRelation, rel, targetAttrs, doNothing, + resultRelInfo->ri_WithCheckOptions, + resultRelInfo->ri_returningList, + &retrieved_attrs, 100); /* FIXME set the batch size */ /* Construct an execution state. 
*/ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2076,10 +2096,11 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, CMD_INSERT, NULL, sql.data, + bulksql.data, targetAttrs, - values_end_len, retrieved_attrs != NIL, - retrieved_attrs); + retrieved_attrs, + 100); /* FIXME pass the correct batch_size */ /* * If the given resultRelInfo already has PgFdwModifyState set, it means @@ -2116,7 +2137,7 @@ postgresEndForeignInsert(EState *estate, fmstate = fmstate->aux_fmstate; /* Destroy the execution state */ - finish_foreign_modify(fmstate, true); + finish_foreign_modify(fmstate); } /* @@ -3605,10 +3626,11 @@ create_foreign_modify(EState *estate, CmdType operation, Plan *subplan, char *query, + char *query_bulk, List *target_attrs, - int len, bool has_returning, - List *retrieved_attrs) + List *retrieved_attrs, + int batch_size) { PgFdwModifyState *fmstate; Relation rel = resultRelInfo->ri_RelationDesc; @@ -3641,10 +3663,8 @@ create_foreign_modify(EState *estate, /* Set up remote query information. 
*/ fmstate->query = query; - if (operation == CMD_INSERT) - fmstate->orig_query = pstrdup(fmstate->query); + fmstate->query_bulk = query_bulk; fmstate->target_attrs = target_attrs; - fmstate->len = len; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; @@ -3696,7 +3716,7 @@ create_foreign_modify(EState *estate, Assert(fmstate->p_nums <= n_params); - fmstate->num_slots = 1; + fmstate->batch_size = batch_size; /* Initialize auxiliary state */ fmstate->aux_fmstate = NULL; @@ -3724,57 +3744,19 @@ execute_foreign_modify(EState *estate, const char **p_values; PGresult *res; int n_rows; - int i, j; - int pindex; - bool first; - StringInfoData sql; /* The operation should be INSERT, UPDATE, or DELETE */ Assert(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE); - if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) + /* if we have a full insert batch, allocate the bulk prepared statement + * if needed */ + if (operation == CMD_INSERT) { - /* Destroy the prepared statement created previously */ - if (fmstate->p_name) - finish_foreign_modify(fmstate, false); - - /* - * Recreate INSERT command string with numSlots records in its - * VALUES clause - */ - - /* Copy up to the end of the first record from the original query */ - initStringInfo(&sql); - appendBinaryStringInfo(&sql, fmstate->orig_query, fmstate->len); - - /* Add records to VALUES clause */ - pindex = fmstate->p_nums + 1; - for (i = 0; i < *numSlots - 1; i++) - { - appendStringInfoString(&sql, ", ("); - - first = true; - for (j = 0; j < fmstate->p_nums; j++) - { - if (!first) - appendStringInfoString(&sql, ", "); - first = false; - - appendStringInfo(&sql, "$%d", pindex); - pindex++; - } - - appendStringInfoChar(&sql, ')'); - } - - /* Copy stuff after VALUES clause from the original query */ - appendStringInfoString(&sql, fmstate->orig_query + fmstate->len); - - pfree(fmstate->query); - fmstate->query = sql.data; - fmstate->num_slots = 
*numSlots; + if ((fmstate->batch_size == *numSlots) && + (!fmstate->p_name_bulk)) + prepare_foreign_modify_bulk(fmstate); } /* Set up the prepared statement on the remote server, if we didn't yet */ @@ -3798,45 +3780,92 @@ execute_foreign_modify(EState *estate, ctid = (ItemPointer) DatumGetPointer(datum); } - /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); - /* * Execute the prepared statement. */ - if (!PQsendQueryPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums * (*numSlots), - p_values, - NULL, - NULL, - 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + if (fmstate->batch_size == *numSlots) + { + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); + + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name_bulk, + fmstate->p_nums * fmstate->batch_size, + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); - /* - * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); - if (PQresultStatus(res) != - (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != + (fmstate->has_returning ? 
PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); - /* Check number of rows affected, and fetch RETURNING tuple if any */ - if (fmstate->has_returning) - { - Assert(*numSlots == 1); - n_rows = PQntuples(res); - if (n_rows > 0) - store_returning_result(fmstate, slots[0], res); + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + Assert(*numSlots == 1); + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slots[0], res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); } else - n_rows = atoi(PQcmdTuples(res)); + { + int i; - /* And clean up */ - PQclear(res); + for (i = 0; i < *numSlots; i++) + { + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, ctid, &slots[i], 1); + + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + Assert(*numSlots == 1); + n_rows += PQntuples(res); + if (PQntuples(res) > 0) + store_returning_result(fmstate, slots[i], res); + } + else + n_rows += atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + } + } MemoryContextReset(fmstate->temp_cxt); @@ -3848,6 +3877,51 @@ execute_foreign_modify(EState *estate, return (n_rows > 0) ? 
slots : NULL; } +/* + * prepare_foreign_modify_bulk + * Establish a prepared statement for execution of bulk INSERT + */ +static void +prepare_foreign_modify_bulk(PgFdwModifyState *fmstate) +{ + char prep_name[NAMEDATALEN]; + char *p_name; + PGresult *res; + + /* Construct name we'll use for the prepared statement. */ + snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", + GetPrepStmtNumber(fmstate->conn)); + p_name = pstrdup(prep_name); + + /* + * We intentionally do not specify parameter types here, but leave the + * remote server to derive them by default. This avoids possible problems + * with the remote server using different type OIDs than we do. All of + * the prepared statements we use in this module are simple enough that + * the remote server will make the right choices. + */ + if (!PQsendPrepare(fmstate->conn, + p_name, + fmstate->query_bulk, + 0, + NULL)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query_bulk); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(fmstate->conn, fmstate->query_bulk); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query_bulk); + PQclear(res); + + /* This action shows that the prepare has been done. */ + fmstate->p_name_bulk = p_name; +} + /* * prepare_foreign_modify * Establish a prepared statement for execution of INSERT/UPDATE/DELETE @@ -4003,41 +4077,71 @@ store_returning_result(PgFdwModifyState *fmstate, PG_END_TRY(); } + +static void +deallocate_query(PgFdwModifyState *fmstate) +{ + char sql[64]; + PGresult *res; + + /* do nothing if the query is not allocated */ + if (!fmstate->p_name) + return; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. 
+ */ + res = pgfdw_exec_query(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + PQclear(res); + fmstate->p_name = NULL; +} + + +static void +deallocate_query_bulk(PgFdwModifyState *fmstate) +{ + char sql[64]; + PGresult *res; + + /* do nothing if the query is not allocated */ + if (!fmstate->p_name_bulk) + return; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name_bulk); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + PQclear(res); + fmstate->p_name_bulk = NULL; +} + + /* * finish_foreign_modify * Release resources for a foreign insert/update/delete operation */ static void -finish_foreign_modify(PgFdwModifyState *fmstate, - bool release_conn) +finish_foreign_modify(PgFdwModifyState *fmstate) { Assert(fmstate != NULL); - /* If we created a prepared statement, destroy it */ - if (fmstate->p_name) - { - char sql[64]; - PGresult *res; - - snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); - - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. 
- */ - res = pgfdw_exec_query(fmstate->conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); - PQclear(res); - fmstate->p_name = NULL; - } + /* If we created prepared statements, destroy them */ + deallocate_query(fmstate); + deallocate_query_bulk(fmstate); /* Release remote connection */ - if (release_conn) - { - ReleaseConnection(fmstate->conn); - fmstate->conn = NULL; - } + ReleaseConnection(fmstate->conn); + fmstate->conn = NULL; } /* @@ -5483,6 +5587,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo) ExtractExtensionList(defGetString(def), false); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "batch_size") == 0) + fpinfo->batch_size = strtol(defGetString(def), NULL, 10); } } @@ -5504,6 +5610,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo) fpinfo->use_remote_estimate = defGetBoolean(def); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "batch_size") == 0) + fpinfo->batch_size = strtol(defGetString(def), NULL, 10); } } @@ -5538,6 +5646,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; + fpinfo->batch_size = fpinfo_o->batch_size; /* Merge the table level options from either side of the join. */ if (fpinfo_i) @@ -5559,6 +5668,12 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, * relation sizes. */ fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); + + /* + * XXX Not sure it's particularly useful to merge batch_size for + * join relations, but it can't hurt. 
+ */ + fpinfo->batch_size = Max(fpinfo_o->batch_size, fpinfo_i->batch_size); } } diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 459a9ca6ab..ea2fdfb9c1 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo UserMapping *user; /* only set in use_remote_estimate mode */ int fetch_size; /* fetch size for this remote table */ + int batch_size; /* insert batch size for this remote table */ /* * Name of the relation, for use while EXPLAINing ForeignScan. It is used @@ -161,7 +162,12 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, - List **retrieved_attrs, int *values_end_len); + List **retrieved_attrs); +extern void deparseBulkInsertSql(StringInfo buf, RangeTblEntry *rte, + Index rtindex, Relation rel, + List *targetAttrs, bool doNothing, + List *withCheckOptionList, List *returningList, + List **retrieved_attrs, int batchSize); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 51bfe445b0..a632cf98ba 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -8533,27 +8533,6 @@ SET XML OPTION { DOCUMENT | CONTENT }; </listitem> </varlistentry> - <varlistentry id="guc-max-bulk-insert-tuples" xreflabel="max_bulk_insert_tuples"> - <term><varname>max_bulk_insert_tuples</varname> (<type>integer</type>) - <indexterm> - <primary><varname>max_bulk_insert_tuples</varname></primary> - <secondary>configuration parameter</secondary> - </indexterm> - </term> - <listitem> - <para> - Sets the maximum number of tuples to accumulate and insert in bulk - into a foreign table. This applies to each partition when the insert - target is a partitioned table. 
- The valid range is <literal>1</literal>, which disables bulk insert, - to <literal>1000</literal>. - This takes effect only if the foreign data wrapper supports - bulk insert. - The default is <literal>100</literal>. - </para> - </listitem> - </varlistentry> - </variablelist> </sect2> <sect2 id="runtime-config-client-format"> diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index e20c613fed..4932014515 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -57,7 +57,10 @@ #include "utils/memutils.h" #include "utils/rel.h" - +/* + * FIXME this should be removed / replaced with the foreign server/table + * batch_size option. + */ int max_bulk_insert_tuples; static void ExecBulkInsert(ModifyTableState *mtstate, diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c index efa44342c4..56c63ba2a3 100644 --- a/src/backend/nodes/list.c +++ b/src/backend/nodes/list.c @@ -277,6 +277,38 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2, return list; } +List * +list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2, + ListCell datum3, ListCell datum4, ListCell datum5) +{ + List *list = new_list(t, 5); + + list->elements[0] = datum1; + list->elements[1] = datum2; + list->elements[2] = datum3; + list->elements[3] = datum4; + list->elements[4] = datum5; + check_list_invariants(list); + return list; +} + +List * +list_make6_impl(NodeTag t, ListCell datum1, ListCell datum2, + ListCell datum3, ListCell datum4, ListCell datum5, + ListCell datum6) +{ + List *list = new_list(t, 6); + + list->elements[0] = datum1; + list->elements[1] = datum2; + list->elements[2] = datum3; + list->elements[3] = datum4; + list->elements[4] = datum5; + list->elements[5] = datum6; + check_list_invariants(list); + return list; +} + /* * Make room for a new head cell in the given (non-NIL) list. 
* diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h index cda77a841e..195a8c1818 100644 --- a/src/include/nodes/pg_list.h +++ b/src/include/nodes/pg_list.h @@ -213,6 +213,14 @@ list_length(const List *l) #define list_make4(x1,x2,x3,x4) \ list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \ list_make_ptr_cell(x3), list_make_ptr_cell(x4)) +#define list_make5(x1,x2,x3,x4,x5) \ + list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \ + list_make_ptr_cell(x3), list_make_ptr_cell(x4), \ + list_make_ptr_cell(x5)) +#define list_make6(x1,x2,x3,x4,x5,x6) \ + list_make6_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \ + list_make_ptr_cell(x3), list_make_ptr_cell(x4), \ + list_make_ptr_cell(x5), list_make_ptr_cell(x6)) #define list_make1_int(x1) \ list_make1_impl(T_IntList, list_make_int_cell(x1)) @@ -224,6 +232,14 @@ list_length(const List *l) #define list_make4_int(x1,x2,x3,x4) \ list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \ list_make_int_cell(x3), list_make_int_cell(x4)) +#define list_make5_int(x1,x2,x3,x4,x5) \ + list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \ + list_make_int_cell(x3), list_make_int_cell(x4), \ + list_make_int_cell(x5)) +#define list_make6_int(x1,x2,x3,x4,x5,x6) \ + list_make6_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \ + list_make_int_cell(x3), list_make_int_cell(x4), \ + list_make_int_cell(x5), list_make_int_cell(x6)) #define list_make1_oid(x1) \ list_make1_impl(T_OidList, list_make_oid_cell(x1)) @@ -235,6 +251,14 @@ list_length(const List *l) #define list_make4_oid(x1,x2,x3,x4) \ list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \ list_make_oid_cell(x3), list_make_oid_cell(x4)) +#define list_make5_oid(x1,x2,x3,x4,x5) \ + list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \ + list_make_oid_cell(x3), list_make_oid_cell(x4), \ + list_make_oid_cell(x5)) 
+#define list_make6_oid(x1,x2,x3,x4,x5,x6) \ + list_make6_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \ + list_make_oid_cell(x3), list_make_oid_cell(x4), \ + list_make_oid_cell(x5), list_make_oid_cell(x6)) /* * Locate the n'th cell (counting from 0) of the list. @@ -520,6 +544,12 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2, ListCell datum3); extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2, ListCell datum3, ListCell datum4); +extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2, + ListCell datum3, ListCell datum4, + ListCell datum5); +extern List *list_make6_impl(NodeTag t, ListCell datum1, ListCell datum2, + ListCell datum3, ListCell datum4, + ListCell datum5, ListCell datum6); extern pg_nodiscard List *lappend(List *list, void *datum); extern pg_nodiscard List *lappend_int(List *list, int datum); -- 2.26.2
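To make the shape of the statement built by deparseBulkInsertSql easier to see, here is a minimal standalone sketch of the same placeholder numbering. It is an illustration under assumptions, not the patch's code: it uses snprintf into a fixed buffer instead of StringInfo, takes the relation and column names as plain strings instead of going through deparseRelation and deparseColumnRef, and omits the ON CONFLICT and RETURNING parts. The name build_bulk_insert and its parameters are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: write
 *   INSERT INTO rel(c1, c2) VALUES($1, $2), ($3, $4), ...
 * into buf, with one parenthesized group per row of the batch.
 *
 * Returns the number of parameters the statement expects
 * (ncols * batch_size), or -1 if buf is too small.
 */
static int
build_bulk_insert(char *buf, size_t buflen, const char *relname,
				  const char *const *cols, int ncols, int batch_size)
{
	size_t		used = 0;
	int			pindex = 1;		/* next $n placeholder number */
	int			n;

	assert(ncols > 0 && batch_size > 0);

/* Append formatted text, bailing out of the function on truncation. */
#define APPEND(...) \
	do { \
		n = snprintf(buf + used, buflen - used, __VA_ARGS__); \
		if (n < 0 || (size_t) n >= buflen - used) \
			return -1; \
		used += (size_t) n; \
	} while (0)

	APPEND("INSERT INTO %s(", relname);
	for (int c = 0; c < ncols; c++)
		APPEND("%s%s", c > 0 ? ", " : "", cols[c]);
	APPEND(") VALUES");

	for (int row = 0; row < batch_size; row++)
	{
		APPEND("%s(", row > 0 ? ", " : "");
		for (int c = 0; c < ncols; c++)
			APPEND("%s$%d", c > 0 ? ", " : "", pindex++);
		APPEND(")");
	}
#undef APPEND

	return pindex - 1;
}
```

Because the placeholders are numbered across the whole batch, the parameter array passed at execution time has to hold p_nums * batch_size values in row order, which matches the count given to PQsendQueryPrepared in the full-batch branch of execute_foreign_modify.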