On 11/17/20 10:11 AM, tsunakawa.ta...@fujitsu.com wrote:
> Hello,
> 
> 
> Modified the patch as I talked with Tomas-san.  The performance
> results of loading one million records into a hash-partitioned table
> with 8 partitions are as follows:
> 
>   unpatched, local: 8.6 seconds
>   unpatched, fdw: 113.7 seconds
>   patched, fdw: 12.5 seconds  (9x improvement)
> 
> The test scripts are also attached.  Run prepare.sql once to set up
> tables and source data.  Run local_part.sql and fdw_part.sql to load
> source data into a partitioned table with local partitions and a
> partitioned table with foreign tables respectively.
> 

Unfortunately, this does not compile for me, because nodeModifyTable
calls ExecGetTouchedPartitions, which is not defined anywhere. Not sure
what that's about, so I simply commented it out. That probably breaks
the partitioned cases, but it allowed me to do some review and testing.

As for the patch, I have a couple of comments:

1) As I mentioned before, I really don't think we should be doing
deparsing in execute_foreign_modify - that's something that should
happen earlier, and should be in a deparse.c function.

2) I think the GUC should be replaced with a server/table option,
similar to fetch_size.

The attached patch tries to address both of these points.

Firstly, it adds a new deparseBulkInsertSql function that builds a
query for the "full" batch, and then uses the two queries (bulk and
single-row) - when we get a full batch we use the bulk query, otherwise
we use the single-row query in a loop. IMO this is cleaner than
deparsing queries ad hoc in execute_foreign_modify.

Of course, this might be worse when we don't have a full batch, e.g. for
a query that inserts only 50 rows with batch_size=100. If this case is
common, one option would be lowering the batch_size accordingly. If we
really want to improve this case too, I suggest we pass more info than
just the position of the VALUES clause - that seems a bit too hackish.


Secondly, it adds the batch_size option to server/foreign table, and
uses that. This is not complete, though. postgresPlanForeignModify
currently passes a hard-coded value; it needs to look up
the correct value for the server/table from RelOptInfo or something. And
I suppose the ModifyTable infrastructure will need to determine the value
in order to pass the correct number of slots to the FDW API.

There are a couple of other smaller changes. E.g. it undoes the changes to
finish_foreign_modify, and instead calls separate functions to prepare
the bulk statement. It also adds list_make5/list_make6 macros, so as to
not have to do strange stuff with the parameter lists.


And finally, this should probably add a bunch of regression tests.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
>From 6a7031c800dff8fff9e1e64e0278494f3acd686f Mon Sep 17 00:00:00 2001
From: Takayuki Tsunakawa <tsunakawa.ta...@fujitsu.com>
Date: Tue, 10 Nov 2020 09:27:56 +0900
Subject: [PATCH 1/3] Add bulk insert for foreign tables

---
 contrib/postgres_fdw/deparse.c                |   3 +-
 contrib/postgres_fdw/postgres_fdw.c           | 233 ++++++++++++++----
 contrib/postgres_fdw/postgres_fdw.h           |   2 +-
 doc/src/sgml/config.sgml                      |  21 ++
 doc/src/sgml/fdwhandler.sgml                  |  64 ++++-
 src/backend/executor/nodeModifyTable.c        | 151 ++++++++++++
 src/backend/utils/misc/guc.c                  |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/executor/nodeModifyTable.h        |   2 +
 src/include/foreign/fdwapi.h                  |   7 +
 src/include/nodes/execnodes.h                 |   5 +
 11 files changed, 446 insertions(+), 55 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 2d44df19fe..5aa81db08e 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1706,7 +1706,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 				 Index rtindex, Relation rel,
 				 List *targetAttrs, bool doNothing,
 				 List *withCheckOptionList, List *returningList,
-				 List **retrieved_attrs)
+				 List **retrieved_attrs, int *values_end_len)
 {
 	AttrNumber	pindex;
 	bool		first;
@@ -1749,6 +1749,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 	}
 	else
 		appendStringInfoString(buf, " DEFAULT VALUES");
+	*values_end_len = buf->len;
 
 	if (doNothing)
 		appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9c5aaacc51..f7be4bec17 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -86,8 +86,10 @@ enum FdwScanPrivateIndex
  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
  * 2) Integer list of target attribute numbers for INSERT/UPDATE
  *	  (NIL for a DELETE)
- * 3) Boolean flag showing if the remote query has a RETURNING clause
- * 4) Integer list of attribute numbers retrieved by RETURNING, if any
+ * 3) Length till the end of VALUES clause for INSERT
+ *	  (-1 for a DELETE/UPDATE)
+ * 4) Boolean flag showing if the remote query has a RETURNING clause
+ * 5) Integer list of attribute numbers retrieved by RETURNING, if any
  */
 enum FdwModifyPrivateIndex
 {
@@ -95,6 +97,8 @@ enum FdwModifyPrivateIndex
 	FdwModifyPrivateUpdateSql,
 	/* Integer list of target attribute numbers for INSERT/UPDATE */
 	FdwModifyPrivateTargetAttnums,
+	/* Length till the end of VALUES clause (as an integer Value node) */
+	FdwModifyPrivateLen,
 	/* has-returning flag (as an integer Value node) */
 	FdwModifyPrivateHasReturning,
 	/* Integer list of attribute numbers retrieved by RETURNING */
@@ -175,7 +179,9 @@ typedef struct PgFdwModifyState
 
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
+	char	   *orig_query;		/* original text of INSERT command */
 	List	   *target_attrs;	/* list of target attribute numbers */
+	int			len;			/* length of some part of query */
 	bool		has_returning;	/* is there a RETURNING clause? */
 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
 
@@ -184,6 +190,9 @@ typedef struct PgFdwModifyState
 	int			p_nums;			/* number of parameters to transmit */
 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
 
+	/* bulk operation stuff */
+	int			num_slots;		/* number of slots to insert */
+
 	/* working memory context */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
@@ -342,6 +351,11 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
 												 TupleTableSlot *planSlot);
+static TupleTableSlot **postgresExecForeignBulkInsert(EState *estate,
+												 ResultRelInfo *resultRelInfo,
+												 TupleTableSlot **slots,
+												 TupleTableSlot **planSlots,
+												 int *numSlots);
 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
@@ -428,20 +442,23 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
 											   Plan *subplan,
 											   char *query,
 											   List *target_attrs,
+											   int len,
 											   bool has_returning,
 											   List *retrieved_attrs);
-static TupleTableSlot *execute_foreign_modify(EState *estate,
+static TupleTableSlot **execute_foreign_modify(EState *estate,
 											  ResultRelInfo *resultRelInfo,
 											  CmdType operation,
-											  TupleTableSlot *slot,
-											  TupleTableSlot *planSlot);
+											  TupleTableSlot **slots,
+											  TupleTableSlot **planSlots,
+											  int *numSlots);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 											 ItemPointer tupleid,
-											 TupleTableSlot *slot);
+											 TupleTableSlot **slots,
+											 int numSlots);
 static void store_returning_result(PgFdwModifyState *fmstate,
 								   TupleTableSlot *slot, PGresult *res);
-static void finish_foreign_modify(PgFdwModifyState *fmstate);
+static void finish_foreign_modify(PgFdwModifyState *fmstate, bool release_conn);
 static List *build_remote_returning(Index rtindex, Relation rel,
 									List *returningList);
 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -529,6 +546,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->PlanForeignModify = postgresPlanForeignModify;
 	routine->BeginForeignModify = postgresBeginForeignModify;
 	routine->ExecForeignInsert = postgresExecForeignInsert;
+	routine->ExecForeignBulkInsert = postgresExecForeignBulkInsert;
 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
 	routine->ExecForeignDelete = postgresExecForeignDelete;
 	routine->EndForeignModify = postgresEndForeignModify;
