On Wed, Apr 3, 2024 at 1:10 AM Jeff Davis <pg...@j-davis.com> wrote:
>
> Here's where I think this API should go:
>
> 1. Have table_modify_begin/end and table_modify_buffer_insert, like
> those that are implemented in your patch.

I added table_modify_begin, table_modify_buffer_insert,
table_modify_buffer_flush and table_modify_end. Table Access Method (AM)
authors can now define their own buffering strategy and flushing decisions
based on their tuple storage format and other AM-specific factors. I also
added a default implementation that falls back to single inserts
(table_tuple_insert) when an AM does not provide these callbacks. See the
attached v19-0001 patch.
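The fallback dispatch can be pictured with a toy, self-contained C model. Every name here (ToyRel, ToyAmRoutine, toy_modify_buffer_insert and so on) is hypothetical and not a PostgreSQL API. An AM that supplies a buffering callback gets every tuple routed to it, while an AM that leaves the callback NULL falls back to one single insert per tuple, roughly as table_modify_buffer_insert() does in v19-0001.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical model of the fallback in table_modify_buffer_insert(): if the
 * AM leaves its modify callback NULL, a default implementation performs one
 * single-row insert per tuple. None of these names are PostgreSQL APIs.
 */
typedef struct ToyRel ToyRel;

typedef struct ToyAmRoutine
{
	void		(*tuple_insert) (ToyRel *rel, int tuple);	/* required */
	void		(*modify_buffer_insert) (ToyRel *rel, int tuple);	/* optional */
} ToyAmRoutine;

struct ToyRel
{
	const ToyAmRoutine *am;
	int			single_inserts;	/* tuples inserted one at a time */
	int			buffered;		/* tuples handed to the AM's buffering code */
};

static void
toy_tuple_insert(ToyRel *rel, int tuple)
{
	(void) tuple;
	rel->single_inserts++;
}

static void
toy_am_buffer_insert(ToyRel *rel, int tuple)
{
	(void) tuple;
	rel->buffered++;			/* a real AM would buffer and flush in batches */
}

/* Default: no buffering, just the AM's single insert routine. */
static void
default_modify_buffer_insert(ToyRel *rel, int tuple)
{
	rel->am->tuple_insert(rel, tuple);
}

/* Caller-facing wrapper: dispatch to the AM if it has a callback, else fall back. */
static void
toy_modify_buffer_insert(ToyRel *rel, int tuple)
{
	assert(rel->am != NULL);
	if (rel->am->modify_buffer_insert != NULL)
		rel->am->modify_buffer_insert(rel, tuple);
	else
		default_modify_buffer_insert(rel, tuple);
}

static const ToyAmRoutine am_without_modify = {toy_tuple_insert, NULL};
static const ToyAmRoutine am_with_modify = {toy_tuple_insert, toy_am_buffer_insert};
```

Callers always go through the one wrapper, so an AM can adopt the new callbacks later without any caller changes.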

> 2. Add some kind of flush callback that will be called either while the
> tuples are being flushed or after the tuples are flushed (but before
> they are freed by the AM). (Aside: do we need to call it while the
> tuples are being flushed to get the right visibility semantics for
> after-row triggers?)

I added a flush callback of type TableModifyBufferFlushCallback. When the
caller provides one, the AM invokes it with the whole batch of flushed slots
after the buffered tuples have been inserted into the table but before the
AM frees them. Index insertions and AFTER ROW INSERT triggers can be executed
in this callback. See the v19-0001 patch for how the AM invokes the flush
callback, and see v19-0003, v19-0004 or v19-0005 for how a caller can supply
the callback and the context required to execute index insertions and AR
triggers.
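Here is a minimal sketch of the flush-callback ordering. It assumes a toy buffer of ints in place of TupleTableSlots, and ToyModifyState, toy_flush, count_flushed and the other names are hypothetical. The batch is written first, then passed to the caller's callback in a single call, and only afterwards is the buffer reused. The end call flushes whatever is left.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_BUFFERED 4		/* hypothetical; the heap patch uses 1000 slots */

/* Mirrors the shape of TableModifyBufferFlushCallback, with ints for slots. */
typedef void (*ToyFlushCallback) (void *context, const int *tuples, int ntuples);

typedef struct ToyModifyState
{
	int			buf[TOY_MAX_BUFFERED];
	int			nbuffered;
	int			nstored;		/* tuples "written" to the toy table */
	ToyFlushCallback flush_cb;
	void	   *flush_ctx;
} ToyModifyState;

static void
toy_begin(ToyModifyState *s, ToyFlushCallback cb, void *ctx)
{
	s->nbuffered = 0;
	s->nstored = 0;
	s->flush_cb = cb;
	s->flush_ctx = ctx;
}

static void
toy_flush(ToyModifyState *s)
{
	if (s->nbuffered == 0)
		return;
	s->nstored += s->nbuffered;	/* 1. write the batch (cf. heap_multi_insert) */
	if (s->flush_cb != NULL)	/* 2. hand the same batch to the caller */
		s->flush_cb(s->flush_ctx, s->buf, s->nbuffered);
	s->nbuffered = 0;			/* 3. only now is the buffer reused */
}

static void
toy_buffer_insert(ToyModifyState *s, int tuple)
{
	s->buf[s->nbuffered++] = tuple;
	if (s->nbuffered >= TOY_MAX_BUFFERED)
		toy_flush(s);
}

static void
toy_end(ToyModifyState *s)
{
	toy_flush(s);				/* don't lose a partially filled buffer */
	assert(s->nbuffered == 0);
}

/* Example caller context; a real one would insert index entries / fire AR triggers. */
typedef struct FlushStats
{
	int			calls;
	int			tuples;
	int			sum;
} FlushStats;

static void
count_flushed(void *context, const int *tuples, int ntuples)
{
	FlushStats *st = context;

	st->calls++;
	for (int i = 0; i < ntuples; i++)
	{
		st->tuples++;
		st->sum += tuples[i];
	}
}
```

Inserting ten tuples with a buffer of four yields two callback calls during inserts and a third one from the end call.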

> 3. Add table_modify_buffer_{update|delete} APIs.
>
> 9. Use these new methods for DELETE, UPDATE, and MERGE. MERGE can use
> the buffer_insert/update/delete APIs; we don't need a separate merge
> method. This probably requires that the AM maintain 3 separate buffers
> to distinguish different kinds of changes at flush time (obviously
> these can be initialized lazily to avoid overhead when not being used).

I haven't thought about these things yet. I can only focus on them after
seeing how the attached patches go from here.

> 4. Some kind of API tweaks to help manage memory when modifying
> partitioned tables, so that the buffering doesn't get out of control.
> Perhaps just reporting memory usage and allowing the caller to force
> flushes would be enough.

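The heap implementation of these new table AM APIs uses a separate memory
context for all of its operations, and flushes its buffer as soon as it
holds HEAP_MAX_BUFFERED_SLOTS (1000) tuples or roughly
HEAP_MAX_BUFFERED_BYTES (65535) bytes of tuple data. Please have a look and
let me know if we need anything more.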
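To show how the two heap limits interact, here is a toy C model with hypothetical names. It applies the same rule as heap_modify_buffer_insert() in v19-0001: flush once 1000 tuples (HEAP_MAX_BUFFERED_SLOTS) or 65535 bytes (HEAP_MAX_BUFFERED_BYTES) are buffered, whichever comes first. As a simplification, it charges each tuple a fixed byte size, whereas the patch estimates buffered bytes from the slots' memory context.

```c
#include <assert.h>
#include <stddef.h>

/* Limits taken from the v19 patch (HEAP_MAX_BUFFERED_SLOTS / _BYTES). */
#define MAX_BUFFERED_SLOTS	1000
#define MAX_BUFFERED_BYTES	65535

typedef struct ToyBuffer
{
	int			nslots;
	size_t		nbytes;
	int			nflushes;
} ToyBuffer;

static void
toy_flush(ToyBuffer *b)
{
	if (b->nslots == 0)
		return;
	b->nflushes++;
	b->nslots = 0;
	b->nbytes = 0;
}

/* Buffer one tuple; flush when either limit is reached. */
static void
toy_buffer_tuple(ToyBuffer *b, size_t tuple_bytes)
{
	b->nslots++;
	b->nbytes += tuple_bytes;
	if (b->nslots >= MAX_BUFFERED_SLOTS || b->nbytes >= MAX_BUFFERED_BYTES)
		toy_flush(b);
}

/* Flushes needed for ntuples fixed-size tuples, including the final one at end. */
static int
toy_count_flushes(int ntuples, size_t tuple_bytes)
{
	ToyBuffer	b = {0, 0, 0};

	for (int i = 0; i < ntuples; i++)
		toy_buffer_tuple(&b, tuple_bytes);
	toy_flush(&b);				/* what the end call would do */
	assert(b.nslots == 0 && b.nbytes == 0);
	return b.nflushes;
}
```

With narrow tuples the slot limit triggers first. With wide tuples the byte limit does, which keeps each buffer bounded at roughly 64 kB regardless of tuple width.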

> 5. Use these new methods for CREATE/REFRESH MATERIALIZED VIEW. This is
> fairly straightforward, I believe, and handled by your patch. Indexes
> are (re)built afterward, and no triggers are possible.
>
> 6. Use these new methods for CREATE TABLE ... AS. This is fairly
> straightforward, I believe, and handled by your patch. No indexes or
> triggers are possible.

I used multi inserts for all of these, including TABLE REWRITE commands
such as ALTER TABLE. See the attached v19-0002 patch. Check the testing
section below for the benefits.

FWIW, the following are some of the TABLE REWRITE commands that can
benefit:

ALTER TABLE tbl ALTER c1 TYPE bigint;
ALTER TABLE itest13 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY;
ALTER MATERIALIZED VIEW heapmv SET ACCESS METHOD heap2;
ALTER TABLE itest3 ALTER COLUMN a TYPE int;
ALTER TABLE gtest20 ALTER COLUMN b SET EXPRESSION AS (a * 3);
ALTER TABLE has_volatile ADD col4 int DEFAULT (random() * 10000)::int;
and so on.

> 7. Use these new methods for COPY. We have to be careful to avoid
> regressions for the heap method, because it's already managing its own
> buffers. If the AM manages the buffering, then it may require
> additional copying of slots, which could be a disadvantage. To solve
> this, we may need some minor API tweaks to avoid copying when the
> caller guarantees that the memory will not be freed too early, or
> perhaps expose the AM's memory context to copyfrom.c. Another thing to
> consider is that the buffering in copyfrom.c is also used for FDWs, so
> that buffering code path needs to be preserved in copyfrom.c even if
> not used for AMs.

I modified the COPY FROM code to use the new table AM APIs and performed
some tests, which show no significant regression. Check the testing section
below for more details. See the attached v19-0005 patch. With this,
table_multi_insert can be deprecated.

> 8. Use these new methods for INSERT INTO ... SELECT. One potential
> challenge here is that execution nodes are not always run to
> completion, so we need to be sure that the flush isn't forgotten in
> that case.

I did that in v19-0003. I placed the table_modify_end call in multiple
places, including ExecEndModifyTable, and didn't find any issues with it.
Please have a look and let me know if we need the end call in more places.
Check the testing section below for the benefits.
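Because the end call may be reachable from several cleanup paths, one common way to make that safe is to clear the state pointer after ending it, so a second call becomes a no-op. The sketch below shows this pattern with hypothetical ToyNode/ToyState types. It is an assumption for illustration and is not taken from v19-0003.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins; these are not executor structures. */
typedef struct ToyState
{
	int			buffered;		/* tuples still sitting in the buffer */
} ToyState;

typedef struct ToyNode
{
	ToyState   *mstate;
	int			flushed;
	int			end_calls;
} ToyNode;

static void
toy_modify_end(ToyNode *node, ToyState *s)
{
	node->flushed += s->buffered;	/* flush the leftovers */
	node->end_calls++;
	free(s);
}

/* Safe to call from every cleanup path: ends the state at most once. */
static void
toy_finish_modify(ToyNode *node)
{
	if (node->mstate != NULL)
	{
		toy_modify_end(node, node->mstate);
		node->mstate = NULL;	/* later callers see nothing to do */
	}
}

static void
toy_init(ToyNode *node, int pending)
{
	node->mstate = malloc(sizeof(ToyState));
	assert(node->mstate != NULL);
	node->mstate->buffered = pending;
	node->flushed = 0;
	node->end_calls = 0;
}
```

Both the normal completion path and the end-of-node path can then call toy_finish_modify() unconditionally.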

> 10. Use these new methods for logical apply.

I used multi inserts for logical replication apply in v19-0004. Check the
testing section below for the benefits.

FWIW, the open-source pglogical extension has multi insert support; see the
code around
https://github.com/2ndQuadrant/pglogical/blob/REL2_x_STABLE/pglogical_apply_heap.c#L960

> 11. Deprecate the multi_insert API.

I removed both table_multi_insert and table_finish_bulk_insert in
v19-0006. Removing them outright may not be a great idea; emitting a
deprecation WARNING/ERROR for a few more PG releases might be worth
considering instead.

> Thoughts on this plan? Does your patch make sense in v17 as a stepping
> stone, or should we try to make all of these API changes together in
> v18?

If the design, the code and the benefits these new table AM APIs bring look
good, I hope to see them in PG 18.

> Also, a sample AM code would be a huge benefit here. Writing a real AM
> is hard, but perhaps we can at least have an example one to demonstrate
> how to use these APIs?

The attached patches already implement these new table AM APIs for heap, so
I don't think we need a separate example implementation. If others feel
otherwise, I'm open to thoughts here.

That said, I'd like to reiterate the motivation behind the new table AM
APIs for multi and single inserts:

1. A scan-like API that carries state across calls is considered better, as
suggested by Andres Freund:
https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3f...@alap3.anarazel.de
2. Allowing a table AM to optimize operations across multiple inserts,
define its own buffering strategy and make its own flushing decisions based
on its tuple storage format and other AM-specific factors.
3. Improving the performance of various SQL commands with multi inserts for
the heap AM.

The attached v19 patches might need more detailed comments, some
documentation and specific tests ensuring that heap multi inserts kick in
for the various commands. I'm open to thoughts here.

I did some testing to see how various commands benefit from multi inserts
using these new table AM APIs for heap. These commands not only run faster,
they also generate far less WAL. After all, multi inserts optimize the
insertions by writing less WAL: one WAL record per page when multiple rows
fit into a single data page, as opposed to one WAL record per row.
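Here is a rough back-of-the-envelope model of the WAL saving. A single-row heap insert logs one record per row, while heap_multi_insert() logs one record per page it fills in each flush. The function names and the batch and rows_per_page parameters are hypothetical. The model ignores record sizes, full-page images and other overheads, so it counts records, not bytes.

```c
#include <assert.h>

/* Single inserts: roughly one heap insert WAL record per row. */
static long
wal_records_single(long nrows)
{
	return nrows;
}

/*
 * Multi inserts: each flush of up to `batch` rows emits one record per heap
 * page it touches. Assumes fixed-size rows, rows_per_page rows per empty
 * page, and that the next flush continues filling the last partial page.
 */
static long
wal_records_multi(long nrows, long batch, long rows_per_page)
{
	long		records = 0;
	long		room = 0;		/* rows that still fit on the current page */

	assert(batch > 0 && rows_per_page > 0);
	while (nrows > 0)
	{
		long		n = nrows < batch ? nrows : batch;	/* rows in this flush */

		nrows -= n;
		while (n > 0)
		{
			long		put;

			if (room == 0)
				room = rows_per_page;	/* move on to a fresh page */
			put = n < room ? n : room;
			records++;			/* one record for this page in this flush */
			room -= put;
			n -= put;
		}
	}
	return records;
}
```

For example, 1000 rows at 100 rows per page take 1000 records with single inserts but only 10 with one multi-insert flush.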

Test case 1: 100 million rows, 2 columns (int and float)

Command                        | HEAD (sec) | PATCHED (sec) | Faster by % | Faster by X
------------------------------ | ---------- | ------------- | ----------- | -----------
CREATE TABLE AS                | 121        | 77            | 36.3        | 1.57
CREATE MATERIALIZED VIEW       | 101        | 49            | 51.4        | 2.06
REFRESH MATERIALIZED VIEW      | 113        | 54            | 52.2        | 2.09
ALTER TABLE (TABLE REWRITE)    | 124        | 81            | 34.6        | 1.53
COPY FROM                      | 71         | 72            | 0           | 1
INSERT INTO ... SELECT         | 117        | 62            | 47          | 1.88
LOGICAL REPLICATION APPLY      | 393        | 306           | 22.1        | 1.28

Command                        | HEAD (WAL in GB) | PATCHED (WAL in GB) | Reduced by % | Reduced by X
------------------------------ | ---------------- | ------------------- | ------------ | ------------
CREATE TABLE AS                | 6.8              | 2.4                 | 64.7         | 2.83
CREATE MATERIALIZED VIEW       | 7.2              | 2.3                 | 68           | 3.13
REFRESH MATERIALIZED VIEW      | 10               | 5.1                 | 49           | 1.96
ALTER TABLE (TABLE REWRITE)    | 8                | 3.2                 | 60           | 2.5
COPY FROM                      | 2.9              | 3                   | 0            | 1
INSERT INTO ... SELECT         | 8                | 3                   | 62.5         | 2.66
LOGICAL REPLICATION APPLY      | 7.5              | 2.3                 | 69.3         | 3.26

Test case 2: 1 billion rows, 1 column (int)

Command                        | HEAD (sec) | PATCHED (sec) | Faster by % | Faster by X
------------------------------ | ---------- | ------------- | ----------- | -----------
CREATE TABLE AS                | 794        | 386           | 51.38       | 2.05
CREATE MATERIALIZED VIEW       | 1006       | 563           | 44.03       | 1.78
REFRESH MATERIALIZED VIEW      | 977        | 603           | 38.28       | 1.62
ALTER TABLE (TABLE REWRITE)    | 1189       | 714           | 39.94       | 1.66
COPY FROM                      | 321        | 330           | -0.02       | 0.97
INSERT INTO ... SELECT         | 1084       | 586           | 45.94       | 1.84
LOGICAL REPLICATION APPLY      | 3530       | 2982          | 15.52       | 1.18

Command                        | HEAD (WAL in GB) | PATCHED (WAL in GB) | Reduced by % | Reduced by X
------------------------------ | ---------------- | ------------------- | ------------ | ------------
CREATE TABLE AS                | 60               | 12                  | 80           | 5
CREATE MATERIALIZED VIEW       | 60               | 12                  | 80           | 5
REFRESH MATERIALIZED VIEW      | 60               | 12                  | 80           | 5
ALTER TABLE (TABLE REWRITE)    | 123              | 31                  | 60           | 2.5
COPY FROM                      | 12               | 12                  | 0            | 1
INSERT INTO ... SELECT         | 120              | 24                  | 80           | 5
LOGICAL REPLICATION APPLY      | 61               | 12                  | 80.32        | 5

Test setup:
./configure --prefix=$PWD/pg17/ --enable-tap-tests CFLAGS="-ggdb3 -O2" > install.log && make -j 8 install > install.log 2>&1 &

wal_level=logical
max_wal_size = 256GB
checkpoint_timeout = 1h

The test system is an EC2 instance of type c5.4xlarge:
Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         46 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  16
  On-line CPU(s) list:   0-15
Vendor ID:               GenuineIntel
  Model name:            Intel(R) Xeon(R) Platinum 8275CL CPU @ 3.00GHz
    CPU family:          6
    Model:               85
    Thread(s) per core:  2
    Core(s) per socket:  8
    Socket(s):           1
    Stepping:            7
    BogoMIPS:            5999.99
Caches (sum of all):
  L1d:                   256 KiB (8 instances)
  L1i:                   256 KiB (8 instances)
  L2:                    8 MiB (8 instances)
  L3:                    35.8 MiB (1 instance)
NUMA:
  NUMA node(s):          1
  NUMA node0 CPU(s):     0-15
RAM:
  MemTotal:       32036536 kB

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 75666da998aaa8fbc60d62ad8c160a5c227065e6 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 04:12:29 +0000
Subject: [PATCH v19 1/6] Introduce new Table Access Methods for single and
 multi inserts

---
 src/backend/access/heap/heapam.c         | 202 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   6 +
 src/backend/access/table/tableam.c       |  95 +++++++++++
 src/backend/access/table/tableamapi.c    |  10 ++
 src/include/access/heapam.h              |  44 +++++
 src/include/access/tableam.h             | 146 ++++++++++++++++
 src/tools/pgindent/typedefs.list         |   3 +
 7 files changed, 505 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4a4cf76269..37c6ed232c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -64,6 +64,7 @@
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -112,7 +113,7 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
-
+static void heap_modify_insert_end_callback(TableModifyState *state);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2608,6 +2609,205 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel, int modify_flags,
+				  CommandId cid, int options,
+				  TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				  void *modify_buffer_flush_context)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(TopTransactionContext,
+									"heap_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mem_cxt = context;
+	state->cid = cid;
+	state->options = options;
+	state->modify_buffer_flush_callback = modify_buffer_flush_callback;
+	state->modify_buffer_flush_context = modify_buffer_flush_context;
+	state->modify_end_callback = NULL;	/* To be installed lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+						  TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize heap insert state */
+	if (state->data == NULL)
+	{
+		istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+
+		if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0)
+		{
+			mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+			mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+			istate->mistate = mistate;
+			mistate->mem_cxt = AllocSetContextCreate(CurrentMemoryContext,
+													 "heap_multi_insert memory context",
+													 ALLOCSET_DEFAULT_SIZES);
+		}
+
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+
+		state->modify_end_callback = heap_modify_insert_end_callback;
+	}
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	dstslot = mistate->slots[mistate->cur_slots];
+	if (dstslot == NULL)
+	{
+		/*
+		 * We use virtual tuple slots as buffered slots to leverage the
+		 * optimization they provide to minimize physical data copying. The
+		 * virtual slot gets materialized when we copy (via below
+		 * ExecCopySlot) the tuples from the source slot which can be of any
+		 * type. This way, it is ensured that the tuple storage doesn't depend
+		 * on external memory, because all the datums that aren't passed by
+		 * value are copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	ExecClearTuple(dstslot);
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+
+	/*
+	 * Memory allocated for the whole tuple is in slot's memory context, so
+	 * use it to keep track of the total space occupied by all buffered tuples.
+	 */
+	if (TTS_SHOULDFREE(dstslot))
+		mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false);
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)
+		heap_modify_buffer_flush(state);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	if (mistate->cur_slots == 0)
+		return;
+
+	/*
+	 * heap_multi_insert may leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(mistate->mem_cxt);
+
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->options, istate->bistate);
+
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->mem_cxt);
+
+	if (state->modify_buffer_flush_callback != NULL)
+		state->modify_buffer_flush_callback(state->modify_buffer_flush_context,
+											mistate->slots, mistate->cur_slots);
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+}
+
+/*
+ * Heap insert specific callback used for performing work at the end like
+ * flushing buffered tuples if any, cleaning up the insert state and buffered
+ * slots.
+ */
+static void
+heap_modify_insert_end_callback(TableModifyState *state)
+{
+	HeapInsertState *istate;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+
+	if (istate->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate = istate->mistate;
+
+		heap_modify_buffer_flush(state);
+
+		Assert(mistate->cur_slots == 0 &&
+			   mistate->cur_size == 0);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		MemoryContextDelete(mistate->mem_cxt);
+	}
+
+	if (istate->bistate != NULL)
+		FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_callback != NULL)
+		state->modify_end_callback(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b7929..eda0c73a16 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2615,6 +2615,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_buffer_flush = heap_modify_buffer_flush,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index e57a0b7ea3..0e4ce1aca6 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -21,6 +21,7 @@
 
 #include <math.h>
 
+#include "access/heapam.h"		/* just for BulkInsertState */
 #include "access/syncscan.h"
 #include "access/tableam.h"
 #include "access/xact.h"
@@ -29,6 +30,7 @@
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
+#include "utils/memutils.h"
 
 /*
  * Constants to control the behavior of block allocation to parallel workers
@@ -48,6 +50,7 @@
 char	   *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
 bool		synchronize_seqscans = true;
 
+static void default_table_modify_insert_end_callback(TableModifyState *state);
 
 /* ----------------------------------------------------------------------------
  * Slot functions.
@@ -756,3 +759,95 @@ table_block_relation_estimate_size(Relation rel, int32 *attr_widths,
 	else
 		*allvisfrac = (double) relallvisible / curpages;
 }
+
+/*
+ * Initialize default table modify state.
+ */
+TableModifyState *
+default_table_modify_begin(Relation rel, int modify_flags,
+						   CommandId cid, int options,
+						   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+						   void *modify_buffer_flush_context)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(CurrentMemoryContext,
+									"default_table_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mem_cxt = context;
+	state->cid = cid;
+	state->options = options;
+	state->modify_end_callback = NULL;	/* To be installed lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Default table modify implementation for inserts.
+ */
+void
+default_table_modify_buffer_insert(TableModifyState *state,
+								   TupleTableSlot *slot)
+{
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize default table modify state */
+	if (state->data == NULL)
+	{
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			state->data = (BulkInsertState) GetBulkInsertState();
+
+		state->modify_end_callback = default_table_modify_insert_end_callback;
+	}
+
+	/* Fallback to table AM single insert routine */
+	table_tuple_insert(state->rel,
+					   slot,
+					   state->cid,
+					   state->options,
+					   (BulkInsertState) state->data);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Default table modify implementation for flush.
+ */
+void
+default_table_modify_buffer_flush(TableModifyState *state)
+{
+	/* no-op */
+}
+
+/*
+ * Default table modify insert specific callback used for performing work at
+ * the end like cleaning up the bulk insert state.
+ */
+static void
+default_table_modify_insert_end_callback(TableModifyState *state)
+{
+	if (state->data != NULL)
+		FreeBulkInsertState((BulkInsertState) state->data);
+}
+
+/*
+ * Clean default table modify state.
+ */
+void
+default_table_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_callback != NULL)
+		state->modify_end_callback(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index ce637a5a5d..96ac951af6 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,16 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->scan_sample_next_block != NULL);
 	Assert(routine->scan_sample_next_tuple != NULL);
 
+	/* optional, but either all of them are defined or none. */
+	Assert((routine->tuple_modify_begin == NULL &&
+			routine->tuple_modify_buffer_insert == NULL &&
+			routine->tuple_modify_buffer_flush == NULL &&
+			routine->tuple_modify_end == NULL) ||
+		   (routine->tuple_modify_begin != NULL &&
+			routine->tuple_modify_buffer_insert != NULL &&
+			routine->tuple_modify_buffer_flush != NULL &&
+			routine->tuple_modify_end != NULL));
+
 	return routine;
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index c47a5045ce..c10ebbb5ea 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -271,6 +271,38 @@ typedef enum
 	PRUNE_VACUUM_CLEANUP,		/* VACUUM 2nd heap pass */
 } PruneReason;
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES		65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held */
+	int			cur_slots;
+
+	/* Approximate size of all tuples currently held in buffered slots */
+	Size		cur_size;
+
+	MemoryContext mem_cxt;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -321,6 +353,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+										   int modify_flags,
+										   CommandId cid,
+										   int options,
+										   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+										   void *modify_buffer_flush_context);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd..ddb6e6f3a5 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -255,6 +255,43 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Table modify flags */
+
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TM_FLAG_MULTI_INSERTS	0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TM_FLAG_BAS_BULKWRITE	0x000002
+
+struct TableModifyState;
+
+/* Callback invoked with each batch of tuples flushed from the buffer */
+typedef void (*TableModifyBufferFlushCallback) (void *context,
+												TupleTableSlot **slots,
+												int nslots);
+
+/* Table AM specific callback that gets called in table_modify_end() */
+typedef void (*TableModifyEndCallback) (struct TableModifyState *state);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+	Relation	rel;
+	int			modify_flags;
+	MemoryContext mem_cxt;
+	CommandId	cid;
+	int			options;
+
+	/* Flush callback and its context */
+	TableModifyBufferFlushCallback modify_buffer_flush_callback;
+	void	   *modify_buffer_flush_context;
+
+	/* Table AM specific data */
+	void	   *data;
+
+	TableModifyEndCallback modify_end_callback;
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -578,6 +615,21 @@ typedef struct TableAmRoutine
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
 
+	/* ------------------------------------------------------------------------
+	 * Table Modify related functions.
+	 * ------------------------------------------------------------------------
+	 */
+	TableModifyState *(*tuple_modify_begin) (Relation rel,
+											 int modify_flags,
+											 CommandId cid,
+											 int options,
+											 TableModifyBufferFlushCallback modify_buffer_flush_callback,
+											 void *modify_buffer_flush_context);
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   TupleTableSlot *slot);
+	void		(*tuple_modify_buffer_flush) (TableModifyState *state);
+	void		(*tuple_modify_end) (TableModifyState *state);
+
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1609,6 +1661,100 @@ table_finish_bulk_insert(Relation rel, int options)
 		rel->rd_tableam->finish_bulk_insert(rel, options);
 }
 
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+extern TableModifyState *default_table_modify_begin(Relation rel, int modify_flags,
+													CommandId cid, int options,
+													TableModifyBufferFlushCallback modify_buffer_flush_callback,
+													void *modify_buffer_flush_context);
+extern void default_table_modify_buffer_insert(TableModifyState *state,
+											   TupleTableSlot *slot);
+extern void default_table_modify_buffer_flush(TableModifyState *state);
+extern void default_table_modify_end(TableModifyState *state);
+
+static inline TableModifyState *
+table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options,
+				   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				   void *modify_buffer_flush_context)
+{
+	if (rel->rd_tableam &&
+		rel->rd_tableam->tuple_modify_begin != NULL)
+	{
+		return rel->rd_tableam->tuple_modify_begin(rel, modify_flags,
+												   cid, options,
+												   modify_buffer_flush_callback,
+												   modify_buffer_flush_context);
+	}
+	else if (rel->rd_tableam &&
+			 rel->rd_tableam->tuple_modify_begin == NULL)
+	{
+		/* Fallback to a default implementation */
+		return default_table_modify_begin(rel, modify_flags,
+										  cid, options,
+										  modify_buffer_flush_callback,
+										  modify_buffer_flush_context);
+	}
+	else
+		Assert(false);
+
+	return NULL;
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_insert != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_buffer_insert == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_buffer_insert(state, slot);
+	}
+	else
+		Assert(false);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_flush != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_buffer_flush == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_buffer_flush(state);
+	}
+	else
+		Assert(false);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_end != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_end(state);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_end == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_end(state);
+	}
+	else
+		Assert(false);
+}
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d551ada325..ebde07bcde 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1130,6 +1130,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2844,6 +2846,7 @@ TableFuncScanState
 TableFuncType
 TableInfo
 TableLikeClause
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
-- 
2.34.1

From 5a6dd7ac0cae831fbd8294710997dd484c089fcb Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 04:12:58 +0000
Subject: [PATCH v19 2/6] Optimize CTAS, CMV, RMV and TABLE REWRITES with multi
 inserts

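Each converted caller now follows the same lifecycle. It calls table_modify_begin() in the startup callback, table_modify_buffer_insert() for every received tuple, and table_modify_end() at shutdown. This replaces the old table_tuple_insert() / FreeBulkInsertState() / table_finish_bulk_insert() sequence. The shutdown callbacks no longer flush explicitly, so they rely on table_modify_end() to write out whatever is still buffered.

The block below is a standalone toy model of that lifecycle, not the heap AM code. The Toy* names, the int "tuples" and the 4-slot buffer are all hypothetical.

```c
#include <assert.h>

/* Hypothetical toy model: tuples are ints, the "table" is an int array. */
#define TOY_BUFFER_CAPACITY 4
#define TOY_TABLE_CAPACITY 64

typedef struct ToyTable
{
	int			rows[TOY_TABLE_CAPACITY];
	int			nrows;
	int			nflushes;		/* number of batched writes performed */
} ToyTable;

typedef struct ToyModifyState
{
	ToyTable   *table;
	int			buffered[TOY_BUFFER_CAPACITY];
	int			nbuffered;
} ToyModifyState;

/* Counterpart of table_modify_begin(): set up an empty buffer. */
static void
toy_modify_begin(ToyModifyState *state, ToyTable *table)
{
	state->table = table;
	state->nbuffered = 0;
}

/* Write all buffered tuples to the table as one batch. */
static void
toy_modify_buffer_flush(ToyModifyState *state)
{
	if (state->nbuffered == 0)
		return;
	assert(state->table->nrows + state->nbuffered <= TOY_TABLE_CAPACITY);
	for (int i = 0; i < state->nbuffered; i++)
		state->table->rows[state->table->nrows++] = state->buffered[i];
	state->table->nflushes++;
	state->nbuffered = 0;
}

/* Buffer one tuple, flushing first if the buffer is already full. */
static void
toy_modify_buffer_insert(ToyModifyState *state, int tuple)
{
	if (state->nbuffered == TOY_BUFFER_CAPACITY)
		toy_modify_buffer_flush(state);
	state->buffered[state->nbuffered++] = tuple;
}

/* Counterpart of table_modify_end(): write out the partial last batch. */
static void
toy_modify_end(ToyModifyState *state)
{
	toy_modify_buffer_flush(state);
}
```

In this model, inserting ten tuples causes two flushes when the buffer fills, plus a final one from toy_modify_end(). Skipping the end call would silently drop the last two tuples.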
---
 src/backend/commands/createas.c  | 27 +++++++++++----------------
 src/backend/commands/matview.c   | 26 +++++++++++---------------
 src/backend/commands/tablecmds.c | 31 +++++++++++--------------------
 3 files changed, 33 insertions(+), 51 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 62050f4dc5..2d6fffbf07 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -552,17 +550,21 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
 	 * bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+		myState->mstate = table_modify_begin(intoRelationDesc,
+											 TM_FLAG_MULTI_INSERTS |
+											 TM_FLAG_BAS_BULKWRITE,
+											 GetCurrentCommandId(true),
+											 TABLE_INSERT_SKIP_FSM,
+											 NULL,
+											 NULL);
 	else
-		myState->bistate = NULL;
+		myState->mstate = NULL;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -590,11 +592,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+		table_modify_buffer_insert(myState->mstate, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -612,10 +610,7 @@ intorel_shutdown(DestReceiver *self)
 	IntoClause *into = myState->into;
 
 	if (!into->skipData)
-	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
-	}
+		table_modify_end(myState->mstate);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 6d09b75556..bb97e2fa5f 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -48,9 +48,7 @@ typedef struct
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
 	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -458,9 +456,14 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * Fill private fields of myState for use by later routines
 	 */
 	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->mstate = table_modify_begin(transientrel,
+										 TM_FLAG_MULTI_INSERTS |
+										 TM_FLAG_BAS_BULKWRITE,
+										 GetCurrentCommandId(true),
+										 TABLE_INSERT_SKIP_FSM |
+										 TABLE_INSERT_FROZEN,
+										 NULL,
+										 NULL);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -485,12 +488,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_modify_buffer_insert(myState->mstate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -505,9 +503,7 @@ transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_modify_end(myState->mstate);
 
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3556240c8e..0c984aa656 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -6060,10 +6060,8 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	int			i;
 	ListCell   *l;
 	EState	   *estate;
-	CommandId	mycid;
-	BulkInsertState bistate;
-	int			ti_options;
 	ExprState  *partqualstate = NULL;
+	TableModifyState *mstate = NULL;
 
 	/*
 	 * Open the relation(s).  We have surely already locked the existing
@@ -6082,18 +6080,15 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	 * Prepare a BulkInsertState and options for table_tuple_insert.  The FSM
 	 * is empty, so don't bother using it.
 	 */
-	if (newrel)
+	if (newrel)
 	{
-		mycid = GetCurrentCommandId(true);
-		bistate = GetBulkInsertState();
-		ti_options = TABLE_INSERT_SKIP_FSM;
-	}
-	else
-	{
-		/* keep compiler quiet about using these uninitialized */
-		mycid = 0;
-		bistate = NULL;
-		ti_options = 0;
+		mstate = table_modify_begin(newrel,
+									TM_FLAG_MULTI_INSERTS |
+									TM_FLAG_BAS_BULKWRITE,
+									GetCurrentCommandId(true),
+									TABLE_INSERT_SKIP_FSM,
+									NULL,
+									NULL);
 	}
 
 	/*
@@ -6392,8 +6387,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 
 			/* Write the tuple out to the new relation */
 			if (newrel)
-				table_tuple_insert(newrel, insertslot, mycid,
-								   ti_options, bistate);
+				table_modify_buffer_insert(mstate, insertslot);
 
 			ResetExprContext(econtext);
 
@@ -6414,10 +6408,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	table_close(oldrel, NoLock);
 	if (newrel)
 	{
-		FreeBulkInsertState(bistate);
-
-		table_finish_bulk_insert(newrel, ti_options);
-
+		table_modify_end(mstate);
 		table_close(newrel, NoLock);
 	}
 }
-- 
2.34.1

From d3f0c64e85417e6fcf164656481ea80732b9bd87 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 04:15:49 +0000
Subject: [PATCH v19 3/6] Optimize INSERT INTO ... SELECT with multi inserts

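With this patch, a plain INSERT ... SELECT whose subplan is a SeqScan buffers tuples through table_modify_buffer_insert(). Index insertion and AFTER ROW INSERT triggers move into InsertModifyBufferFlushCallback(), which the AM calls after a batch is written but before the buffered slots are freed. The path is not used with tuple routing, WITH CHECK OPTION or RETURNING, presumably because those expect each row to be fully processed before ExecInsert() returns.

The block below is a hypothetical, standalone toy model of that ordering: write the batch, run the callback, then release the buffer. None of its names exist in PostgreSQL.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy model: tuples are ints, writing a batch bumps a counter. */
#define TOY_BUFFER_CAPACITY 4

typedef void (*ToyFlushCallback) (void *context, const int *tuples, int ntuples);

typedef struct ToyModifyState
{
	int			buffered[TOY_BUFFER_CAPACITY];
	int			nbuffered;
	int			nwritten;		/* tuples already written to the "table" */
	ToyFlushCallback callback;	/* optional, may be NULL */
	void	   *callback_context;
} ToyModifyState;

/* Work the executor defers until flush time. */
typedef struct ToyInsertContext
{
	int			has_indexes;
	int			has_after_row_triggers;
	int			ncallbacks;
	long		index_key_sum;	/* stands in for ExecInsertIndexTuples() */
	int			ntrigger_calls; /* stands in for ExecARInsertTriggers() */
} ToyInsertContext;

/* Plays the role of InsertModifyBufferFlushCallback(). */
static void
toy_insert_flush_callback(void *context, const int *tuples, int ntuples)
{
	ToyInsertContext *ctx = (ToyInsertContext *) context;

	ctx->ncallbacks++;
	/* Quick exit when there is neither index nor trigger work to do */
	if (!ctx->has_indexes && !ctx->has_after_row_triggers)
		return;
	for (int i = 0; i < ntuples; i++)
	{
		if (ctx->has_indexes)
			ctx->index_key_sum += tuples[i];
		if (ctx->has_after_row_triggers)
			ctx->ntrigger_calls++;
	}
}

static void
toy_modify_begin(ToyModifyState *state, ToyFlushCallback cb, void *cb_context)
{
	state->nbuffered = 0;
	state->nwritten = 0;
	state->callback = cb;
	state->callback_context = cb_context;
}

static void
toy_modify_buffer_flush(ToyModifyState *state)
{
	if (state->nbuffered == 0)
		return;
	/* 1. write the batch */
	state->nwritten += state->nbuffered;
	/* 2. per-tuple follow-up work while the buffered tuples are still valid */
	if (state->callback != NULL)
		state->callback(state->callback_context, state->buffered,
						state->nbuffered);
	/* 3. only now is the buffer released for reuse */
	state->nbuffered = 0;
}

static void
toy_modify_buffer_insert(ToyModifyState *state, int tuple)
{
	if (state->nbuffered == TOY_BUFFER_CAPACITY)
		toy_modify_buffer_flush(state);
	state->buffered[state->nbuffered++] = tuple;
}

static void
toy_modify_end(ToyModifyState *state)
{
	toy_modify_buffer_flush(state);
}
```

Suppose indexes and AFTER ROW triggers are enabled and tuples 1..6 are inserted. The model produces one callback when the fifth tuple finds the buffer full, and a second from the final flush. The deferred work still covers all six tuples.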
---
 contrib/test_decoding/expected/stream.out |   2 +-
 src/backend/executor/nodeModifyTable.c    | 177 +++++++++++++++++++---
 src/tools/pgindent/typedefs.list          |   1 +
 3 files changed, 161 insertions(+), 19 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 4ab2d47bf8..c19facb3c9 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -101,10 +101,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  streaming change for transaction
  streaming change for transaction
- streaming change for transaction
  closing a streamed block for transaction
  opening a streamed block for transaction
  streaming change for transaction
+ streaming change for transaction
  closing a streamed block for transaction
  committing streamed transaction
 (17 rows)
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index cee60d3659..434e3f8411 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -114,6 +114,19 @@ typedef struct UpdateContext
 	LockTupleMode lockmode;
 } UpdateContext;
 
+typedef struct InsertModifyBufferFlushContext
+{
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+	ModifyTableState *mtstate;
+} InsertModifyBufferFlushContext;
+
+static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL;
+static TableModifyState *table_modify_state = NULL;
+
+static void InsertModifyBufferFlushCallback(void *context,
+											TupleTableSlot **slots,
+											int nslots);
 
 static void ExecBatchInsert(ModifyTableState *mtstate,
 							ResultRelInfo *resultRelInfo,
@@ -726,6 +739,61 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
 	return ExecProject(newProj);
 }
 
+static void
+InsertModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots)
+{
+	InsertModifyBufferFlushContext *ctx = (InsertModifyBufferFlushContext *) context;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	ModifyTableState *mtstate = ctx->mtstate;
+	int			i;
+
+	if (nslots <= 0)
+		return;
+
+	/* Quick exit if there is neither index nor trigger work to do */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+	for (i = 0; i < nslots; i++)
+	{
+		/*
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			List	   *recheckIndexes;
+
+			recheckIndexes =
+				ExecInsertIndexTuples(resultRelInfo,
+									  slots[i], estate, false,
+									  false, NULL, NIL, false);
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 mtstate->mt_transition_capture);
+			list_free(recheckIndexes);
+		}
+
+		/*
+		 * There are no indexes, but see if we need to run AFTER ROW INSERT
+		 * triggers anyway.
+		 */
+		else if (resultRelInfo->ri_TrigDesc != NULL &&
+				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		{
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL,
+								 mtstate->mt_transition_capture);
+		}
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecInsert
  *
@@ -751,7 +819,8 @@ ExecInsert(ModifyTableContext *context,
 		   TupleTableSlot *slot,
 		   bool canSetTag,
 		   TupleTableSlot **inserted_tuple,
-		   ResultRelInfo **insert_destrel)
+		   ResultRelInfo **insert_destrel,
+		   bool canMultiInsert)
 {
 	ModifyTableState *mtstate = context->mtstate;
 	EState	   *estate = context->estate;
@@ -764,6 +833,7 @@ ExecInsert(ModifyTableContext *context,
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
 	MemoryContext oldContext;
+	bool		ar_insert_triggers_executed = false;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -1126,17 +1196,53 @@ ExecInsert(ModifyTableContext *context,
 		}
 		else
 		{
-			/* insert the tuple normally */
-			table_tuple_insert(resultRelationDesc, slot,
-							   estate->es_output_cid,
-							   0, NULL);
+			if (canMultiInsert &&
+				proute == NULL &&
+				resultRelInfo->ri_WithCheckOptions == NIL &&
+				resultRelInfo->ri_projectReturning == NULL)
+			{
+				if (insert_modify_buffer_flush_context == NULL)
+				{
+					insert_modify_buffer_flush_context =
+						(InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext));
+					insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo;
+					insert_modify_buffer_flush_context->estate = estate;
+					insert_modify_buffer_flush_context->mtstate = mtstate;
+				}
 
-			/* insert index entries for tuple */
-			if (resultRelInfo->ri_NumIndices > 0)
-				recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-													   slot, estate, false,
-													   false, NULL, NIL,
-													   false);
+				if (table_modify_state == NULL)
+				{
+					table_modify_state = table_modify_begin(resultRelInfo->ri_RelationDesc,
+															TM_FLAG_MULTI_INSERTS,
+															estate->es_output_cid,
+															0,
+															InsertModifyBufferFlushCallback,
+															insert_modify_buffer_flush_context);
+				}
+
+				table_modify_buffer_insert(table_modify_state, slot);
+				ar_insert_triggers_executed = true;
+			}
+			else
+			{
+				/* insert the tuple normally */
+				table_tuple_insert(resultRelationDesc, slot,
+								   estate->es_output_cid,
+								   0, NULL);
+
+				/* insert index entries for tuple */
+				if (resultRelInfo->ri_NumIndices > 0)
+					recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+														   slot, estate, false,
+														   false, NULL, NIL,
+														   false);
+
+				ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+									 mtstate->mt_transition_capture);
+
+				list_free(recheckIndexes);
+				ar_insert_triggers_executed = true;
+			}
 		}
 	}
 
@@ -1170,10 +1276,12 @@ ExecInsert(ModifyTableContext *context,
 	}
 
 	/* AFTER ROW INSERT Triggers */
-	ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
-						 ar_insert_trig_tcs);
-
-	list_free(recheckIndexes);
+	if (!ar_insert_triggers_executed)
+	{
+		ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+							 ar_insert_trig_tcs);
+		list_free(recheckIndexes);
+	}
 
 	/*
 	 * Check any WITH CHECK OPTION constraints from parent views.  We are
@@ -1869,7 +1977,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
 	/* Tuple routing starts from the root table. */
 	context->cpUpdateReturningSlot =
 		ExecInsert(context, mtstate->rootResultRelInfo, slot, canSetTag,
-				   inserted_tuple, insert_destrel);
+				   inserted_tuple, insert_destrel, false);
 
 	/*
 	 * Reset the transition state that may possibly have been written by
@@ -3364,7 +3472,7 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 				mtstate->mt_merge_action = action;
 
 				rslot = ExecInsert(context, mtstate->rootResultRelInfo,
-								   newslot, canSetTag, NULL, NULL);
+								   newslot, canSetTag, NULL, NULL, false);
 				mtstate->mt_merge_inserted += 1;
 				break;
 			case CMD_NOTHING:
@@ -3749,6 +3857,10 @@ ExecModifyTable(PlanState *pstate)
 	HeapTupleData oldtupdata;
 	HeapTuple	oldtuple;
 	ItemPointer tupleid;
+	bool		canMultiInsert = false;
+
+	table_modify_state = NULL;
+	insert_modify_buffer_flush_context = NULL;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -3844,6 +3956,10 @@ ExecModifyTable(PlanState *pstate)
 		if (TupIsNull(context.planSlot))
 			break;
 
+		if (operation == CMD_INSERT &&
+			nodeTag(subplanstate) == T_SeqScanState)
+			canMultiInsert = true;
+
 		/*
 		 * When there are multiple result relations, each tuple contains a
 		 * junk column that gives the OID of the rel from which it came.
@@ -4057,7 +4173,7 @@ ExecModifyTable(PlanState *pstate)
 					ExecInitInsertProjection(node, resultRelInfo);
 				slot = ExecGetInsertNewTuple(resultRelInfo, context.planSlot);
 				slot = ExecInsert(&context, resultRelInfo, slot,
-								  node->canSetTag, NULL, NULL);
+								  node->canSetTag, NULL, NULL, canMultiInsert);
 				break;
 
 			case CMD_UPDATE:
@@ -4116,6 +4232,17 @@ ExecModifyTable(PlanState *pstate)
 			return slot;
 	}
 
+	if (table_modify_state != NULL)
+	{
+		Assert(operation == CMD_INSERT);
+
+		table_modify_end(table_modify_state);
+		table_modify_state = NULL;
+
+		pfree(insert_modify_buffer_flush_context);
+		insert_modify_buffer_flush_context = NULL;
+	}
+
 	/*
 	 * Insert remaining tuples for batch insert.
 	 */
@@ -4228,6 +4355,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	mtstate->mt_merge_updated = 0;
 	mtstate->mt_merge_deleted = 0;
 
+	table_modify_state = NULL;
+	insert_modify_buffer_flush_context = NULL;
+
 	/*----------
 	 * Resolve the target relation. This is the same as:
 	 *
@@ -4681,6 +4811,17 @@ ExecEndModifyTable(ModifyTableState *node)
 {
 	int			i;
 
+	if (table_modify_state != NULL)
+	{
+		Assert(node->operation == CMD_INSERT);
+
+		table_modify_end(table_modify_state);
+		table_modify_state = NULL;
+
+		pfree(insert_modify_buffer_flush_context);
+		insert_modify_buffer_flush_context = NULL;
+	}
+
 	/*
 	 * Allow any FDWs to shut down
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ebde07bcde..11c4d99430 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1226,6 +1226,7 @@ InjectionPointEntry
 InjectionPointSharedState
 InlineCodeBlock
 InProgressIO
+InsertModifyBufferFlushContext
 InsertStmt
 Instrumentation
 Int128AggState
-- 
2.34.1

From 4f4cf2f380a18c7a754b2fcd979af4617c6aff52 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 04:16:07 +0000
Subject: [PATCH v19 4/6] Optimize Logical Replication apply with multi inserts

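On the apply side, consecutive INSERT messages for the same relation accumulate in one TableModifyState. Before the worker applies anything else, FinishMultiInserts() writes out the buffered rows, makes their index entries and fires their AFTER ROW triggers. It is called when an INSERT arrives for a different relation, and also at the start of the other handlers this patch hooks, such as COMMIT, PREPARE, ORIGIN, RELATION, TYPE and the STREAM messages.

The block below is a standalone toy model of just that batching rule. Its names are hypothetical, and nothing is actually written.

```c
#include <assert.h>

/* Hypothetical toy model: only the "when to finish a batch" rule is kept. */
#define TOY_MAX_BATCHES 16

typedef enum ToyMsgKind
{
	TOY_MSG_INSERT,
	TOY_MSG_OTHER				/* e.g. COMMIT, RELATION, TYPE, STREAM STOP */
} ToyMsgKind;

typedef struct ToyMsg
{
	ToyMsgKind	kind;
	unsigned	relid;			/* only meaningful for TOY_MSG_INSERT */
} ToyMsg;

typedef struct ToyApplyState
{
	int			active;			/* is a batch currently open? */
	unsigned	relid;			/* relation of the open batch */
	int			nbuffered;		/* rows in the open batch */
	int			nbatches;		/* finished batches recorded below */
	unsigned	batch_relid[TOY_MAX_BATCHES];
	int			batch_size[TOY_MAX_BATCHES];
} ToyApplyState;

/* Counterpart of FinishMultiInserts(): a no-op when no batch is open. */
static void
toy_finish_multi_inserts(ToyApplyState *st)
{
	if (!st->active)
		return;
	assert(st->nbatches < TOY_MAX_BATCHES);
	st->batch_relid[st->nbatches] = st->relid;
	st->batch_size[st->nbatches] = st->nbuffered;
	st->nbatches++;
	st->active = 0;
	st->nbuffered = 0;
}

static void
toy_apply_message(ToyApplyState *st, ToyMsg msg)
{
	if (msg.kind != TOY_MSG_INSERT)
	{
		/* finish buffered inserts before handling anything else */
		toy_finish_multi_inserts(st);
		return;
	}

	/* an INSERT for a different relation closes the current batch */
	if (st->active && st->relid != msg.relid)
		toy_finish_multi_inserts(st);

	if (!st->active)
	{
		/*
		 * The real code must begin a new replication step here, since
		 * FinishMultiInserts() ends the previous one.
		 */
		st->active = 1;
		st->relid = msg.relid;
	}
	st->nbuffered++;
}
```

Feed the model two INSERTs for relation 10, three for relation 20 and then a COMMIT-like message. It records two batches, (10, 2) and (20, 3).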
---
 src/backend/executor/execReplication.c   |  39 +++
 src/backend/replication/logical/proto.c  |  24 ++
 src/backend/replication/logical/worker.c | 357 ++++++++++++++++++++++-
 src/include/executor/executor.h          |   4 +
 src/include/replication/logicalproto.h   |   2 +
 src/tools/pgindent/typedefs.list         |   2 +
 6 files changed, 415 insertions(+), 13 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd577..fae1375537 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -544,6 +544,45 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 	}
 }
 
+void
+ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+						ResultRelInfo *resultRelInfo,
+						EState *estate, TupleTableSlot *slot)
+{
+	bool		skip_tuple = false;
+	Relation	rel = resultRelInfo->ri_RelationDesc;
+
+	/* For now we support only tables. */
+	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+
+	CheckCmdReplicaIdentity(rel, CMD_INSERT);
+
+	/* BEFORE ROW INSERT Triggers */
+	if (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+	{
+		if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			skip_tuple = true;	/* "do nothing" */
+	}
+
+	if (!skip_tuple)
+	{
+		/* Compute stored generated columns */
+		if (rel->rd_att->constr &&
+			rel->rd_att->constr->has_generated_stored)
+			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
+									   CMD_INSERT);
+
+		/* Check the constraints of the tuple */
+		if (rel->rd_att->constr)
+			ExecConstraints(resultRelInfo, slot, estate);
+		if (rel->rd_rel->relispartition)
+			ExecPartitionCheck(resultRelInfo, slot, estate, true);
+
+		table_modify_buffer_insert(MultiInsertState, slot);
+	}
+}
+
 /*
  * Find the searchslot tuple and update it with data in the slot,
  * update the indexes, and execute any constraints and per-row triggers.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 95c09c9516..46d38aebd2 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -427,6 +427,30 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	logicalrep_write_tuple(out, rel, newslot, binary, columns);
 }
 
+LogicalRepRelId
+logicalrep_read_relid(StringInfo in)
+{
+	LogicalRepRelId relid;
+
+	/* read the relation id */
+	relid = pq_getmsgint(in, 4);
+
+	return relid;
+}
+
+void
+logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup)
+{
+	char		action;
+
+	action = pq_getmsgbyte(in);
+	if (action != 'N')
+		elog(ERROR, "expected new tuple but got %d",
+			 action);
+
+	logicalrep_read_tuple(in, newtup);
+}
+
 /*
  * Read INSERT from stream.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..3440883847 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -148,7 +148,6 @@
 #include <unistd.h>
 
 #include "access/table.h"
-#include "access/tableam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
@@ -416,6 +415,30 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+typedef enum LRMultiInsertReturnStatus
+{
+	LR_MULTI_INSERT_NONE = 0,
+	LR_MULTI_INSERT_REL_SKIPPED,
+	LR_MULTI_INSERT_DISALLOWED,
+	LR_MULTI_INSERT_DONE,
+} LRMultiInsertReturnStatus;
+
+static TableModifyState *MultiInsertState = NULL;
+static LogicalRepRelMapEntry *LastRel = NULL;
+static LogicalRepRelId LastMultiInsertRelId = InvalidOid;
+static ApplyExecutionData *LastEData = NULL;
+static TupleTableSlot *LastRemoteSlot = NULL;
+
+typedef struct LRModifyBufferFlushContext
+{
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} LRModifyBufferFlushContext;
+
+static LRModifyBufferFlushContext *modify_buffer_flush_context = NULL;
+static void LRModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots);
+static void FinishMultiInserts(void);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -1017,6 +1040,8 @@ apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit(s, &commit_data);
 
 	if (commit_data.commit_lsn != remote_final_lsn)
@@ -1043,6 +1068,8 @@ apply_handle_begin_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData begin_data;
 
+	FinishMultiInserts();
+
 	/* Tablesync should never receive prepare. */
 	if (am_tablesync_worker())
 		ereport(ERROR,
@@ -1109,6 +1136,8 @@ apply_handle_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_prepare(s, &prepare_data);
 
 	if (prepare_data.prepare_lsn != remote_final_lsn)
@@ -1171,6 +1200,8 @@ apply_handle_commit_prepared(StringInfo s)
 	LogicalRepCommitPreparedTxnData prepare_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit_prepared(s, &prepare_data);
 	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
 
@@ -1220,6 +1251,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	LogicalRepRollbackPreparedTxnData rollback_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_rollback_prepared(s, &rollback_data);
 	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
 
@@ -1277,6 +1310,8 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1304,6 +1339,8 @@ apply_handle_stream_prepare(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
 								   prepare_data.xid, prepare_data.prepare_lsn);
 
+			FinishMultiInserts();
+
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
 
@@ -1407,6 +1444,8 @@ apply_handle_stream_prepare(StringInfo s)
 static void
 apply_handle_origin(StringInfo s)
 {
+	FinishMultiInserts();
+
 	/*
 	 * ORIGIN message can only come inside streaming transaction or inside
 	 * remote transaction and before any actual writes.
@@ -1473,6 +1512,8 @@ apply_handle_stream_start(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1628,6 +1669,8 @@ apply_handle_stream_stop(StringInfo s)
 	ParallelApplyWorkerInfo *winfo;
 	TransApplyAction apply_action;
 
+	FinishMultiInserts();
+
 	if (!in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1821,6 +1864,8 @@ apply_handle_stream_abort(StringInfo s)
 	StringInfoData original_msg = *s;
 	bool		toplevel_xact;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2138,6 +2183,8 @@ apply_handle_stream_commit(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2159,6 +2206,8 @@ apply_handle_stream_commit(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
 								   commit_data.commit_lsn);
 
+			FinishMultiInserts();
+
 			apply_handle_commit_internal(&commit_data);
 
 			/* Unlink the files with serialized changes and subxact info. */
@@ -2302,6 +2351,8 @@ apply_handle_relation(StringInfo s)
 {
 	LogicalRepRelation *rel;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
 		return;
 
@@ -2325,6 +2376,8 @@ apply_handle_type(StringInfo s)
 {
 	LogicalRepTyp typ;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
 		return;
 
@@ -2363,16 +2416,132 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
 						RelationGetRelationName(rel))));
 }
 
-/*
- * Handle INSERT message.
- */
+static void
+FinishMultiInserts(void)
+{
+	LogicalRepMsgType saved_command;
+
+	if (MultiInsertState == NULL)
+		return;
+
+	Assert(OidIsValid(LastMultiInsertRelId));
+	Assert(LastEData != NULL);
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Set current command for error callback */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	ExecDropSingleTupleTableSlot(LastRemoteSlot);
+	LastRemoteSlot = NULL;
+
+	table_modify_end(MultiInsertState);
+	MultiInsertState = NULL;
+	LastMultiInsertRelId = InvalidOid;
+
+	pfree(modify_buffer_flush_context);
+	modify_buffer_flush_context = NULL;
+
+	ExecCloseIndices(LastEData->targetRelInfo);
+
+	finish_edata(LastEData);
+	LastEData = NULL;
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Reset the current command */
+	apply_error_callback_arg.command = saved_command;
+
+	logicalrep_rel_close(LastRel, NoLock);
+	LastRel = NULL;
+
+	end_replication_step();
+}
 
 static void
-apply_handle_insert(StringInfo s)
+LRModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots)
+{
+	LRModifyBufferFlushContext *ctx = (LRModifyBufferFlushContext *) context;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	int			i;
+	LogicalRepMsgType saved_command;
+
+	if (nslots <= 0)
+		return;
+
+	/* Quick exit if there is neither index nor trigger work to do */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Set current command for error callback */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	/* Caller must take care of opening and closing the indices */
+	for (i = 0; i < nslots; i++)
+	{
+		/*
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			List	   *recheckIndexes;
+
+			recheckIndexes =
+				ExecInsertIndexTuples(resultRelInfo,
+									  slots[i], estate, false,
+									  false, NULL, NIL, false);
+
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 NULL);
+
+			list_free(recheckIndexes);
+		}
+
+		/*
+		 * There are no indexes, but see if we need to run AFTER ROW INSERT
+		 * triggers anyway.
+		 */
+		else if (resultRelInfo->ri_TrigDesc != NULL &&
+				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		{
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL,
+								 NULL);
+		}
+
+		/*
+		 * XXX we should in theory pass a TransitionCaptureState object to the
+		 * above to capture transition tuples, but after statement triggers
+		 * don't actually get fired by replication yet anyway
+		 */
+	}
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Reset the current command */
+	apply_error_callback_arg.command = saved_command;
+}
+
+static LRMultiInsertReturnStatus
+do_multi_inserts(StringInfo s, LogicalRepRelId *relid)
 {
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
-	LogicalRepRelId relid;
 	UserContext ucxt;
 	ApplyExecutionData *edata;
 	EState	   *estate;
@@ -2380,17 +2549,143 @@ apply_handle_insert(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	*relid = logicalrep_read_relid(s);
+
+	if (MultiInsertState != NULL &&
+		(OidIsValid(LastMultiInsertRelId) &&
+		 OidIsValid(*relid) &&
+		 LastMultiInsertRelId != *relid))
+		FinishMultiInserts();
+
+	if (MultiInsertState == NULL)
+		begin_replication_step();
+
+	if (MultiInsertState == NULL)
+		rel = logicalrep_rel_open(*relid, RowExclusiveLock);
+	else
+		rel = LastRel;
+
+	if (!should_apply_changes_for_rel(rel))
+	{
+		Assert(MultiInsertState == NULL);
+
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_REL_SKIPPED;
+	}
+
+	/* For a partitioned table, let's not do multi inserts. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Assert(MultiInsertState == NULL);
+
+		/*
+		 * Multi inserts are not used for partitioned tables; release the
+		 * relation and let the caller apply this change as a single insert.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_DISALLOWED;
+	}
+
 	/*
-	 * Quick return if we are skipping data modification changes or handling
-	 * streamed transactions.
+	 * Make sure that any user-supplied code runs as the table owner, unless
+	 * the user has opted out of that behavior.
 	 */
-	if (is_skipping_changes() ||
-		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
-		return;
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = rel;
+
+	if (MultiInsertState == NULL)
+	{
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+		/* Initialize the executor state. */
+		LastEData = edata = create_edata_for_relation(rel);
+		estate = edata->estate;
+
+		LastRemoteSlot = remoteslot = MakeTupleTableSlot(RelationGetDescr(rel->localrel),
+														 &TTSOpsVirtual);
+
+		modify_buffer_flush_context = (LRModifyBufferFlushContext *) palloc(sizeof(LRModifyBufferFlushContext));
+		modify_buffer_flush_context->resultRelInfo = edata->targetRelInfo;
+		modify_buffer_flush_context->estate = estate;
+
+		MultiInsertState = table_modify_begin(edata->targetRelInfo->ri_RelationDesc,
+											  TM_FLAG_MULTI_INSERTS |
+											  TM_FLAG_BAS_BULKWRITE,
+											  GetCurrentCommandId(true),
+											  0,
+											  LRModifyBufferFlushCallback,
+											  modify_buffer_flush_context);
+		LastRel = rel;
+		LastMultiInsertRelId = *relid;
+
+		/* Open indexes now; the flush callback inserts index entries. */
+		ExecOpenIndices(edata->targetRelInfo, false);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		CommandId	cid;
+
+		edata = LastEData;
+		estate = edata->estate;
+		ResetExprContext(GetPerTupleExprContext(estate));
+		ExecClearTuple(LastRemoteSlot);
+		remoteslot = LastRemoteSlot;
+		cid = GetCurrentCommandId(true);
+		MultiInsertState->cid = cid;
+		estate->es_output_cid = cid;
+	}
+
+	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
+	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	slot_store_data(remoteslot, rel, &newtup);
+	slot_fill_defaults(rel, estate, remoteslot);
+	MemoryContextSwitchTo(oldctx);
+
+	TargetPrivilegesCheck(edata->targetRelInfo->ri_RelationDesc, ACL_INSERT);
+	ExecRelationMultiInsert(MultiInsertState, edata->targetRelInfo, estate, remoteslot);
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	Assert(MultiInsertState != NULL);
+
+	CommandCounterIncrement();
+
+	return LR_MULTI_INSERT_DONE;
+}
+
+static bool
+do_single_inserts(StringInfo s, LogicalRepRelId relid)
+{
+	LogicalRepRelMapEntry *rel;
+	LogicalRepTupleData newtup;
+	UserContext ucxt;
+	ApplyExecutionData *edata;
+	EState	   *estate;
+	TupleTableSlot *remoteslot;
+	MemoryContext oldctx;
+	bool		run_as_owner;
+
+	Assert(relid != InvalidOid);
 
 	begin_replication_step();
 
-	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
 	if (!should_apply_changes_for_rel(rel))
 	{
@@ -2400,7 +2695,7 @@ apply_handle_insert(StringInfo s)
 		 */
 		logicalrep_rel_close(rel, RowExclusiveLock);
 		end_replication_step();
-		return;
+		return false;
 	}
 
 	/*
@@ -2422,6 +2717,7 @@ apply_handle_insert(StringInfo s)
 										&TTSOpsVirtual);
 
 	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel, &newtup);
 	slot_fill_defaults(rel, estate, remoteslot);
@@ -2446,6 +2742,35 @@ apply_handle_insert(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	end_replication_step();
+
+	return true;
+}
+
+/*
+ * Handle INSERT message.
+ */
+static void
+apply_handle_insert(StringInfo s)
+{
+	LRMultiInsertReturnStatus mi_status;
+	LogicalRepRelId relid;
+
+	/*
+	 * Quick return if we are skipping data modification changes or handling
+	 * streamed transactions.
+	 */
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+		return;
+
+	mi_status = do_multi_inserts(s, &relid);
+	if (mi_status == LR_MULTI_INSERT_REL_SKIPPED ||
+		mi_status == LR_MULTI_INSERT_DONE)
+		return;
+
+	do_single_inserts(s, relid);
+
+	return;
 }
 
 /*
@@ -2532,6 +2857,8 @@ apply_handle_update(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -2713,6 +3040,8 @@ apply_handle_delete(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -3154,6 +3483,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 9770752ea3..8f10ea977b 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "access/tableam.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
@@ -656,6 +657,9 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
+extern void ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+									ResultRelInfo *resultRelInfo,
+									EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 									 EState *estate, EPQState *epqstate,
 									 TupleTableSlot *searchslot, TupleTableSlot *slot);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..3f3a7f0a31 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -226,6 +226,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
 									bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_relid(StringInfo in);
+extern void logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 11c4d99430..70f23808e2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1456,6 +1456,8 @@ LPTHREAD_START_ROUTINE
 LPTSTR
 LPVOID
 LPWSTR
+LRModifyBufferFlushContext
+LRMultiInsertReturnStatus
 LSEG
 LUID
 LVRelState
-- 
2.34.1
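The apply-side control flow in 0004 is spread across several handlers, so here is a minimal, self-contained sketch of the batching rule it appears to implement. Consecutive INSERTs for the same relation are buffered. The buffer is flushed when an INSERT for a different relation arrives, before an UPDATE or DELETE is applied, and (presumably) at commit. All names (MsgKind, ApplyBuffer, apply_message, apply_buffer_finish) are hypothetical. They only loosely mirror FinishMultiInserts() and LastMultiInsertRelId. Rows are reduced to counters, and the AM's own capacity-based flushing is left out.

```c
#include <assert.h>

/* Hypothetical message kinds; only INSERT is buffered. */
typedef enum MsgKind
{
	MSG_INSERT,
	MSG_UPDATE,
	MSG_DELETE,
	MSG_COMMIT
} MsgKind;

typedef struct ApplyBuffer
{
	unsigned	relid;			/* relation being buffered; 0 = none */
	int			nbuffered;		/* inserts not yet flushed */
	int			nflushes;		/* number of flushes performed */
	int			nflushed_rows;	/* rows written by all flushes */
} ApplyBuffer;

/* Flush pending inserts and forget the relation (cf. FinishMultiInserts). */
static void
apply_buffer_finish(ApplyBuffer *buf)
{
	if (buf->nbuffered > 0)
	{
		buf->nflushes++;
		buf->nflushed_rows += buf->nbuffered;
	}
	buf->nbuffered = 0;
	buf->relid = 0;
}

/* Handle one replicated change. */
static void
apply_message(ApplyBuffer *buf, MsgKind kind, unsigned relid)
{
	if (kind != MSG_INSERT)
	{
		/*
		 * Flush first so that the change itself (applied after this, not
		 * modeled here) sees every earlier insert.
		 */
		apply_buffer_finish(buf);
		return;
	}

	assert(relid != 0);			/* mirrors Assert(relid != InvalidOid) */

	/* An INSERT into a different relation ends the current batch. */
	if (buf->relid != 0 && buf->relid != relid)
		apply_buffer_finish(buf);

	buf->relid = relid;
	buf->nbuffered++;
}
```

For example, three INSERTs into relation 1 followed by one into relation 2 produce a single flush of three rows, after which the relation-2 row sits in the buffer until the next non-INSERT message.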

From d83bf5bc0bfc5f45d85e38a876a7db94f12803da Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 05:18:58 +0000
Subject: [PATCH v19 5/6] Use new multi insert Table AM for COPY FROM

---
 src/backend/commands/copyfrom.c          | 230 +++++++++++++++--------
 src/include/commands/copyfrom_internal.h |   4 +-
 src/tools/pgindent/typedefs.list         |   1 +
 3 files changed, 153 insertions(+), 82 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ce4d62e707..8572c5b730 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -71,13 +71,21 @@
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
+typedef struct CopyModifyBufferFlushContext
+{
+	CopyFromState cstate;
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} CopyModifyBufferFlushContext;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableModifyState *mstate;	/* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
+	TupleTableSlot *multislot;	/* reused slot; plain tables only */
+	CopyModifyBufferFlushContext *modify_buffer_flush_context;
 	int			nused;			/* number of 'slots' containing tuples */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
@@ -99,6 +107,7 @@ typedef struct CopyMultiInsertInfo
 	int			ti_options;		/* table insert options */
 } CopyMultiInsertInfo;
 
+static void CopyModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots);
 
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
@@ -218,14 +227,38 @@ CopyLimitPrintoutLength(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  CopyFromState cstate, EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		buffer->modify_buffer_flush_context = (CopyModifyBufferFlushContext *) palloc(sizeof(CopyModifyBufferFlushContext));
+		buffer->modify_buffer_flush_context->cstate = cstate;
+		buffer->modify_buffer_flush_context->resultRelInfo = rri;
+		buffer->modify_buffer_flush_context->estate = estate;
+
+		buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+											TM_FLAG_MULTI_INSERTS |
+											TM_FLAG_BAS_BULKWRITE,
+											miinfo->mycid,
+											miinfo->ti_options,
+											CopyModifyBufferFlushCallback,
+											buffer->modify_buffer_flush_context);
+		buffer->slots = NULL;
+		buffer->multislot = NULL;
+	}
+	else
+	{
+		buffer->mstate = NULL;
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		buffer->multislot = NULL;
+	}
+
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -236,11 +269,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
  */
 static inline void
 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+							   ResultRelInfo *rri, CopyFromState cstate,
+							   EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -273,7 +307,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	 * tuples their way for the first time.
 	 */
 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+		CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
 }
 
 /*
@@ -317,8 +351,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -390,13 +422,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -404,56 +431,12 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
+		table_modify_buffer_flush(buffer->mstate);
+
 		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
+		 * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+		 * in the flush callback CopyModifyBufferFlushCallback.
 		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
-
-		for (i = 0; i < nused; i++)
-		{
-			/*
-			 * If there are any indexes, update them for all the inserted
-			 * tuples, and run AFTER ROW INSERT triggers.
-			 */
-			if (resultRelInfo->ri_NumIndices > 0)
-			{
-				List	   *recheckIndexes;
-
-				cstate->cur_lineno = buffer->linenos[i];
-				recheckIndexes =
-					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
-										  false, NULL, NIL, false);
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], recheckIndexes,
-									 cstate->transition_capture);
-				list_free(recheckIndexes);
-			}
-
-			/*
-			 * There's no indexes, but see if we need to run AFTER ROW INSERT
-			 * triggers anyway.
-			 */
-			else if (resultRelInfo->ri_TrigDesc != NULL &&
-					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-			{
-				cstate->cur_lineno = buffer->linenos[i];
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], NIL,
-									 cstate->transition_capture);
-			}
-
-			ExecClearTuple(slots[i]);
-		}
 
 		/* Update the row counter and progress of the COPY command */
 		*processed += nused;
@@ -469,6 +452,64 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	buffer->nused = 0;
 }
 
+static void
+CopyModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots)
+{
+	CopyModifyBufferFlushContext *ctx = (CopyModifyBufferFlushContext *) context;
+	CopyFromState cstate = ctx->cstate;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+	int			i;
+
+	if (nslots <= 0)
+		return;
+
+	/* Nothing to do without indexes or AFTER ROW INSERT triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+	for (i = 0; i < nslots; i++)
+	{
+		/*
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			List	   *recheckIndexes;
+
+			cstate->cur_lineno = buffer->linenos[i];
+			recheckIndexes =
+				ExecInsertIndexTuples(resultRelInfo,
+									  slots[i], estate, false,
+									  false, NULL, NIL, false);
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 cstate->transition_capture);
+			list_free(recheckIndexes);
+		}
+
+		/*
+		 * There are no indexes, but see if we need to run AFTER ROW
+		 * INSERT triggers anyway.
+		 */
+		else if (resultRelInfo->ri_TrigDesc != NULL &&
+				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		{
+			cstate->cur_lineno = buffer->linenos[i];
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL,
+								 cstate->transition_capture);
+		}
+	}
+}
+
 /*
  * Drop used slots and free member for this buffer.
  *
@@ -489,19 +530,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
 	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
+		table_modify_end(buffer->mstate);
+		if (buffer->multislot != NULL) ExecDropSingleTupleTableSlot(buffer->multislot);
+		pfree(buffer->modify_buffer_flush_context);
 	}
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -588,13 +628,32 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused = buffer->nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(nused < MAX_BUFFERED_TUPLES);
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		if (buffer->multislot == NULL)
+			buffer->multislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+												   &TTSOpsVirtual);
+
+		/* Caller must clear the slot */
+		slot = buffer->multislot;
+	}
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -608,7 +667,17 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 
 	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		Assert(slot == buffer->multislot);
+		table_modify_buffer_insert(buffer->mstate, slot);
+	}
+	else
+	{
+		/* FDW path: the slot stays in buffer->slots until the batch flush */
+		Assert(slot == buffer->slots[buffer->nused]);
+	}
 
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
@@ -830,7 +899,7 @@ CopyFrom(CopyFromState cstate)
 	/*
 	 * It's generally more efficient to prepare a bunch of tuples for
 	 * insertion, and insert them in one
-	 * table_multi_insert()/ExecForeignBatchInsert() call, than call
+	 * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
 	 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 	 * However, there are a number of reasons why we might not be able to do
 	 * this.  These are explained below.
@@ -1080,7 +1149,8 @@ CopyFrom(CopyFromState cstate)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+													   resultRelInfo, cstate,
+													   estate);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
 typedef enum CopyInsertMethod
 {
 	CIM_SINGLE,					/* use table_tuple_insert or ExecForeignInsert */
-	CIM_MULTI,					/* always use table_multi_insert or
+	CIM_MULTI,					/* always use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert */
-	CIM_MULTI_CONDITIONAL,		/* use table_multi_insert or
+	CIM_MULTI_CONDITIONAL,		/* use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 70f23808e2..bd8c87be33 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -486,6 +486,7 @@ CopyHeaderChoice
 CopyInsertMethod
 CopyMethod
 CopyLogVerbosityChoice
+CopyModifyBufferFlushContext
 CopyMultiInsertBuffer
 CopyMultiInsertInfo
 CopyOnErrorChoice
-- 
2.34.1
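The callback contract that 0005 relies on is described at the top of this mail. The AM decides when to flush and writes the whole batch. It then invokes the caller's callback with the flushed rows before discarding them, and that callback is where COPY now does index insertion and AFTER ROW INSERT triggers. The toy model below illustrates only that ordering. All names are hypothetical stand-ins: ModifyBuffer for TableModifyState, and modify_begin/insert/flush/end for the table_modify_* calls. Tuples are plain ints, and the fixed capacity is an assumption; the heap AM uses its own flushing criteria.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical callback: runs after rows are written, before they are freed. */
typedef void (*FlushCallback) (void *context, const int *rows, int nrows);

typedef struct ModifyBuffer
{
	int		   *rows;			/* buffered "tuples" (plain ints here) */
	int			nrows;			/* rows currently buffered */
	int			capacity;		/* assumed fixed flush threshold */
	int			nwritten;		/* stands in for the table's storage */
	FlushCallback callback;
	void	   *context;
} ModifyBuffer;

static ModifyBuffer *
modify_begin(int capacity, FlushCallback callback, void *context)
{
	ModifyBuffer *mb = malloc(sizeof(ModifyBuffer));

	assert(mb != NULL && capacity > 0);
	mb->rows = malloc(sizeof(int) * (size_t) capacity);
	assert(mb->rows != NULL);
	mb->nrows = 0;
	mb->capacity = capacity;
	mb->nwritten = 0;
	mb->callback = callback;
	mb->context = context;
	return mb;
}

static void
modify_flush(ModifyBuffer *mb)
{
	if (mb->nrows == 0)
		return;
	mb->nwritten += mb->nrows;	/* 1. write the whole batch */
	if (mb->callback != NULL)	/* 2. caller indexes rows / fires triggers */
		mb->callback(mb->context, mb->rows, mb->nrows);
	mb->nrows = 0;				/* 3. only now are the rows discarded */
}

static void
modify_insert(ModifyBuffer *mb, int row)
{
	mb->rows[mb->nrows++] = row;
	if (mb->nrows == mb->capacity)	/* the AM, not the caller, decides */
		modify_flush(mb);
}

static void
modify_end(ModifyBuffer *mb)
{
	modify_flush(mb);			/* write out any remaining rows */
	free(mb->rows);
	free(mb);
}

/* Caller side, in the spirit of CopyModifyBufferFlushCallback. */
typedef struct IndexCounter
{
	int			nindexed;
	long		sum;
} IndexCounter;

static void
count_index_entries(void *context, const int *rows, int nrows)
{
	IndexCounter *ic = context;

	for (int i = 0; i < nrows; i++)
	{
		ic->nindexed++;
		ic->sum += rows[i];
	}
}
```

Invoking the callback between the write and the reset is what lets the caller see valid row data without copying it out of the buffer.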

From 1b5d3c04f21e764756f89f0456d0f96e2b2350de Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 05:25:21 +0000
Subject: [PATCH v19 6/6] Remove table_multi_insert and
 table_finish_bulk_insert

---
 src/backend/access/heap/heapam_handler.c |  1 -
 src/backend/access/table/tableamapi.c    |  1 -
 src/backend/commands/tablecmds.c         |  4 --
 src/include/access/tableam.h             | 56 +-----------------------
 4 files changed, 1 insertion(+), 61 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index eda0c73a16..fe9701773a 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2614,7 +2614,6 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert = heapam_tuple_insert,
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
-	.multi_insert = heap_multi_insert,
 
 	.tuple_modify_begin = heap_modify_begin,
 	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 96ac951af6..0af8f1ac1f 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -71,7 +71,6 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->tuple_insert_speculative != NULL);
 	Assert(routine->tuple_complete_speculative != NULL);
 
-	Assert(routine->multi_insert != NULL);
 	Assert(routine->tuple_delete != NULL);
 	Assert(routine->tuple_update != NULL);
 	Assert(routine->tuple_lock != NULL);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 0c984aa656..22bcb12abb 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -20969,8 +20969,6 @@ deleteSplitPartitionContext(SplitPartitionContext *pc, int ti_options)
 	ExecDropSingleTupleTableSlot(pc->dstslot);
 	FreeBulkInsertState(pc->bistate);
 
-	table_finish_bulk_insert(pc->partRel, ti_options);
-
 	pfree(pc);
 }
 
@@ -21453,8 +21451,6 @@ moveMergedTablesRows(Relation rel, List *mergingPartitionsList,
 
 	ExecDropSingleTupleTableSlot(dstslot);
 	FreeBulkInsertState(bistate);
-
-	table_finish_bulk_insert(newPartRel, ti_options);
 }
 
 /*
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index ddb6e6f3a5..82798fd641 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -563,10 +563,6 @@ typedef struct TableAmRoutine
 											   uint32 specToken,
 											   bool succeeded);
 
-	/* see table_multi_insert() for reference about parameters */
-	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
-								 CommandId cid, int options, struct BulkInsertStateData *bistate);
-
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -600,21 +596,6 @@ typedef struct TableAmRoutine
 							   uint8 flags,
 							   TM_FailureData *tmfd);
 
-	/*
-	 * Perform operations necessary to complete insertions made via
-	 * tuple_insert and multi_insert with a BulkInsertState specified. In-tree
-	 * access methods ceased to use this.
-	 *
-	 * Typically callers of tuple_insert and multi_insert will just pass all
-	 * the flags that apply to them, and each AM has to decide which of them
-	 * make sense for it, and then only take actions in finish_bulk_insert for
-	 * those flags, and ignore others.
-	 *
-	 * Optional callback.
-	 */
-	void		(*finish_bulk_insert) (Relation rel, int options);
-
-
 	/* ------------------------------------------------------------------------
 	 * Table Modify related functions.
 	 * ------------------------------------------------------------------------
@@ -1453,8 +1434,7 @@ table_index_delete_tuples(Relation rel, TM_IndexDeleteOp *delstate)
  * heap's TOAST table, too, if the tuple requires any out-of-line data.
  *
  * The BulkInsertState object (if any; bistate can be NULL for default
- * behavior) is also just passed through to RelationGetBufferForTuple. If
- * `bistate` is provided, table_finish_bulk_insert() needs to be called.
+ * behavior) is also just passed through to RelationGetBufferForTuple.
  *
  * On return the slot's tts_tid and tts_tableOid are updated to reflect the
  * insertion. But note that any toasting of fields within the slot is NOT
@@ -1501,28 +1481,6 @@ table_tuple_complete_speculative(Relation rel, TupleTableSlot *slot,
 												succeeded);
 }
 
-/*
- * Insert multiple tuples into a table.
- *
- * This is like table_tuple_insert(), but inserts multiple tuples in one
- * operation. That's often faster than calling table_tuple_insert() in a loop,
- * because e.g. the AM can reduce WAL logging and page locking overhead.
- *
- * Except for taking `nslots` tuples as input, and an array of TupleTableSlots
- * in `slots`, the parameters for table_multi_insert() are the same as for
- * table_tuple_insert().
- *
- * Note: this leaks memory into the current memory context. You can create a
- * temporary context before calling this, if that's a problem.
- */
-static inline void
-table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
-				   CommandId cid, int options, struct BulkInsertStateData *bistate)
-{
-	rel->rd_tableam->multi_insert(rel, slots, nslots,
-								  cid, options, bistate);
-}
-
 /*
  * Delete a tuple.
  *
@@ -1649,18 +1607,6 @@ table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot,
 									   flags, tmfd);
 }
 
-/*
- * Perform operations necessary to complete insertions made via
- * tuple_insert and multi_insert with a BulkInsertState specified.
- */
-static inline void
-table_finish_bulk_insert(Relation rel, int options)
-{
-	/* optional callback */
-	if (rel->rd_tableam && rel->rd_tableam->finish_bulk_insert)
-		rel->rd_tableam->finish_bulk_insert(rel, options);
-}
-
 /* ------------------------------------------------------------------------
  * Table Modify related functions.
  * ------------------------------------------------------------------------
-- 
2.34.1
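With 0006 removing multi_insert and finish_bulk_insert, an AM that does not implement the new tuple_modify_* callbacks relies on the default described above, which falls back to single inserts. The sketch below models only that dispatch decision. The generic entry point uses the AM's buffered insert when one is provided. Otherwise it calls the required single-row insert. AmRoutine, Table, table_insert_row and the two sample AMs are hypothetical; the real TableAmRoutine and the fallback code in 0001 carry much more state.

```c
#include <assert.h>
#include <stddef.h>

typedef struct Table Table;

/* Hypothetical, heavily reduced table AM routine table. */
typedef struct AmRoutine
{
	void		(*tuple_insert) (Table *t, int row);	/* required */
	void		(*modify_buffer_insert) (Table *t, int row);	/* optional */
} AmRoutine;

struct Table
{
	const AmRoutine *am;
	int			nsingle;		/* rows stored via tuple_insert */
	int			nbuffered;		/* rows stored via modify_buffer_insert */
};

/* Generic entry point: prefer the AM's buffered path, else fall back. */
static void
table_insert_row(Table *t, int row)
{
	assert(t->am->tuple_insert != NULL);
	if (t->am->modify_buffer_insert != NULL)
		t->am->modify_buffer_insert(t, row);
	else
		t->am->tuple_insert(t, row);	/* default: one row at a time */
}

static void
single_insert(Table *t, int row)
{
	(void) row;
	t->nsingle++;
}

static void
buffered_insert(Table *t, int row)
{
	(void) row;
	t->nbuffered++;
}

/* An AM without buffering support, and one that provides it. */
static const AmRoutine minimal_am = {.tuple_insert = single_insert};
static const AmRoutine batching_am = {
	.tuple_insert = single_insert,
	.modify_buffer_insert = buffered_insert,
};
```

Keeping the single-row insert mandatory and the buffered path optional means existing AMs keep working unchanged after the old multi_insert callback is gone.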