@@ -1663,7 +1681,9 @@ postgresPlanForeignModify(PlannerInfo *root,
 	List	   *withCheckOptionList = NIL;
 	List	   *returningList = NIL;
 	List	   *retrieved_attrs = NIL;
+	List	   *retvalList;
 	bool		doNothing = false;
+	int			values_end_len = -1;
 
 	initStringInfo(&sql);
 
@@ -1751,7 +1771,7 @@ postgresPlanForeignModify(PlannerInfo *root,
 			deparseInsertSql(&sql, rte, resultRelation, rel,
 							 targetAttrs, doNothing,
 							 withCheckOptionList, returningList,
-							 &retrieved_attrs);
+							 &retrieved_attrs, &values_end_len);
 			break;
 		case CMD_UPDATE:
 			deparseUpdateSql(&sql, rte, resultRelation, rel,
@@ -1775,10 +1795,12 @@ postgresPlanForeignModify(PlannerInfo *root,
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
 	 */
-	return list_make4(makeString(sql.data),
+	retvalList = list_make4(makeString(sql.data),
 					  targetAttrs,
-					  makeInteger((retrieved_attrs != NIL)),
-					  retrieved_attrs);
+					  makeInteger(values_end_len),
+					  makeInteger((retrieved_attrs != NIL)));
+	retvalList = lappend(retvalList, retrieved_attrs);
+	return retvalList;
 }
 
 /*
@@ -1796,6 +1818,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	char	   *query;
 	List	   *target_attrs;
 	bool		has_returning;
+	int			values_end_len;
 	List	   *retrieved_attrs;
 	RangeTblEntry *rte;
 
@@ -1811,6 +1834,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 							FdwModifyPrivateUpdateSql));
 	target_attrs = (List *) list_nth(fdw_private,
 									 FdwModifyPrivateTargetAttnums);
+	values_end_len = intVal(list_nth(fdw_private,
+									FdwModifyPrivateLen));
 	has_returning = intVal(list_nth(fdw_private,
 									FdwModifyPrivateHasReturning));
 	retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1828,6 +1853,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 									mtstate->mt_plans[subplan_index]->plan,
 									query,
 									target_attrs,
+									values_end_len,
 									has_returning,
 									retrieved_attrs);
 
@@ -1845,7 +1871,37 @@ postgresExecForeignInsert(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	TupleTableSlot *rslot;
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	/*
+	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
+	 * postgresBeginForeignInsert())
+	 */
+	if (fmstate->aux_fmstate)
+		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
+								   &slot, &planSlot, &numSlots);
+	/* Revert that change */
+	if (fmstate->aux_fmstate)
+		resultRelInfo->ri_FdwState = fmstate;
+
+	return rslot ? *rslot : NULL;
+}
+
+/*
+ * postgresExecForeignBulkInsert
+ *		Insert multiple rows into a foreign table
+ */
+static TupleTableSlot **
+postgresExecForeignBulkInsert(EState *estate,
+						  ResultRelInfo *resultRelInfo,
+						  TupleTableSlot **slots,
+						  TupleTableSlot **planSlots,
+						  int *numSlots)
+{
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+	TupleTableSlot **rslot;
 
 	/*
 	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
@@ -1854,7 +1910,7 @@ postgresExecForeignInsert(EState *estate,
 	if (fmstate->aux_fmstate)
 		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
 	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
-								   slot, planSlot);
+								   slots, planSlots, numSlots);
 	/* Revert that change */
 	if (fmstate->aux_fmstate)
 		resultRelInfo->ri_FdwState = fmstate;
@@ -1872,8 +1928,13 @@ postgresExecForeignUpdate(EState *estate,
 						  TupleTableSlot *slot,
 						  TupleTableSlot *planSlot)
 {
-	return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
-								  slot, planSlot);
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
+								  &slot, &planSlot, &numSlots);
+
+	return rslot ? *rslot : NULL;
 }
 
 /*
@@ -1886,8 +1947,13 @@ postgresExecForeignDelete(EState *estate,
 						  TupleTableSlot *slot,
 						  TupleTableSlot *planSlot)
 {
-	return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
-								  slot, planSlot);
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
+								  &slot, &planSlot, &numSlots);
+
+	return rslot ? *rslot : NULL;
 }
 
 /*
@@ -1905,7 +1971,7 @@ postgresEndForeignModify(EState *estate,
 		return;
 
 	/* Destroy the execution state */
-	finish_foreign_modify(fmstate);
+	finish_foreign_modify(fmstate, true);
 }
 
 /*
@@ -1924,6 +1990,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	RangeTblEntry *rte;
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	int			attnum;
+	int			values_end_len;
 	StringInfoData sql;
 	List	   *targetAttrs = NIL;
 	List	   *retrieved_attrs = NIL;
@@ -2000,7 +2067,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
 					 resultRelInfo->ri_WithCheckOptions,
 					 resultRelInfo->ri_returningList,
-					 &retrieved_attrs);
+					 &retrieved_attrs, &values_end_len);
 
 	/* Construct an execution state. */
 	fmstate = create_foreign_modify(mtstate->ps.state,
@@ -2010,6 +2077,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 									NULL,
 									sql.data,
 									targetAttrs,
+									values_end_len,
 									retrieved_attrs != NIL,
 									retrieved_attrs);
 
@@ -2048,7 +2116,7 @@ postgresEndForeignInsert(EState *estate,
 		fmstate = fmstate->aux_fmstate;
 
 	/* Destroy the execution state */
-	finish_foreign_modify(fmstate);
+	finish_foreign_modify(fmstate, true);
 }
 
 /*
@@ -3538,6 +3606,7 @@ create_foreign_modify(EState *estate,
 					  Plan *subplan,
 					  char *query,
 					  List *target_attrs,
+					  int len,
 					  bool has_returning,
 					  List *retrieved_attrs)
 {
@@ -3572,7 +3641,10 @@ create_foreign_modify(EState *estate,
 
 	/* Set up remote query information. */
 	fmstate->query = query;
+	if (operation == CMD_INSERT)
+		fmstate->orig_query = pstrdup(fmstate->query);
 	fmstate->target_attrs = target_attrs;
+	fmstate->len = len;
 	fmstate->has_returning = has_returning;
 	fmstate->retrieved_attrs = retrieved_attrs;
 
@@ -3624,6 +3696,8 @@ create_foreign_modify(EState *estate,
 
 	Assert(fmstate->p_nums <= n_params);
 
+	fmstate->num_slots = 1;
+
 	/* Initialize auxiliary state */
 	fmstate->aux_fmstate = NULL;
 
@@ -3634,26 +3708,75 @@ create_foreign_modify(EState *estate,
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
  *		result if any.  (This is the shared guts of postgresExecForeignInsert,
- *		postgresExecForeignUpdate, and postgresExecForeignDelete.)
+ *		postgresExecForeignBulkInsert, postgresExecForeignUpdate, and
+ *		postgresExecForeignDelete.)
  */
-static TupleTableSlot *
+static TupleTableSlot **
 execute_foreign_modify(EState *estate,
 					   ResultRelInfo *resultRelInfo,
 					   CmdType operation,
-					   TupleTableSlot *slot,
-					   TupleTableSlot *planSlot)
+					   TupleTableSlot **slots,
+					   TupleTableSlot **planSlots,
+					   int *numSlots)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
 	ItemPointer ctid = NULL;
 	const char **p_values;
 	PGresult   *res;
 	int			n_rows;
+	int			i, j;
+	int			pindex;
+	bool		first;
+	StringInfoData sql;
 
 	/* The operation should be INSERT, UPDATE, or DELETE */
 	Assert(operation == CMD_INSERT ||
 		   operation == CMD_UPDATE ||
 		   operation == CMD_DELETE);
 
+	if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
+	{
+		/* Destroy the prepared statement created previously */
+		if (fmstate->p_name)
+			finish_foreign_modify(fmstate, false);
+
+		/*
+		 * Recreate INSERT command string with numSlots records in its
+		 * VALUES clause
+		 */
+
+		/* Copy up to the end of the first record from the original query */
+		initStringInfo(&sql);
+		appendBinaryStringInfo(&sql, fmstate->orig_query, fmstate->len);
+
+		/* Add records to VALUES clause */
+		pindex = fmstate->p_nums + 1;
+		for (i = 0; i < *numSlots - 1; i++)
+		{
+			appendStringInfoString(&sql, ", (");
+
+			first = true;
+			for (j = 0; j < fmstate->p_nums; j++)
+			{
+				if (!first)
+					appendStringInfoString(&sql, ", ");
+				first = false;
+
+				appendStringInfo(&sql, "$%d", pindex);
+				pindex++;
+			}
+
+			appendStringInfoChar(&sql, ')');
+		}
+
+		/* Copy stuff after VALUES clause from the original query */
+		appendStringInfoString(&sql, fmstate->orig_query + fmstate->len);
+
+		pfree(fmstate->query);
+		fmstate->query = sql.data;
+		fmstate->num_slots = *numSlots;
+	}
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -3666,7 +3789,7 @@ execute_foreign_modify(EState *estate,
 		Datum		datum;
 		bool		isNull;
 
-		datum = ExecGetJunkAttribute(planSlot,
+		datum = ExecGetJunkAttribute(planSlots[0],
 									 fmstate->ctidAttno,
 									 &isNull);
 		/* shouldn't ever get a null result... */
@@ -3676,14 +3799,14 @@ execute_foreign_modify(EState *estate,
 	}
 
 	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, ctid, slot);
+	p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
 
 	/*
 	 * Execute the prepared statement.
 	 */
 	if (!PQsendQueryPrepared(fmstate->conn,
 							 fmstate->p_name,
-							 fmstate->p_nums,
+							 fmstate->p_nums * (*numSlots),
 							 p_values,
 							 NULL,
 							 NULL,
@@ -3704,9 +3827,10 @@ execute_foreign_modify(EState *estate,
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
 	{
+		Assert(*numSlots == 1);
 		n_rows = PQntuples(res);
 		if (n_rows > 0)
-			store_returning_result(fmstate, slot, res);
+			store_returning_result(fmstate, slots[0], res);
 	}
 	else
 		n_rows = atoi(PQcmdTuples(res));
@@ -3716,10 +3840,12 @@ execute_foreign_modify(EState *estate,
 
 	MemoryContextReset(fmstate->temp_cxt);
 
+	*numSlots = n_rows;
+
 	/*
 	 * Return NULL if nothing was inserted/updated/deleted on the remote end
 	 */
-	return (n_rows > 0) ? slot : NULL;
+	return (n_rows > 0) ? slots : NULL;
 }
 
 /*
@@ -3779,19 +3905,23 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
-						 TupleTableSlot *slot)
+						 TupleTableSlot **slots,
+						 int numSlots)
 {
 	const char **p_values;
+	int			i;
+	int			j;
 	int			pindex = 0;
 	MemoryContext oldcontext;
 
 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
 
-	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
+	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
 
 	/* 1st parameter should be ctid, if it's in use */
 	if (tupleid != NULL)
 	{
+		Assert(numSlots == 1);
 		/* don't need set_transmission_modes for TID output */
 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
 											  PointerGetDatum(tupleid));
@@ -3799,32 +3929,37 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 	}
 
 	/* get following parameters from slot */
-	if (slot != NULL && fmstate->target_attrs != NIL)
+	if (slots != NULL && fmstate->target_attrs != NIL)
 	{
 		int			nestlevel;
 		ListCell   *lc;
 
 		nestlevel = set_transmission_modes();
 
-		foreach(lc, fmstate->target_attrs)
+		for (i = 0; i < numSlots; i++)
 		{
-			int			attnum = lfirst_int(lc);
-			Datum		value;
-			bool		isnull;
+			j = (tupleid != NULL) ? 1 : 0;
+			foreach(lc, fmstate->target_attrs)
+			{
+				int			attnum = lfirst_int(lc);
+				Datum		value;
+				bool		isnull;
 
-			value = slot_getattr(slot, attnum, &isnull);
-			if (isnull)
-				p_values[pindex] = NULL;
-			else
-				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
-													  value);
-			pindex++;
+				value = slot_getattr(slots[i], attnum, &isnull);
+				if (isnull)
+					p_values[pindex] = NULL;
+				else
+					p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
+														  value);
+				pindex++;
+				j++;
+			}
 		}
 
 		reset_transmission_modes(nestlevel);
 	}
 
-	Assert(pindex == fmstate->p_nums);
+	Assert(pindex == fmstate->p_nums * numSlots);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -3873,7 +4008,8 @@ store_returning_result(PgFdwModifyState *fmstate,
  *		Release resources for a foreign insert/update/delete operation
  */
 static void
-finish_foreign_modify(PgFdwModifyState *fmstate)
+finish_foreign_modify(PgFdwModifyState *fmstate,
+	bool release_conn)
 {
 	Assert(fmstate != NULL);
 
@@ -3897,8 +4033,11 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
 	}
 
 	/* Release remote connection */
-	ReleaseConnection(fmstate->conn);
-	fmstate->conn = NULL;
+	if (release_conn)
+	{
+		ReleaseConnection(fmstate->conn);
+		fmstate->conn = NULL;
+	}
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..459a9ca6ab 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -161,7 +161,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
-							 List **retrieved_attrs);
+							 List **retrieved_attrs, int *values_end_len);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index a632cf98ba..51bfe445b0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -8533,6 +8533,27 @@ SET XML OPTION { DOCUMENT | CONTENT };
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-bulk-insert-tuples" xreflabel="max_bulk_insert_tuples">
+      <term><varname>max_bulk_insert_tuples</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_bulk_insert_tuples</varname></primary>
+       <secondary>configuration parameter</secondary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the maximum number of tuples to accumulate and insert in bulk
+        into a foreign table. This applies to each partition when the insert
+        target is a partitioned table.
+        The valid range is <literal>1</literal> to <literal>1000</literal>;
+        a value of <literal>1</literal> disables bulk insert.
+        This takes effect only if the foreign data wrapper supports
+        bulk insert.
+        The default is <literal>100</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
      <sect2 id="runtime-config-client-format">
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 9c9293414c..cdf29595d7 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -523,8 +523,9 @@ BeginForeignModify(ModifyTableState *mtstate,
      Begin executing a foreign table modification operation.  This routine is
      called during executor startup.  It should perform any initialization
      needed prior to the actual table modifications.  Subsequently,
-     <function>ExecForeignInsert</function>, <function>ExecForeignUpdate</function> or
-     <function>ExecForeignDelete</function> will be called for each tuple to be
+     <function>ExecForeignInsert/ExecForeignBulkInsert</function>,
+     <function>ExecForeignUpdate</function> or
+     <function>ExecForeignDelete</function> will be called for tuple(s) to be
      inserted, updated, or deleted.
     </para>
 
@@ -614,6 +615,56 @@ ExecForeignInsert(EState *estate,
 
     <para>
 <programlisting>
+TupleTableSlot **
+ExecForeignBulkInsert(EState *estate,
+                  ResultRelInfo *rinfo,
+                  TupleTableSlot **slots,
+                  TupleTableSlot **planSlots,
+                  int *numSlots);
+</programlisting>
+
+     Insert multiple tuples in bulk into the foreign table.
+     The parameters are the same as for <function>ExecForeignInsert</function>
+     except that <literal>slots</literal> and <literal>planSlots</literal> contain
+     multiple tuples and <literal>*numSlots</literal> specifies the number of
+     tuples in those arrays.
+    </para>
+
+    <para>
+     The return value is an array of slots containing the data that was
+     actually inserted (this might differ from the data supplied, for
+     example as a result of trigger actions.)
+     The passed-in <literal>slots</literal> can be re-used for this purpose.
+     The number of successfully inserted tuples is returned in
+     <literal>*numSlots</literal>.
+    </para>
+
+    <para>
+     The data in the returned slot is used only if the <command>INSERT</command>
+     statement involves a view
+     <literal>WITH CHECK OPTION</literal>; or if the foreign table has
+     an <literal>AFTER ROW</literal> trigger.  Triggers require all columns,
+     but the FDW could choose to optimize away returning some or all columns
+     depending on the contents of the
+     <literal>WITH CHECK OPTION</literal> constraints.
+    </para>
+
+    <para>
+     If the <function>ExecForeignBulkInsert</function> pointer is set to
+     <literal>NULL</literal>, attempts to insert into the foreign table will
+     use <function>ExecForeignInsert</function>.
+     This function is not used if the <command>INSERT</command> has the
+     <literal>RETURNING</literal> clause.
+    </para>
+
+    <para>
+     Note that this function is also called when inserting routed tuples into
+     a foreign-table partition.  See the callback functions
+     described below that allow the FDW to support that.
+    </para>
+
+    <para>
+<programlisting>
 TupleTableSlot *
 ExecForeignUpdate(EState *estate,
                   ResultRelInfo *rinfo,
@@ -741,8 +792,9 @@ BeginForeignInsert(ModifyTableState *mtstate,
      in both cases when it is the partition chosen for tuple routing and the
      target specified in a <command>COPY FROM</command> command.  It should
      perform any initialization needed prior to the actual insertion.
-     Subsequently, <function>ExecForeignInsert</function> will be called for
-     each tuple to be inserted into the foreign table.
+     Subsequently, <function>ExecForeignInsert</function> or
+     <function>ExecForeignBulkInsert</function> will be called for
+     tuple(s) to be inserted into the foreign table.
     </para>
 
     <para>
@@ -773,8 +825,8 @@ BeginForeignInsert(ModifyTableState *mtstate,
     <para>
      Note that if the FDW does not support routable foreign-table partitions
      and/or executing <command>COPY FROM</command> on foreign tables, this
-     function or <function>ExecForeignInsert</function> subsequently called
-     must throw error as needed.
+     function or <function>ExecForeignInsert/ExecForeignBulkInsert</function>
+     subsequently called must throw error as needed.
     </para>
 
     <para>
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 29e07b7228..9c46036127 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -58,6 +58,15 @@
 #include "utils/rel.h"
 
 
+int max_bulk_insert_tuples;
+
+static void ExecBulkInsert(ModifyTableState *mtstate,
+								 ResultRelInfo *resultRelInfo,
+								 TupleTableSlot **slots,
+								 TupleTableSlot **planSlots,
+								 int numSlots,
+								 EState *estate,
+								 bool canSetTag);
 static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
 								 ResultRelInfo *resultRelInfo,
 								 ItemPointer conflictTid,
@@ -389,6 +398,7 @@ ExecInsert(ModifyTableState *mtstate,
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+	MemoryContext oldContext;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -441,6 +451,56 @@ ExecInsert(ModifyTableState *mtstate,
 			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
 									   CMD_INSERT);
 
+		/*
+		 * If the FDW supports bulk insert, accumulate tuples and insert them
+		 * in bulk
+		 */
+		if (max_bulk_insert_tuples > 1 &&
+			resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert &&
+			resultRelInfo->ri_projectReturning == NULL)
+		{
+			/*
+			 * If a certain number of tuples have already been accumulated,
+			 * or	a tuple has come for a different relation than that for
+			 * the accumulated tuples, perform the bulk insert
+			 */
+			if (resultRelInfo->ri_NumSlots == max_bulk_insert_tuples)
+			{
+				ExecBulkInsert(mtstate, resultRelInfo,
+							   resultRelInfo->ri_Slots,
+							   resultRelInfo->ri_PlanSlots,
+							   resultRelInfo->ri_NumSlots,
+							   estate, canSetTag);
+				resultRelInfo->ri_NumSlots = 0;
+			}
+
+			oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+			if (resultRelInfo->ri_Slots == NULL)
+			{
+				resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) *
+										   max_bulk_insert_tuples);
+				resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) *
+										   max_bulk_insert_tuples);
+			}
+
+			resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+				MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
+										 slot->tts_ops);
+			ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+						 slot);
+			resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+				MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
+										 planSlot->tts_ops);
+			ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+						 planSlot);
+
+			resultRelInfo->ri_NumSlots++;
+
+			MemoryContextSwitchTo(oldContext);
+			return NULL;
+		}
+
 		/*
 		 * insert into foreign table: let the FDW do it
 		 */
@@ -701,6 +761,73 @@ ExecInsert(ModifyTableState *mtstate,
 	return result;
 }
 
+/* ----------------------------------------------------------------
+ *		ExecBulkInsert
+ *
+ *		Insert numSlots buffered tuples into a foreign table (no RETURNING)
+ *		via the FDW's ExecForeignBulkInsert callback, run AFTER ROW triggers
+ *		and WITH CHECK OPTION checks per returned slot, then drop the slots.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecBulkInsert(ModifyTableState *mtstate,
+		   ResultRelInfo *resultRelInfo,
+		   TupleTableSlot **slots,
+		   TupleTableSlot **planSlots,
+		   int numSlots,
+		   EState *estate,
+		   bool canSetTag)
+{
+	int			i;
+	int			numInserted = numSlots;	/* passed by reference to the FDW */
+	TupleTableSlot *slot = NULL;
+	TupleTableSlot **rslots;
+
+	/*
+	 * insert into foreign table: let the FDW do it
+	 */
+	rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert(estate,
+																 resultRelInfo,
+																 slots,
+																 planSlots,
+																 &numInserted);
+
+	for (i = 0; i < numInserted; i++)	/* numInserted as left by the FDW */
+	{
+		slot = rslots[i];
+
+		/*
+		 * AFTER ROW Triggers or RETURNING expressions might reference the
+		 * tableoid column, so (re-)initialize tts_tableOid before evaluating
+		 * them.
+		 */
+		slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, resultRelInfo, slot, NIL,
+							 mtstate->mt_transition_capture);
+
+		/*
+		 * Check any WITH CHECK OPTION constraints from parent views.  See the
+		 * comment in ExecInsert.
+		 */
+		if (resultRelInfo->ri_WithCheckOptions != NIL)
+			ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
+	}
+
+	if (canSetTag && numInserted > 0)
+	{
+		estate->es_processed += numInserted;
+		setLastTid(&slot->tts_tid);	/* slot is the last one processed above */
+	}
+
+	for (i = 0; i < numSlots; i++)	/* drop every input slot, inserted or not */
+	{
+		ExecDropSingleTupleTableSlot(slots[i]);
+		ExecDropSingleTupleTableSlot(planSlots[i]);
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecDelete
  *
@@ -1940,6 +2067,9 @@ ExecModifyTable(PlanState *pstate)
 	ItemPointerData tuple_ctid;
 	HeapTupleData oldtupdata;
 	HeapTuple	oldtuple;
+	PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
+	ResultRelInfo **resultRelInfos;
+	int			num_partitions;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -2155,6 +2285,27 @@ ExecModifyTable(PlanState *pstate)
 			return slot;
 	}
 
+	/*
+	 * Insert remaining tuples for bulk insert.
+	 */
+	if (proute)
+		resultRelInfos = ExecGetTouchedPartitions(proute, &num_partitions);
+	else
+	{
+		resultRelInfos = &resultRelInfo;
+		num_partitions = 1;
+	}
+	for (int i = 0; i < num_partitions; i++)
+	{
+		resultRelInfo = resultRelInfos[i];
+		if (resultRelInfo->ri_NumSlots > 0)
+			ExecBulkInsert(node, resultRelInfo,
+						   resultRelInfo->ri_Slots,
+						   resultRelInfo->ri_PlanSlots,
+						   resultRelInfo->ri_NumSlots,
+						   estate, node->canSetTag);
+	}
+
 	/*
 	 * We're done, but fire AFTER STATEMENT triggers before exiting.
 	 */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bb34630e8e..fdffbf97c0 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -47,6 +47,7 @@
 #include "commands/vacuum.h"
 #include "commands/variable.h"
 #include "common/string.h"
+#include "executor/nodeModifyTable.h"
 #include "funcapi.h"
 #include "jit/jit.h"
 #include "libpq/auth.h"
@@ -3377,6 +3378,17 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_bulk_insert_tuples", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Sets the maximum number of tuples to insert in bulk into a foreign table."),
+			NULL,
+			0
+		},
+		&max_bulk_insert_tuples,
+		100, 1, 1000,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"tcp_user_timeout", PGC_USERSET, CLIENT_CONN_OTHER,
 			gettext_noop("TCP user timeout."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 9cb571f7cc..9016f1f7bd 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -676,6 +676,7 @@
 #xmloption = 'content'
 #gin_fuzzy_search_limit = 0
 #gin_pending_list_limit = 4MB
+#max_bulk_insert_tuples = 100
 
 # - Locale and Formatting -
 
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 46a2dc9511..f082792a47 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -15,6 +15,8 @@
 
 #include "nodes/execnodes.h"
 
+extern int max_bulk_insert_tuples;
+
 extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 									   EState *estate, TupleTableSlot *slot,
 									   CmdType cmdtype);
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..c7eeff2257 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -85,6 +85,12 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate,
 													   TupleTableSlot *slot,
 													   TupleTableSlot *planSlot);
 
+typedef TupleTableSlot **(*ExecForeignBulkInsert_function) (EState *estate,
+													   ResultRelInfo *rinfo,
+													   TupleTableSlot **slots,
+													   TupleTableSlot **planSlots,
+													   int *numSlots);	/* in: # of input slots; ExecBulkInsert reads it back as # of returned slots */
+
 typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate,
 													   ResultRelInfo *rinfo,
 													   TupleTableSlot *slot,
@@ -209,6 +215,7 @@ typedef struct FdwRoutine
 	PlanForeignModify_function PlanForeignModify;
 	BeginForeignModify_function BeginForeignModify;
 	ExecForeignInsert_function ExecForeignInsert;
+	ExecForeignBulkInsert_function ExecForeignBulkInsert;
 	ExecForeignUpdate_function ExecForeignUpdate;
 	ExecForeignDelete_function ExecForeignDelete;
 	EndForeignModify_function EndForeignModify;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6c0a7d68d6..3d67ded2ca 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -446,6 +446,11 @@ typedef struct ResultRelInfo
 	/* true when modifying foreign table directly */
 	bool		ri_usesFdwDirectModify;
 
+	/* bulk insert stuff */
+	int			ri_NumSlots;		/* number of slots in the array */
+	TupleTableSlot **ri_Slots;		/* input tuples for bulk insert */
+	TupleTableSlot **ri_PlanSlots;
+
 	/* list of WithCheckOption's to be checked */
 	List	   *ri_WithCheckOptions;
 
-- 
2.26.2

>From 845844a60936f750d77b02fce696fbb7179a1984 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Wed, 18 Nov 2020 01:15:49 +0100
Subject: [PATCH 2/3] make it compile

---
 src/backend/executor/nodeModifyTable.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 9c46036127..e20c613fed 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -2289,7 +2289,9 @@ ExecModifyTable(PlanState *pstate)
 	 * Insert remaining tuples for bulk insert.
 	 */
 	if (proute)
-		resultRelInfos = ExecGetTouchedPartitions(proute, &num_partitions);
+	{
+		// resultRelInfos = ExecGetTouchedPartitions(proute, &num_partitions);
+	}
 	else
 	{
 		resultRelInfos = &resultRelInfo;
-- 
2.26.2

>From 306c05fdc7ab7181a63b583095b42f1ddc9e6b05 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Wed, 18 Nov 2020 01:08:44 +0100
Subject: [PATCH 3/3] reworks

---
 contrib/postgres_fdw/deparse.c         |  78 ++++-
 contrib/postgres_fdw/option.c          |  14 +
 contrib/postgres_fdw/postgres_fdw.c    | 389 ++++++++++++++++---------
 contrib/postgres_fdw/postgres_fdw.h    |   8 +-
 doc/src/sgml/config.sgml               |  21 --
 src/backend/executor/nodeModifyTable.c |   5 +-
 src/backend/nodes/list.c               |  32 ++
 src/include/nodes/pg_list.h            |  30 ++
 8 files changed, 415 insertions(+), 162 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 5aa81db08e..3d08aae987 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1706,7 +1706,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 				 Index rtindex, Relation rel,
 				 List *targetAttrs, bool doNothing,
 				 List *withCheckOptionList, List *returningList,
-				 List **retrieved_attrs, int *values_end_len)
+				 List **retrieved_attrs)
 {
 	AttrNumber	pindex;
 	bool		first;
@@ -1749,7 +1749,81 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 	}
 	else
 		appendStringInfoString(buf, " DEFAULT VALUES");
-	*values_end_len = buf->len;
+
+	if (doNothing)
+		appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
+
+	deparseReturningList(buf, rte, rtindex, rel,
+						 rel->trigdesc && rel->trigdesc->trig_insert_after_row,
+						 withCheckOptionList, returningList, retrieved_attrs);
+}
+
+/*
+ * deparse remote bulk INSERT statement
+ *
+ * The statement text is appended to buf, and we also create an integer List
+ * of the columns being retrieved by WITH CHECK OPTION or RETURNING (if any),
+ * which is returned to *retrieved_attrs.
+ */
+void
+deparseBulkInsertSql(StringInfo buf, RangeTblEntry *rte,
+					 Index rtindex, Relation rel,
+					 List *targetAttrs, bool doNothing,
+					 List *withCheckOptionList, List *returningList,
+					 List **retrieved_attrs, int batchSize)
+{
+	AttrNumber	pindex;
+	bool		first;
+	ListCell   *lc;
+	int			i;
+
+	appendStringInfoString(buf, "INSERT INTO ");
+	deparseRelation(buf, rel);
+
+
+	if (targetAttrs)
+	{
+		appendStringInfoChar(buf, '(');
+
+		first = true;
+		foreach(lc, targetAttrs)
+		{
+			int			attnum = lfirst_int(lc);
+
+			if (!first)
+				appendStringInfoString(buf, ", ");
+			first = false;
+
+			deparseColumnRef(buf, rtindex, attnum, rte, false);
+		}
+
+		appendStringInfoString(buf, ") VALUES");
+
+		pindex = 1;
+
+		for (i = 0; i < batchSize; i++)
+		{
+			if (i > 0)
+				appendStringInfoString(buf, ", ");
+
+			appendStringInfoString(buf, "(");
+
+			first = true;
+			foreach(lc, targetAttrs)
+			{
+				if (!first)
+					appendStringInfoString(buf, ", ");
+				first = false;
+
+				appendStringInfo(buf, "$%d", pindex);
+				pindex++;
+			}
+
+			appendStringInfoChar(buf, ')');
+		}
+	}
+	else
+		appendStringInfoString(buf, " DEFAULT VALUES");
 
 	if (doNothing)
 		appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 1a03e02263..32bd4194eb 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -142,6 +142,17 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 						 errmsg("%s requires a non-negative integer value",
 								def->defname)));
 		}
+		else if (strcmp(def->defname, "batch_size") == 0)
+		{
+			int			batch_size;
+
+			/* a batch must hold at least one row, so zero is rejected too */
+			batch_size = strtol(defGetString(def), NULL, 10);
+			if (batch_size <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("%s requires a positive integer value", def->defname)));
+		}
 		else if (strcmp(def->defname, "password_required") == 0)
 		{
 			bool		pw_required = defGetBoolean(def);
@@ -203,6 +214,9 @@ InitPgFdwOptions(void)
 		/* fetch_size is available on both server and table */
 		{"fetch_size", ForeignServerRelationId, false},
 		{"fetch_size", ForeignTableRelationId, false},
+		/* batch_size is available on both server and table */
+		{"batch_size", ForeignServerRelationId, false},
+		{"batch_size", ForeignTableRelationId, false},
 		{"password_required", UserMappingRelationId, false},
 
 		/*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index f7be4bec17..4a78b21634 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -84,10 +84,9 @@ enum FdwScanPrivateIndex
  * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
  *
  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
- * 2) Integer list of target attribute numbers for INSERT/UPDATE
+ * 2) bulk INSERT statement text to be sent to the remote server (or NIL)
+ * 3) Integer list of target attribute numbers for INSERT/UPDATE
  *	  (NIL for a DELETE)
- * 3) Length till the end of VALUES clause for INSERT
- *	  (-1 for a DELETE/UPDATE)
  * 4) Boolean flag showing if the remote query has a RETURNING clause
  * 5) Integer list of attribute numbers retrieved by RETURNING, if any
  */
@@ -95,14 +94,16 @@ enum FdwModifyPrivateIndex
 {
 	/* SQL statement to execute remotely (as a String node) */
 	FdwModifyPrivateUpdateSql,
+	/* bulk SQL statement to execute remotely (as a String node) */
+	FdwModifyPrivateBulkUpdateSql,
 	/* Integer list of target attribute numbers for INSERT/UPDATE */
 	FdwModifyPrivateTargetAttnums,
-	/* Length till the end of VALUES clause (as an integer Value node) */
-	FdwModifyPrivateLen,
 	/* has-returning flag (as an integer Value node) */
 	FdwModifyPrivateHasReturning,
 	/* Integer list of attribute numbers retrieved by RETURNING */
-	FdwModifyPrivateRetrievedAttrs
+	FdwModifyPrivateRetrievedAttrs,
+	/* INSERT batch size (number of tuples sent at once) */
+	FdwModifyPrivateBatchSize
 };
 
 /*
@@ -176,12 +177,12 @@ typedef struct PgFdwModifyState
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
+	char	   *p_name_bulk;	/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
-	char	   *orig_query;		/* original text of INSERT command */
+	char	   *query_bulk;		/* text of bulk INSERT command */
 	List	   *target_attrs;	/* list of target attribute numbers */
-	int			len;			/* length of some part of query */
 	bool		has_returning;	/* is there a RETURNING clause? */
 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
 
@@ -191,7 +192,7 @@ typedef struct PgFdwModifyState
 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
 
 	/* bulk operation stuff */
-	int			num_slots;		/* number of slots to insert */
+	int			batch_size;		/* maximum number of rows to insert */
 
 	/* working memory context */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
@@ -441,10 +442,11 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
 											   CmdType operation,
 											   Plan *subplan,
 											   char *query,
+											   char *query_bulk,
 											   List *target_attrs,
-											   int len,
 											   bool has_returning,
-											   List *retrieved_attrs);
+											   List *retrieved_attrs,
+											   int batch_size);
 static TupleTableSlot **execute_foreign_modify(EState *estate,
 											  ResultRelInfo *resultRelInfo,
 											  CmdType operation,
@@ -452,13 +454,15 @@ static TupleTableSlot **execute_foreign_modify(EState *estate,
 											  TupleTableSlot **planSlots,
 											  int *numSlots);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
+static void prepare_foreign_modify_bulk(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 											 ItemPointer tupleid,
 											 TupleTableSlot **slots,
 											 int numSlots);
 static void store_returning_result(PgFdwModifyState *fmstate,
 								   TupleTableSlot *slot, PGresult *res);
-static void finish_foreign_modify(PgFdwModifyState *fmstate, bool release_conn);
+static void finish_foreign_modify(PgFdwModifyState *fmstate);
+static void deallocate_query(PgFdwModifyState *fmstate);
 static List *build_remote_returning(Index rtindex, Relation rel,
 									List *returningList);
 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -619,6 +623,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
 	fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
 	fpinfo->shippable_extensions = NIL;
 	fpinfo->fetch_size = 100;
+	fpinfo->batch_size = 100;
 
 	apply_server_options(fpinfo);
 	apply_table_options(fpinfo);
@@ -1677,15 +1682,15 @@ postgresPlanForeignModify(PlannerInfo *root,
 	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
 	Relation	rel;
 	StringInfoData sql;
+	StringInfoData bulksql;
 	List	   *targetAttrs = NIL;
 	List	   *withCheckOptionList = NIL;
 	List	   *returningList = NIL;
 	List	   *retrieved_attrs = NIL;
-	List	   *retvalList;
 	bool		doNothing = false;
-	int			values_end_len = -1;
 
 	initStringInfo(&sql);
+	initStringInfo(&bulksql);
 
 	/*
 	 * Core code already has some lock on each rel being planned, so we can
@@ -1771,7 +1776,11 @@ postgresPlanForeignModify(PlannerInfo *root,
 			deparseInsertSql(&sql, rte, resultRelation, rel,
 							 targetAttrs, doNothing,
 							 withCheckOptionList, returningList,
-							 &retrieved_attrs, &values_end_len);
+							 &retrieved_attrs);
+			deparseBulkInsertSql(&bulksql, rte, resultRelation, rel,
+							 	 targetAttrs, doNothing,
+							 	 withCheckOptionList, returningList,
+							 	 &retrieved_attrs, 100); /* FIXME pass the batch size */
 			break;
 		case CMD_UPDATE:
 			deparseUpdateSql(&sql, rte, resultRelation, rel,
@@ -1795,12 +1804,12 @@ postgresPlanForeignModify(PlannerInfo *root,
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
 	 */
-	retvalList = list_make4(makeString(sql.data),
+	return list_make6(makeString(sql.data),
+					  makeString(bulksql.data),
 					  targetAttrs,
-					  makeInteger(values_end_len),
-					  makeInteger((retrieved_attrs != NIL)));
-	retvalList = lappend(retvalList, retrieved_attrs);
-	return retvalList;
+					  makeInteger((retrieved_attrs != NIL)),
+					  retrieved_attrs,
+					  makeInteger(100));
 }
 
 /*
@@ -1816,10 +1825,11 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 {
 	PgFdwModifyState *fmstate;
 	char	   *query;
+	char	   *query_bulk;
 	List	   *target_attrs;
 	bool		has_returning;
-	int			values_end_len;
 	List	   *retrieved_attrs;
+	int			batch_size;
 	RangeTblEntry *rte;
 
 	/*
@@ -1832,14 +1842,16 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	/* Deconstruct fdw_private data. */
 	query = strVal(list_nth(fdw_private,
 							FdwModifyPrivateUpdateSql));
+	query_bulk = strVal(list_nth(fdw_private,
+								 FdwModifyPrivateBulkUpdateSql));
 	target_attrs = (List *) list_nth(fdw_private,
 									 FdwModifyPrivateTargetAttnums);
-	values_end_len = intVal(list_nth(fdw_private,
-									FdwModifyPrivateLen));
 	has_returning = intVal(list_nth(fdw_private,
 									FdwModifyPrivateHasReturning));
 	retrieved_attrs = (List *) list_nth(fdw_private,
 										FdwModifyPrivateRetrievedAttrs);
+	batch_size = intVal(list_nth(fdw_private,
+								 FdwModifyPrivateBatchSize));
 
 	/* Find RTE. */
 	rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
@@ -1852,10 +1864,11 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 									mtstate->operation,
 									mtstate->mt_plans[subplan_index]->plan,
 									query,
+									query_bulk,
 									target_attrs,
-									values_end_len,
 									has_returning,
-									retrieved_attrs);
+									retrieved_attrs,
+									batch_size);
 
 	resultRelInfo->ri_FdwState = fmstate;
 }
@@ -1971,7 +1984,7 @@ postgresEndForeignModify(EState *estate,
 		return;
 
 	/* Destroy the execution state */
-	finish_foreign_modify(fmstate, true);
+	finish_foreign_modify(fmstate);
 }
 
 /*
@@ -1990,8 +2003,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	RangeTblEntry *rte;
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	int			attnum;
-	int			values_end_len;
 	StringInfoData sql;
+	StringInfoData bulksql;
 	List	   *targetAttrs = NIL;
 	List	   *retrieved_attrs = NIL;
 	bool		doNothing = false;
@@ -2013,6 +2026,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 						RelationGetRelationName(rel))));
 
 	initStringInfo(&sql);
+	initStringInfo(&bulksql);
 
 	/* We transmit all columns that are defined in the foreign table. */
 	for (attnum = 1; attnum <= tupdesc->natts; attnum++)
@@ -2067,7 +2081,13 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
 					 resultRelInfo->ri_WithCheckOptions,
 					 resultRelInfo->ri_returningList,
-					 &retrieved_attrs, &values_end_len);
+					 &retrieved_attrs);
+
+	/* FIXME only do this when batch size != 1 */
+	deparseBulkInsertSql(&bulksql, rte, resultRelation, rel, targetAttrs, doNothing,
+						 resultRelInfo->ri_WithCheckOptions,
+						 resultRelInfo->ri_returningList,
+						 &retrieved_attrs, 100); /* FIXME set the batch size */
 
 	/* Construct an execution state. */
 	fmstate = create_foreign_modify(mtstate->ps.state,
@@ -2076,10 +2096,11 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 									CMD_INSERT,
 									NULL,
 									sql.data,
+									bulksql.data,
 									targetAttrs,
-									values_end_len,
 									retrieved_attrs != NIL,
-									retrieved_attrs);
+									retrieved_attrs,
+									100);	/* FIXME pass the correct batch_size */
 
 	/*
 	 * If the given resultRelInfo already has PgFdwModifyState set, it means
@@ -2116,7 +2137,7 @@ postgresEndForeignInsert(EState *estate,
 		fmstate = fmstate->aux_fmstate;
 
 	/* Destroy the execution state */
-	finish_foreign_modify(fmstate, true);
+	finish_foreign_modify(fmstate);
 }
 
 /*
@@ -3605,10 +3626,11 @@ create_foreign_modify(EState *estate,
 					  CmdType operation,
 					  Plan *subplan,
 					  char *query,
+					  char *query_bulk,
 					  List *target_attrs,
-					  int len,
 					  bool has_returning,
-					  List *retrieved_attrs)
+					  List *retrieved_attrs,
+					  int batch_size)
 {
 	PgFdwModifyState *fmstate;
 	Relation	rel = resultRelInfo->ri_RelationDesc;
@@ -3641,10 +3663,8 @@ create_foreign_modify(EState *estate,
 
 	/* Set up remote query information. */
 	fmstate->query = query;
-	if (operation == CMD_INSERT)
-		fmstate->orig_query = pstrdup(fmstate->query);
+	fmstate->query_bulk = query_bulk;
 	fmstate->target_attrs = target_attrs;
-	fmstate->len = len;
 	fmstate->has_returning = has_returning;
 	fmstate->retrieved_attrs = retrieved_attrs;
 
@@ -3696,7 +3716,7 @@ create_foreign_modify(EState *estate,
 
 	Assert(fmstate->p_nums <= n_params);
 
-	fmstate->num_slots = 1;
+	fmstate->batch_size = batch_size;
 
 	/* Initialize auxiliary state */
 	fmstate->aux_fmstate = NULL;
@@ -3724,57 +3744,19 @@ execute_foreign_modify(EState *estate,
 	const char **p_values;
 	PGresult   *res;
 	int			n_rows;
-	int			i, j;
-	int			pindex;
-	bool		first;
-	StringInfoData sql;
 
 	/* The operation should be INSERT, UPDATE, or DELETE */
 	Assert(operation == CMD_INSERT ||
 		   operation == CMD_UPDATE ||
 		   operation == CMD_DELETE);
 
-	if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
+	/* if we have a full insert batch, allocate the bulk prepared statement
+	 * if needed */
+	if (operation == CMD_INSERT)
 	{
-		/* Destroy the prepared statement created previously */
-		if (fmstate->p_name)
-			finish_foreign_modify(fmstate, false);
-
-		/*
-		 * Recreate INSERT command string with numSlots records in its
-		 * VALUES clause
-		 */
-
-		/* Copy up to the end of the first record from the original query */
-		initStringInfo(&sql);
-		appendBinaryStringInfo(&sql, fmstate->orig_query, fmstate->len);
-
-		/* Add records to VALUES clause */
-		pindex = fmstate->p_nums + 1;
-		for (i = 0; i < *numSlots - 1; i++)
-		{
-			appendStringInfoString(&sql, ", (");
-
-			first = true;
-			for (j = 0; j < fmstate->p_nums; j++)
-			{
-				if (!first)
-					appendStringInfoString(&sql, ", ");
-				first = false;
-
-				appendStringInfo(&sql, "$%d", pindex);
-				pindex++;
-			}
-
-			appendStringInfoChar(&sql, ')');
-		}
-
-		/* Copy stuff after VALUES clause from the original query */
-		appendStringInfoString(&sql, fmstate->orig_query + fmstate->len);
-
-		pfree(fmstate->query);
-		fmstate->query = sql.data;
-		fmstate->num_slots = *numSlots;
+		if ((fmstate->batch_size == *numSlots) &&
+			(!fmstate->p_name_bulk))
+			prepare_foreign_modify_bulk(fmstate);
 	}
 
 	/* Set up the prepared statement on the remote server, if we didn't yet */
@@ -3798,45 +3780,92 @@ execute_foreign_modify(EState *estate,
 		ctid = (ItemPointer) DatumGetPointer(datum);
 	}
 
-	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
-
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
-							 fmstate->p_name,
-							 fmstate->p_nums * (*numSlots),
-							 p_values,
-							 NULL,
-							 NULL,
-							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+	if (fmstate->batch_size == *numSlots)
+	{
+		/* Convert parameters needed by prepared statement to text form */
+		p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
+
+		if (!PQsendQueryPrepared(fmstate->conn,
+								 fmstate->p_name_bulk,
+								 fmstate->p_nums * fmstate->batch_size,
+								 p_values,
+								 NULL,
+								 NULL,
+								 0))
+			pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
 
-	/*
-	 * Get the result, and check for success.
-	 *
-	 * We don't use a PG_TRY block here, so be careful not to throw error
-	 * without releasing the PGresult.
-	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
-	if (PQresultStatus(res) !=
-		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		/*
+		 * Get the result, and check for success.
+		 *
+		 * We don't use a PG_TRY block here, so be careful not to throw error
+		 * without releasing the PGresult.
+		 */
+		res = pgfdw_get_result(fmstate->conn, fmstate->query);
+		if (PQresultStatus(res) !=
+			(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+			pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
-	/* Check number of rows affected, and fetch RETURNING tuple if any */
-	if (fmstate->has_returning)
-	{
-		Assert(*numSlots == 1);
-		n_rows = PQntuples(res);
-		if (n_rows > 0)
-			store_returning_result(fmstate, slots[0], res);
+		/* Check number of rows affected, and fetch RETURNING tuple if any */
+		if (fmstate->has_returning)
+		{
+			Assert(*numSlots == 1);
+			n_rows = PQntuples(res);
+			if (n_rows > 0)
+				store_returning_result(fmstate, slots[0], res);
+		}
+		else
+			n_rows = atoi(PQcmdTuples(res));
+
+		/* And clean up */
+		PQclear(res);
 	}
 	else
-		n_rows = atoi(PQcmdTuples(res));
+	{
+		int i;
 
-	/* And clean up */
-	PQclear(res);
+		for (i = 0, n_rows = 0; i < *numSlots; i++)
+		{
+			/* Convert parameters needed by prepared statement to text form */
+			p_values = convert_prep_stmt_params(fmstate, ctid, &slots[i], 1);
+
+			if (!PQsendQueryPrepared(fmstate->conn,
+									 fmstate->p_name,
+									 fmstate->p_nums,
+									 p_values,
+									 NULL,
+									 NULL,
+									 0))
+				pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+			/*
+			 * Get the result, and check for success.
+			 *
+			 * We don't use a PG_TRY block here, so be careful not to throw error
+			 * without releasing the PGresult.
+			 */
+			res = pgfdw_get_result(fmstate->conn, fmstate->query);
+			if (PQresultStatus(res) !=
+				(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+				pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+
+			/* Add up rows affected (n_rows starts at 0); fetch RETURNING tuple */
+			if (fmstate->has_returning)
+			{
+				Assert(PQntuples(res) <= 1);	/* one row is sent per execution */
+				n_rows += PQntuples(res);
+				if (PQntuples(res) > 0)
+					store_returning_result(fmstate, slots[i], res);
+			}
+			else
+				n_rows += atoi(PQcmdTuples(res));
+
+			/* And clean up */
+			PQclear(res);
+		}
+	}
 
 	MemoryContextReset(fmstate->temp_cxt);
 
@@ -3848,6 +3877,51 @@ execute_foreign_modify(EState *estate,
 	return (n_rows > 0) ? slots : NULL;
 }
 
+/*
+ * prepare_foreign_modify_bulk
+ *		Establish a prepared statement for fmstate->query_bulk (bulk INSERT)
+ */
+static void
+prepare_foreign_modify_bulk(PgFdwModifyState *fmstate)
+{
+	char		prep_name[NAMEDATALEN];
+	char	   *p_name;
+	PGresult   *res;
+
+	/* Construct name we'll use for the prepared statement. */
+	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
+			 GetPrepStmtNumber(fmstate->conn));
+	p_name = pstrdup(prep_name);
+
+	/*
+	 * We intentionally do not specify parameter types here, but leave the
+	 * remote server to derive them by default.  This avoids possible problems
+	 * with the remote server using different type OIDs than we do.  All of
+	 * the prepared statements we use in this module are simple enough that
+	 * the remote server will make the right choices.
+	 */
+	if (!PQsendPrepare(fmstate->conn,
+					   p_name,
+					   fmstate->query_bulk,
+					   0,
+					   NULL))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query_bulk);
+
+	/*
+	 * Get the result, and check for success.  Pass the bulk query text, since
+	 * that is the statement being prepared here.
+	 * We don't use a PG_TRY block here, so be careful not to throw error
+	 * without releasing the PGresult.
+	 */
+	res = pgfdw_get_result(fmstate->conn, fmstate->query_bulk);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query_bulk);
+	PQclear(res);
+
+	/* This action shows that the prepare has been done. */
+	fmstate->p_name_bulk = p_name;
+}
+
 /*
  * prepare_foreign_modify
  *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
@@ -4003,41 +4077,71 @@ store_returning_result(PgFdwModifyState *fmstate,
 	PG_END_TRY();
 }
 
+
+/*
+ * deallocate_prepared_stmt
+ *		DEALLOCATE the remote prepared statement *p_name (if set), reset to NULL
+ */
+static void
+deallocate_prepared_stmt(PgFdwModifyState *fmstate, char **p_name)
+{
+	char		sql[64];
+	PGresult   *res;
+
+	/* do nothing if the statement was never prepared */
+	if (*p_name == NULL)
+		return;
+
+	snprintf(sql, sizeof(sql), "DEALLOCATE %s", *p_name);
+
+	/*
+	 * We don't use a PG_TRY block here, so be careful not to throw error
+	 * without releasing the PGresult.
+	 */
+	res = pgfdw_exec_query(fmstate->conn, sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+	PQclear(res);
+	*p_name = NULL;
+}
+
+/*
+ * deallocate_query
+ *		Destroy the single-row prepared statement (p_name), if one was created
+ */
+static void
+deallocate_query(PgFdwModifyState *fmstate)
+{
+	deallocate_prepared_stmt(fmstate, &fmstate->p_name);
+}
+
+/*
+ * deallocate_query_bulk
+ *		Destroy the bulk INSERT prepared statement (p_name_bulk), if created
+ */
+static void
+deallocate_query_bulk(PgFdwModifyState *fmstate)
+{
+	deallocate_prepared_stmt(fmstate, &fmstate->p_name_bulk);
+}
+
+
 /*
  * finish_foreign_modify
  *		Release resources for a foreign insert/update/delete operation
  */
 static void
-finish_foreign_modify(PgFdwModifyState *fmstate,
-	bool release_conn)
+finish_foreign_modify(PgFdwModifyState *fmstate)
 {
 	Assert(fmstate != NULL);
 
-	/* If we created a prepared statement, destroy it */
-	if (fmstate->p_name)
-	{
-		char		sql[64];
-		PGresult   *res;
-
-		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
-		/*
-		 * We don't use a PG_TRY block here, so be careful not to throw error
-		 * without releasing the PGresult.
-		 */
-		res = pgfdw_exec_query(fmstate->conn, sql);
-		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
-		PQclear(res);
-		fmstate->p_name = NULL;
-	}
+	/* If we created prepared statements, destroy them */
+	deallocate_query(fmstate);
+	deallocate_query_bulk(fmstate);
 
 	/* Release remote connection */
-	if (release_conn)
-	{
-		ReleaseConnection(fmstate->conn);
-		fmstate->conn = NULL;
-	}
+	ReleaseConnection(fmstate->conn);
+	fmstate->conn = NULL;
 }
 
 /*
@@ -5483,6 +5587,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
 				ExtractExtensionList(defGetString(def), false);
 		else if (strcmp(def->defname, "fetch_size") == 0)
 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+		else if (strcmp(def->defname, "batch_size") == 0)
+			fpinfo->batch_size = strtol(defGetString(def), NULL, 10);
 	}
 }
 
@@ -5504,6 +5610,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
 			fpinfo->use_remote_estimate = defGetBoolean(def);
 		else if (strcmp(def->defname, "fetch_size") == 0)
 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+		else if (strcmp(def->defname, "batch_size") == 0)
+			fpinfo->batch_size = strtol(defGetString(def), NULL, 10);
 	}
 }
 
@@ -5538,6 +5646,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
 	fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
 	fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
 	fpinfo->fetch_size = fpinfo_o->fetch_size;
+	fpinfo->batch_size = fpinfo_o->batch_size;
 
 	/* Merge the table level options from either side of the join. */
 	if (fpinfo_i)
@@ -5559,6 +5668,12 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
 		 * relation sizes.
 		 */
 		fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
+
+		/*
+		 * XXX Not sure it's particularly useful to merge batch_size for
+		 * join relations, but it can't hurt.
+		 */
+		fpinfo->batch_size = Max(fpinfo_o->batch_size, fpinfo_i->batch_size);
 	}
 }
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 459a9ca6ab..ea2fdfb9c1 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo
 	UserMapping *user;			/* only set in use_remote_estimate mode */
 
 	int			fetch_size;		/* fetch size for this remote table */
+	int			batch_size;		/* insert batch size for this remote table */
 
 	/*
 	 * Name of the relation, for use while EXPLAINing ForeignScan.  It is used
@@ -161,7 +162,12 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
-							 List **retrieved_attrs, int *values_end_len);
+							 List **retrieved_attrs);
+extern void deparseBulkInsertSql(StringInfo buf, RangeTblEntry *rte,
+								 Index rtindex, Relation rel,
+								 List *targetAttrs, bool doNothing,
+								 List *withCheckOptionList, List *returningList,
+								 List **retrieved_attrs, int batchSize);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 51bfe445b0..a632cf98ba 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -8533,27 +8533,6 @@ SET XML OPTION { DOCUMENT | CONTENT };
       </listitem>
      </varlistentry>
 
-     <varlistentry id="guc-max-bulk-insert-tuples" xreflabel="max_bulk_insert_tuples">
-      <term><varname>max_bulk_insert_tuples</varname> (<type>integer</type>)
-      <indexterm>
-       <primary><varname>max_bulk_insert_tuples</varname></primary>
-       <secondary>configuration parameter</secondary>
-      </indexterm>
-      </term>
-      <listitem>
-       <para>
-        Sets the maximum number of tuples to accumulate and insert in bulk
-        into a foreign table. This applies to each partition when the insert
-        target is a partitioned table.
-        The valid range is <literal>1</literal>, which disables bulk insert,
-        to <literal>1000</literal>.
-        This takes effect only if the foreign data wrapper supports
-        bulk insert.
-        The default is <literal>100</literal>.
-       </para>
-      </listitem>
-     </varlistentry>
-
      </variablelist>
     </sect2>
      <sect2 id="runtime-config-client-format">
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index e20c613fed..4932014515 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -57,7 +57,10 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
-
+/*
+ * FIXME this should be removed / replaced with the foreign server/table
+ * batch_size option.
+ */
 int max_bulk_insert_tuples;
 
 static void ExecBulkInsert(ModifyTableState *mtstate,
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index efa44342c4..56c63ba2a3 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -277,6 +277,38 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
 	return list;
 }
 
+List *
+list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+				ListCell datum3, ListCell datum4, ListCell datum5)
+{
+	List	   *list = new_list(t, 5);
+
+	list->elements[0] = datum1;
+	list->elements[1] = datum2;
+	list->elements[2] = datum3;
+	list->elements[3] = datum4;
+	list->elements[4] = datum5;
+	check_list_invariants(list);
+	return list;
+}
+
+List *
+list_make6_impl(NodeTag t, ListCell datum1, ListCell datum2,
+				ListCell datum3, ListCell datum4, ListCell datum5,
+				ListCell datum6)
+{
+	List	   *list = new_list(t, 6);
+
+	list->elements[0] = datum1;
+	list->elements[1] = datum2;
+	list->elements[2] = datum3;
+	list->elements[3] = datum4;
+	list->elements[4] = datum5;
+	list->elements[5] = datum6;
+	check_list_invariants(list);
+	return list;
+}
+
 /*
  * Make room for a new head cell in the given (non-NIL) list.
  *
diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h
index cda77a841e..195a8c1818 100644
--- a/src/include/nodes/pg_list.h
+++ b/src/include/nodes/pg_list.h
@@ -213,6 +213,14 @@ list_length(const List *l)
 #define list_make4(x1,x2,x3,x4) \
 	list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
 					list_make_ptr_cell(x3), list_make_ptr_cell(x4))
+#define list_make5(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
+					list_make_ptr_cell(x3), list_make_ptr_cell(x4), \
+					list_make_ptr_cell(x5))
+#define list_make6(x1,x2,x3,x4,x5,x6) \
+	list_make6_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
+					list_make_ptr_cell(x3), list_make_ptr_cell(x4), \
+					list_make_ptr_cell(x5), list_make_ptr_cell(x6))
 
 #define list_make1_int(x1) \
 	list_make1_impl(T_IntList, list_make_int_cell(x1))
@@ -224,6 +232,14 @@ list_length(const List *l)
 #define list_make4_int(x1,x2,x3,x4) \
 	list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
 					list_make_int_cell(x3), list_make_int_cell(x4))
+#define list_make5_int(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
+					list_make_int_cell(x3), list_make_int_cell(x4), \
+					list_make_int_cell(x5))
+#define list_make6_int(x1,x2,x3,x4,x5,x6) \
+	list_make6_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
+					list_make_int_cell(x3), list_make_int_cell(x4), \
+					list_make_int_cell(x5), list_make_int_cell(x6))
 
 #define list_make1_oid(x1) \
 	list_make1_impl(T_OidList, list_make_oid_cell(x1))
@@ -235,6 +251,14 @@ list_length(const List *l)
 #define list_make4_oid(x1,x2,x3,x4) \
 	list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
 					list_make_oid_cell(x3), list_make_oid_cell(x4))
+#define list_make5_oid(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
+					list_make_oid_cell(x3), list_make_oid_cell(x4), \
+					list_make_oid_cell(x5))
+#define list_make6_oid(x1,x2,x3,x4,x5,x6) \
+	list_make6_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
+					list_make_oid_cell(x3), list_make_oid_cell(x4), \
+					list_make_oid_cell(x5), list_make_oid_cell(x6))
 
 /*
  * Locate the n'th cell (counting from 0) of the list.
@@ -520,6 +544,12 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2,
 							 ListCell datum3);
 extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
 							 ListCell datum3, ListCell datum4);
+extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+							 ListCell datum3, ListCell datum4,
+							 ListCell datum5);
+extern List *list_make6_impl(NodeTag t, ListCell datum1, ListCell datum2,
+							 ListCell datum3, ListCell datum4,
+							 ListCell datum5, ListCell datum6);
 
 extern pg_nodiscard List *lappend(List *list, void *datum);
 extern pg_nodiscard List *lappend_int(List *list, int datum);
-- 
2.26.2

Reply via email to