On Wed, Apr 3, 2024 at 1:10 AM Jeff Davis <pg...@j-davis.com> wrote:
>
> Here's where I think this API should go:
>
> 1. Have table_modify_begin/end and table_modify_buffer_insert, like
> those that are implemented in your patch.

I added table_modify_begin, table_modify_buffer_insert,
table_modify_buffer_flush and table_modify_end. Table Access Method (AM)
authors can now define their own buffering strategy and flushing
decisions based on their tuple storage kinds and various other
AM-specific factors. I also added a default implementation that falls
back to single inserts when an AM author does not provide these
callbacks. See the attached v19-0001 patch.

> 2. Add some kind of flush callback that will be called either while the
> tuples are being flushed or after the tuples are flushed (but before
> they are freed by the AM). (Aside: do we need to call it while the
> tuples are being flushed to get the right visibility semantics for
> after-row triggers?)

I added a flush callback named TableModifyBufferFlushCallback; when
provided by callers, it is invoked after tuples are flushed to disk from
the buffers but before the AM frees them. Index insertions and AFTER ROW
INSERT triggers can be executed in this callback. See the v19-0001 patch
for how the AM invokes the flush callback, and see any of v19-0003,
v19-0004 or v19-0005 for how a caller can supply the callback and the
context required to execute index insertions and AR triggers.

> 3. Add table_modify_buffer_{update|delete} APIs.
>
> 9. Use these new methods for DELETE, UPDATE, and MERGE. MERGE can use
> the buffer_insert/update/delete APIs; we don't need a separate merge
> method. This probably requires that the AM maintain 3 separate buffers
> to distinguish different kinds of changes at flush time (obviously
> these can be initialized lazily to avoid overhead when not being used).

I haven't thought about these things yet. I can only focus on them after
seeing how the attached patches go from here.

> 4. Some kind of API tweaks to help manage memory when modifying
> partitioned tables, so that the buffering doesn't get out of control.
> Perhaps just reporting memory usage and allowing the caller to force
> flushes would be enough.

The heap implementation of these new Table AM APIs uses a separate memory
context for all of its operations. Please have a look and let me know if
we need anything more.

> 5. Use these new methods for CREATE/REFRESH MATERIALIZED VIEW. This is
> fairly straightforward, I believe, and handled by your patch. Indexes
> are (re)built afterward, and no triggers are possible.
>
> 6. Use these new methods for CREATE TABLE ... AS. This is fairly
> straightforward, I believe, and handled by your patch. No indexes or
> triggers are possible.

I used multi inserts for all of these, including TABLE REWRITE commands
such as ALTER TABLE. See the attached v19-0002 patch. Check the testing
section below for benefits.

FWIW, following are some of the TABLE REWRITE commands that can benefit:

ALTER TABLE tbl ALTER c1 TYPE bigint;
ALTER TABLE itest13 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY;
ALTER MATERIALIZED VIEW heapmv SET ACCESS METHOD heap2;
ALTER TABLE itest3 ALTER COLUMN a TYPE int;
ALTER TABLE gtest20 ALTER COLUMN b SET EXPRESSION AS (a * 3);
ALTER TABLE has_volatile ADD col4 int DEFAULT (random() * 10000)::int;
and so on.

> 7. Use these new methods for COPY. We have to be careful to avoid
> regressions for the heap method, because it's already managing its own
> buffers. If the AM manages the buffering, then it may require
> additional copying of slots, which could be a disadvantage. To solve
> this, we may need some minor API tweaks to avoid copying when the
> caller guarantees that the memory will not be freed too early, or
> perhaps expose the AM's memory context to copyfrom.c. Another thing to
> consider is that the buffering in copyfrom.c is also used for FDWs, so
> that buffering code path needs to be preserved in copyfrom.c even if
> not used for AMs.

I modified the COPY FROM code to use the new Table AM APIs, and performed
some tests which show no signs of regression. Check the testing section
below for more details. See the attached v19-0005 patch.
With this, table_multi_insert can be deprecated.

> 8. Use these new methods for INSERT INTO ... SELECT. One potential
> challenge here is that execution nodes are not always run to
> completion, so we need to be sure that the flush isn't forgotten in
> that case.

I did that in v19-0003. I placed the table_modify_end call in multiple
places, including ExecEndModifyTable. I didn't find any issues with it.
Please have a look and let me know if we need the end call in more
places. Check the testing section below for benefits.

> 10. Use these new methods for logical apply.

I used multi inserts for Logical Replication apply in v19-0004. Check
the testing section below for benefits.

FWIW, open-source pglogical does have multi insert support, check code
around
https://github.com/2ndQuadrant/pglogical/blob/REL2_x_STABLE/pglogical_apply_heap.c#L960 .

> 11. Deprecate the multi_insert API.

I removed both table_multi_insert and table_finish_bulk_insert in
v19-0006. Perhaps removing them outright isn't a great idea; emitting a
deprecation WARNING/ERROR for a few more PG releases might be worth
looking at.

> Thoughts on this plan? Does your patch make sense in v17 as a stepping
> stone, or should we try to make all of these API changes together in
> v18?

If the design, code and benefits that these new Table AM APIs bring to
the table look good, I hope to see it in PG 18.

> Also, a sample AM code would be a huge benefit here. Writing a real AM
> is hard, but perhaps we can at least have an example one to demonstrate
> how to use these APIs?

The attached patches already implement these new Table AM APIs for Heap.
I don't think we need a separate implementation to demonstrate them. If
others feel otherwise, I'm open to thoughts here.

Having said the above, I'd like to reiterate the motivation behind the
new Table AM APIs for multi and single inserts.

1. A scan-like API with state being carried across is thought to be
better, as suggested by Andres Freund -
https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3f...@alap3.anarazel.de .

2. Allowing a Table AM to optimize operations across multiple inserts,
define its own buffering strategy and take its own flushing decisions
based on its tuple storage kinds and various other AM-specific factors.

3. Improve performance of various SQL commands with multi inserts for
Heap AM.

The attached v19 patches might need some more detailed comments, some
documentation and some specific tests ensuring that multi inserts for
Heap kick in for the various commands. I'm open to thoughts here.

I did some testing to see how various commands benefit from multi
inserts using these new Table AM APIs for heap. These commands not only
run faster, the amount of WAL they generate also drops greatly. After
all, multi inserts optimize the insertions by writing less WAL. IOW, one
WAL record is written per page when multiple rows fit into a single data
page, as opposed to one WAL record per row.

Test case 1: 100 million rows, 2 columns (int and float)

Command                        | HEAD (sec) | PATCHED (sec) | Faster by % | Faster by X
------------------------------ | ---------- | ------------- | ----------- | -----------
CREATE TABLE AS                | 121        | 77            | 36.3        | 1.57
CREATE MATERIALIZED VIEW       | 101        | 49            | 51.4        | 2.06
REFRESH MATERIALIZED VIEW      | 113        | 54            | 52.2        | 2.09
ALTER TABLE (TABLE REWRITE)    | 124        | 81            | 34.6        | 1.53
COPY FROM                      | 71         | 72            | 0           | 1
INSERT INTO ... SELECT         | 117        | 62            | 47          | 1.88
LOGICAL REPLICATION APPLY      | 393        | 306           | 22.1        | 1.28

Command                        | HEAD (WAL in GB) | PATCHED (WAL in GB) | Reduced by % | Reduced by X
------------------------------ | ---------------- | ------------------- | ------------ | ------------
CREATE TABLE AS                | 6.8              | 2.4                 | 64.7         | 2.83
CREATE MATERIALIZED VIEW       | 7.2              | 2.3                 | 68           | 3.13
REFRESH MATERIALIZED VIEW      | 10               | 5.1                 | 49           | 1.96
ALTER TABLE (TABLE REWRITE)    | 8                | 3.2                 | 60           | 2.5
COPY FROM                      | 2.9              | 3                   | 0            | 1
INSERT INTO ... SELECT         | 8                | 3                   | 62.5         | 2.66
LOGICAL REPLICATION APPLY      | 7.5              | 2.3                 | 69.3         | 3.26

Test case 2: 1 billion rows, 1 column (int)

Command                        | HEAD (sec) | PATCHED (sec) | Faster by % | Faster by X
------------------------------ | ---------- | ------------- | ----------- | -----------
CREATE TABLE AS                | 794        | 386           | 51.38       | 2.05
CREATE MATERIALIZED VIEW       | 1006       | 563           | 44.03       | 1.78
REFRESH MATERIALIZED VIEW      | 977        | 603           | 38.28       | 1.62
ALTER TABLE (TABLE REWRITE)    | 1189       | 714           | 39.94       | 1.66
COPY FROM                      | 321        | 330           | -0.02       | 0.97
INSERT INTO ... SELECT         | 1084       | 586           | 45.94       | 1.84
LOGICAL REPLICATION APPLY      | 3530       | 2982          | 15.52       | 1.18

Command                        | HEAD (WAL in GB) | PATCHED (WAL in GB) | Reduced by % | Reduced by X
------------------------------ | ---------------- | ------------------- | ------------ | ------------
CREATE TABLE AS                | 60               | 12                  | 80           | 5
CREATE MATERIALIZED VIEW       | 60               | 12                  | 80           | 5
REFRESH MATERIALIZED VIEW      | 60               | 12                  | 80           | 5
ALTER TABLE (TABLE REWRITE)    | 123              | 31                  | 60           | 2.5
COPY FROM                      | 12               | 12                  | 0            | 1
INSERT INTO ... SELECT         | 120              | 24                  | 80           | 5
LOGICAL REPLICATION APPLY      | 61               | 12                  | 80.32        | 5

Test setup:

./configure --prefix=$PWD/pg17/ --enable-tap-tests CFLAGS="-ggdb3 -O2" > install.log && make -j 8 install > install.log 2>&1 &

wal_level=logical
max_wal_size = 256GB
checkpoint_timeout = 1h

Test system is EC2 instance of type c5.4xlarge:

Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         46 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  16
  On-line CPU(s) list:   0-15
Vendor ID:               GenuineIntel
  Model name:            Intel(R) Xeon(R) Platinum 8275CL CPU @ 3.00GHz
    CPU family:          6
    Model:               85
    Thread(s) per core:  2
    Core(s) per socket:  8
    Socket(s):           1
    Stepping:            7
    BogoMIPS:            5999.99
Caches (sum of all):
  L1d:                   256 KiB (8 instances)
  L1i:                   256 KiB (8 instances)
  L2:                    8 MiB (8 instances)
  L3:                    35.8 MiB (1 instance)
NUMA:
  NUMA node(s):          1
  NUMA node0 CPU(s):     0-15
RAM:
  MemTotal:              32036536 kB

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
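
As a reading aid for the result tables in this mail: the "Faster by %" and "Reduced by %" columns appear to be the relative drop from HEAD, and the "by X" columns the HEAD/PATCHED ratio. A minimal C sketch under that assumption follows; the function names are made up for illustration and are not part of the patches.

```c
#include <assert.h>

/*
 * Relative improvement over HEAD, in percent.
 * Assumption: this is how the "Faster by %" / "Reduced by %" columns
 * were computed; the mail does not state the formula.
 */
double
reduction_percent(double head, double patched)
{
    assert(head > 0.0);
    return (head - patched) / head * 100.0;
}

/*
 * Improvement factor, i.e. HEAD divided by PATCHED.
 * Assumption: this corresponds to the "Faster by X" / "Reduced by X" columns.
 */
double
improvement_factor(double head, double patched)
{
    assert(patched > 0.0);
    return head / patched;
}
```

For example, CREATE TABLE AS in test case 1 (121 s on HEAD, 77 s patched) comes out at roughly 36.4% and 1.57x, in line with the reported 36.3 and 1.57.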
From 75666da998aaa8fbc60d62ad8c160a5c227065e6 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 04:12:29 +0000
Subject: [PATCH v19 1/6] Introduce new Table Access Methods for single and multi inserts

---
 src/backend/access/heap/heapam.c         | 202 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   6 +
 src/backend/access/table/tableam.c       |  95 +++++++++++
 src/backend/access/table/tableamapi.c    |  10 ++
 src/include/access/heapam.h              |  44 +++++
 src/include/access/tableam.h             | 146 ++++++++++++++++
 src/tools/pgindent/typedefs.list         |   3 +
 7 files changed, 505 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4a4cf76269..37c6ed232c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -64,6 +64,7 @@
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -112,7 +113,7 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp,
 										bool key_required, bool *copy);
-
+static void heap_modify_insert_end_callback(TableModifyState *state);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2608,6 +2609,205 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel, int modify_flags,
+				  CommandId cid, int options,
+				  TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				  void *modify_buffer_flush_context)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(TopTransactionContext,
+									"heap_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mem_cxt = context;
+	state->cid = cid;
+	state->options = options;
+	state->modify_buffer_flush_callback = modify_buffer_flush_callback;
+	state->modify_buffer_flush_context = modify_buffer_flush_context;
+	state->modify_end_callback = NULL;	/* To be installed lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+						  TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize heap insert state */
+	if (state->data == NULL)
+	{
+		istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+
+		if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0)
+		{
+			mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+			mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+			istate->mistate = mistate;
+			mistate->mem_cxt = AllocSetContextCreate(CurrentMemoryContext,
+													 "heap_multi_insert memory context",
+													 ALLOCSET_DEFAULT_SIZES);
+		}
+
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+
+		state->modify_end_callback = heap_modify_insert_end_callback;
+	}
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	dstslot = mistate->slots[mistate->cur_slots];
+	if (dstslot == NULL)
+	{
+		/*
+		 * We use virtual tuple slots as buffered slots for leveraging the
+		 * optimization they provide to minimize physical data copying. The
+		 * virtual slot gets materialized when we copy (via below
+		 * ExecCopySlot) the tuples from the source slot which can be of any
+		 * type. This way, it is ensured that the tuple storage doesn't depend
+		 * on external memory, because all the datums that aren't passed by
+		 * value are copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	ExecClearTuple(dstslot);
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+
+	/*
+	 * Memory allocated for the whole tuple is in slot's memory context, so
+	 * use it to keep track of the total space occupied by all buffered tuples.
+	 */
+	if (TTS_SHOULDFREE(dstslot))
+		mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false);
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)
+		heap_modify_buffer_flush(state);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	if (mistate->cur_slots == 0)
+		return;
+
+	/*
+	 * heap_multi_insert may leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(mistate->mem_cxt);
+
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->options, istate->bistate);
+
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->mem_cxt);
+
+	if (state->modify_buffer_flush_callback != NULL)
+		state->modify_buffer_flush_callback(state->modify_buffer_flush_context,
+											mistate->slots, mistate->cur_slots);
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+}
+
+/*
+ * Heap insert specific callback used for performing work at the end like
+ * flushing buffered tuples if any, cleaning up the insert state and buffered
+ * slots.
+ */
+static void
+heap_modify_insert_end_callback(TableModifyState *state)
+{
+	HeapInsertState *istate;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+
+	if (istate->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate = istate->mistate;
+
+		heap_modify_buffer_flush(state);
+
+		Assert(mistate->cur_slots == 0 &&
+			   mistate->cur_size == 0);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		MemoryContextDelete(mistate->mem_cxt);
+	}
+
+	if (istate->bistate != NULL)
+		FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_callback != NULL)
+		state->modify_end_callback(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
+
 /*
  * simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b7929..eda0c73a16 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2615,6 +2615,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_buffer_flush = heap_modify_buffer_flush,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index e57a0b7ea3..0e4ce1aca6 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -21,6 +21,7 @@
 
 #include <math.h>
 
+#include "access/heapam.h"		/* just for BulkInsertState */
 #include "access/syncscan.h"
 #include "access/tableam.h"
 #include "access/xact.h"
@@ -29,6 +30,7 @@
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
+#include "utils/memutils.h"
 
 /*
  * Constants to control the behavior of block allocation to parallel workers
@@ -48,6 +50,7 @@
 char	   *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
 bool		synchronize_seqscans = true;
 
+static void default_table_modify_insert_end_callback(TableModifyState *state);
 
 /* ----------------------------------------------------------------------------
  * Slot functions.
@@ -756,3 +759,95 @@ table_block_relation_estimate_size(Relation rel, int32 *attr_widths,
 	else
 		*allvisfrac = (double) relallvisible / curpages;
 }
+
+/*
+ * Initialize default table modify state.
+ */
+TableModifyState *
+default_table_modify_begin(Relation rel, int modify_flags,
+						   CommandId cid, int options,
+						   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+						   void *modify_buffer_flush_context)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(CurrentMemoryContext,
+									"default_table_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mem_cxt = context;
+	state->cid = cid;
+	state->options = options;
+	state->modify_end_callback = NULL;	/* To be installed lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Default table modify implementation for inserts.
+ */
+void
+default_table_modify_buffer_insert(TableModifyState *state,
+								   TupleTableSlot *slot)
+{
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize default table modify state */
+	if (state->data == NULL)
+	{
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			state->data = (BulkInsertState) GetBulkInsertState();
+
+		state->modify_end_callback = default_table_modify_insert_end_callback;
+	}
+
+	/* Fallback to table AM single insert routine */
+	table_tuple_insert(state->rel,
+					   slot,
+					   state->cid,
+					   state->options,
+					   (BulkInsertState) state->data);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Default table modify implementation for flush.
+ */
+void
+default_table_modify_buffer_flush(TableModifyState *state)
+{
+	/* no-op */
+}
+
+/*
+ * Default table modify insert specific callback used for performing work at
+ * the end like cleaning up the bulk insert state.
+ */
+static void
+default_table_modify_insert_end_callback(TableModifyState *state)
+{
+	if (state->data != NULL)
+		FreeBulkInsertState((BulkInsertState) state->data);
+}
+
+/*
+ * Clean default table modify state.
+ */
+void
+default_table_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_callback != NULL)
+		state->modify_end_callback(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index ce637a5a5d..96ac951af6 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,16 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->scan_sample_next_block != NULL);
 	Assert(routine->scan_sample_next_tuple != NULL);
 
+	/* optional, but either all of them are defined or none. */
+	Assert((routine->tuple_modify_begin == NULL &&
+			routine->tuple_modify_buffer_insert == NULL &&
+			routine->tuple_modify_buffer_flush == NULL &&
+			routine->tuple_modify_end == NULL) ||
+		   (routine->tuple_modify_begin != NULL &&
+			routine->tuple_modify_buffer_insert != NULL &&
+			routine->tuple_modify_buffer_flush != NULL &&
+			routine->tuple_modify_end != NULL));
+
 	return routine;
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index c47a5045ce..c10ebbb5ea 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -271,6 +271,38 @@ typedef enum
 	PRUNE_VACUUM_CLEANUP,		/* VACUUM 2nd heap pass */
 } PruneReason;
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES		65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held */
+	int			cur_slots;
+
+	/* Approximate size of all tuples currently held in buffered slots */
+	Size		cur_size;
+
+	MemoryContext mem_cxt;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -321,6 +353,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+										   int modify_flags,
+										   CommandId cid,
+										   int options,
+										   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+										   void *modify_buffer_flush_context);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd..ddb6e6f3a5 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -255,6 +255,43 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Table modify flags */
+
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TM_FLAG_MULTI_INSERTS	0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TM_FLAG_BAS_BULKWRITE	0x000002
+
+struct TableModifyState;
+
+/* Callback invoked with the tuples that were just flushed to disk from buffer */
+typedef void (*TableModifyBufferFlushCallback) (void *context,
+												TupleTableSlot **slots,
+												int nslots);
+
+/* Table AM specific callback that gets called in table_modify_end() */
+typedef void (*TableModifyEndCallback) (struct TableModifyState *state);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+	Relation	rel;
+	int			modify_flags;
+	MemoryContext mem_cxt;
+	CommandId	cid;
+	int			options;
+
+	/* Flush callback and its context */
+	TableModifyBufferFlushCallback modify_buffer_flush_callback;
+	void	   *modify_buffer_flush_context;
+
+	/* Table AM specific data */
+	void	   *data;
+
+	TableModifyEndCallback modify_end_callback;
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -578,6 +615,21 @@ typedef struct TableAmRoutine
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
 
+	/* ------------------------------------------------------------------------
+	 * Table Modify related functions.
+	 * ------------------------------------------------------------------------
+	 */
+	TableModifyState *(*tuple_modify_begin) (Relation rel,
+											 int modify_flags,
+											 CommandId cid,
+											 int options,
+											 TableModifyBufferFlushCallback modify_buffer_flush_callback,
+											 void *modify_buffer_flush_context);
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   TupleTableSlot *slot);
+	void		(*tuple_modify_buffer_flush) (TableModifyState *state);
+	void		(*tuple_modify_end) (TableModifyState *state);
+
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1609,6 +1661,100 @@ table_finish_bulk_insert(Relation rel, int options)
 		rel->rd_tableam->finish_bulk_insert(rel, options);
 }
 
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+extern TableModifyState *default_table_modify_begin(Relation rel, int modify_flags,
+													CommandId cid, int options,
+													TableModifyBufferFlushCallback modify_buffer_flush_callback,
+													void *modify_buffer_flush_context);
+extern void default_table_modify_buffer_insert(TableModifyState *state,
+											   TupleTableSlot *slot);
+extern void default_table_modify_buffer_flush(TableModifyState *state);
+extern void default_table_modify_end(TableModifyState *state);
+
+static inline TableModifyState *
+table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options,
+				   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				   void *modify_buffer_flush_context)
+{
+	if (rel->rd_tableam &&
+		rel->rd_tableam->tuple_modify_begin != NULL)
+	{
+		return rel->rd_tableam->tuple_modify_begin(rel, modify_flags,
+												   cid, options,
+												   modify_buffer_flush_callback,
+												   modify_buffer_flush_context);
+	}
+	else if (rel->rd_tableam &&
+			 rel->rd_tableam->tuple_modify_begin == NULL)
+	{
+		/* Fallback to a default implementation */
+		return default_table_modify_begin(rel, modify_flags,
+										  cid, options,
+										  modify_buffer_flush_callback,
+										  modify_buffer_flush_context);
+	}
+	else
+		Assert(false);
+
+	return NULL;
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_insert != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_buffer_insert == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_buffer_insert(state, slot);
+	}
+	else
+		Assert(false);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_flush != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_buffer_flush == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_buffer_flush(state);
+	}
+	else
+		Assert(false);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_end != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_end(state);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_end == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_end(state);
+	}
+	else
+		Assert(false);
+}
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d551ada325..ebde07bcde 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1130,6 +1130,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2844,6 +2846,7 @@ TableFuncScanState
 TableFuncType
 TableInfo
 TableLikeClause
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
--
2.34.1

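
To make the begin / buffer_insert / flush / end life cycle introduced by the patch above easier to follow, here is a standalone toy model in plain C. It is not PostgreSQL code: ints stand in for tuples, and every demo_* name and both DEMO_MAX_* limits are invented for illustration. It only mirrors the control flow of heap_modify_buffer_insert and heap_modify_buffer_flush: buffer rows until a slot or byte limit is hit, write the batch, then invoke the caller's flush callback before the buffer is reused.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical limits, playing the role of HEAP_MAX_BUFFERED_SLOTS/BYTES. */
#define DEMO_MAX_SLOTS 4
#define DEMO_MAX_BYTES 64

/* Caller-supplied callback, run after a batch is written. */
typedef void (*demo_flush_cb) (void *context, const int *rows, int nrows);

typedef struct DemoModifyState
{
    int         rows[DEMO_MAX_SLOTS];   /* buffered "tuples" */
    int         cur_slots;              /* number of buffered rows */
    size_t      cur_size;               /* bytes currently buffered */
    int         nflushes;               /* batches written so far */
    demo_flush_cb flush_cb;             /* may be NULL */
    void       *flush_ctx;
} DemoModifyState;

/* Allocate the state and remember the optional flush callback. */
DemoModifyState *
demo_modify_begin(demo_flush_cb cb, void *ctx)
{
    DemoModifyState *state = calloc(1, sizeof(DemoModifyState));

    assert(state != NULL);
    state->flush_cb = cb;
    state->flush_ctx = ctx;
    return state;
}

/* Write out the buffered batch, then notify the caller. */
void
demo_modify_buffer_flush(DemoModifyState *state)
{
    if (state->cur_slots == 0)
        return;                 /* nothing buffered */

    state->nflushes++;          /* a real AM would insert the batch here */

    /* The callback sees the rows before the buffer is reset. */
    if (state->flush_cb != NULL)
        state->flush_cb(state->flush_ctx, state->rows, state->cur_slots);

    state->cur_slots = 0;
    state->cur_size = 0;
}

/* Buffer one row; flush when either limit is reached. */
void
demo_modify_buffer_insert(DemoModifyState *state, int row)
{
    state->rows[state->cur_slots++] = row;
    state->cur_size += sizeof(int);

    if (state->cur_slots >= DEMO_MAX_SLOTS ||
        state->cur_size >= DEMO_MAX_BYTES)
        demo_modify_buffer_flush(state);
}

/* Flush any leftovers, free the state, return the number of batches. */
int
demo_modify_end(DemoModifyState *state)
{
    int         nflushes;

    demo_modify_buffer_flush(state);
    nflushes = state->nflushes;
    free(state);
    return nflushes;
}

/* Example callback: add the number of flushed rows to an int counter. */
void
demo_count_rows(void *context, const int *rows, int nrows)
{
    (void) rows;
    *(int *) context += nrows;
}
```

A caller would pass demo_count_rows and the address of an int counter to demo_modify_begin, insert rows one by one, and call demo_modify_end once at the end. Forgetting the end call leaves the last partial batch unwritten, which is the concern raised for INSERT INTO ... SELECT earlier in this thread.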
From 5a6dd7ac0cae831fbd8294710997dd484c089fcb Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 04:12:58 +0000
Subject: [PATCH v19 2/6] Optimize CTAS, CMV, RMV and TABLE REWRITES with multi inserts

---
 src/backend/commands/createas.c  | 27 +++++++++++----------------
 src/backend/commands/matview.c   | 26 +++++++++++---------------
 src/backend/commands/tablecmds.c | 31 +++++++++++--------------------
 3 files changed, 33 insertions(+), 51 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 62050f4dc5..2d6fffbf07 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -552,17 +550,21 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
 	 * bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+		myState->mstate = table_modify_begin(intoRelationDesc,
+											 TM_FLAG_MULTI_INSERTS |
+											 TM_FLAG_BAS_BULKWRITE,
+											 GetCurrentCommandId(true),
+											 TABLE_INSERT_SKIP_FSM,
+											 NULL,
+											 NULL);
 	else
-		myState->bistate = NULL;
+		myState->mstate = NULL;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -590,11 +592,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * would not be cheap either. This also doesn't allow accessing per-AM
 	 * data (say a tuple's xmin), but since we don't do that here...
 	 */
-	table_tuple_insert(myState->rel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_modify_buffer_insert(myState->mstate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -612,10 +610,7 @@ intorel_shutdown(DestReceiver *self)
 	IntoClause *into = myState->into;
 
 	if (!into->skipData)
-	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
-	}
+		table_modify_end(myState->mstate);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 6d09b75556..bb97e2fa5f 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -48,9 +48,7 @@ typedef struct
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
 	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -458,9 +456,14 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * Fill private fields of myState for use by later routines
 	 */
 	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->mstate = table_modify_begin(transientrel,
+										 TM_FLAG_MULTI_INSERTS |
+										 TM_FLAG_BAS_BULKWRITE,
+										 GetCurrentCommandId(true),
+										 TABLE_INSERT_SKIP_FSM |
+										 TABLE_INSERT_FROZEN,
+										 NULL,
+										 NULL);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -485,12 +488,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_modify_buffer_insert(myState->mstate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -505,9 +503,7 @@ transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_modify_end(myState->mstate);
 
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3556240c8e..0c984aa656 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -6060,10 +6060,8 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	int			i;
 	ListCell   *l;
 	EState	   *estate;
-	CommandId	mycid;
-	BulkInsertState bistate;
-	int			ti_options;
 	ExprState  *partqualstate = NULL;
+	TableModifyState *mstate = NULL;
 
 	/*
 	 * Open the relation(s). We have surely already locked the existing
@@ -6082,18 +6080,15 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	 * Prepare a BulkInsertState and options for table_tuple_insert. The FSM
 	 * is empty, so don't bother using it.
*/ - if (newrel) + if (newrel && mstate == NULL) { - mycid = GetCurrentCommandId(true); - bistate = GetBulkInsertState(); - ti_options = TABLE_INSERT_SKIP_FSM; - } - else - { - /* keep compiler quiet about using these uninitialized */ - mycid = 0; - bistate = NULL; - ti_options = 0; + mstate = table_modify_begin(newrel, + TM_FLAG_MULTI_INSERTS | + TM_FLAG_BAS_BULKWRITE, + GetCurrentCommandId(true), + TABLE_INSERT_SKIP_FSM, + NULL, + NULL); } /* @@ -6392,8 +6387,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) /* Write the tuple out to the new relation */ if (newrel) - table_tuple_insert(newrel, insertslot, mycid, - ti_options, bistate); + table_modify_buffer_insert(mstate, insertslot); ResetExprContext(econtext); @@ -6414,10 +6408,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) table_close(oldrel, NoLock); if (newrel) { - FreeBulkInsertState(bistate); - - table_finish_bulk_insert(newrel, ti_options); - + table_modify_end(mstate); table_close(newrel, NoLock); } } -- 2.34.1
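All three callers converted in this patch follow the same pattern: table_modify_begin() at startup, table_modify_buffer_insert() per tuple, and table_modify_end() at shutdown. Below is a minimal standalone sketch of that lifecycle, assuming a fixed-size buffer that is flushed when it fills up and once more at the end. It is a toy model, not the v19-0001 implementation: every toy_* name, the ToyModifyState struct and the capacity of 4 are hypothetical, and "tuples" are plain ints.

```c
#include <assert.h>
#include <stdlib.h>

#define TOY_BUFFER_CAPACITY 4	/* hypothetical; a real AM picks its own limit */

/* Hypothetical stand-in for TableModifyState (not the struct from the patch). */
typedef struct ToyModifyState
{
	int		buffered[TOY_BUFFER_CAPACITY];	/* pending "tuples" */
	int		nbuffered;	/* entries currently sitting in the buffer */
	int		nflushes;	/* number of batch writes performed so far */
	long	nwritten;	/* "tuples" written to the table so far */
} ToyModifyState;

/* Allocate zeroed state; plays the role of table_modify_begin(). */
static ToyModifyState *
toy_modify_begin(void)
{
	ToyModifyState *state = calloc(1, sizeof(ToyModifyState));

	assert(state != NULL);
	return state;
}

/* Write out whatever is buffered as one batch; no-op on an empty buffer. */
static void
toy_modify_flush(ToyModifyState *state)
{
	if (state->nbuffered == 0)
		return;
	state->nwritten += state->nbuffered;
	state->nflushes++;
	state->nbuffered = 0;
}

/* Buffer one "tuple"; flush as soon as the buffer is full. */
static void
toy_modify_buffer_insert(ToyModifyState *state, int tuple)
{
	state->buffered[state->nbuffered++] = tuple;
	if (state->nbuffered == TOY_BUFFER_CAPACITY)
		toy_modify_flush(state);
}

/* Flush the remainder, report totals, and release the state. */
static long
toy_modify_end(ToyModifyState *state, int *nflushes)
{
	long	total;

	toy_modify_flush(state);
	total = state->nwritten;
	if (nflushes != NULL)
		*nflushes = state->nflushes;
	free(state);
	return total;
}

/* Caller-side pattern: begin once, insert per tuple, end once. */
static long
toy_rewrite(int ntuples, int *nflushes)
{
	ToyModifyState *state = toy_modify_begin();
	int		i;

	for (i = 0; i < ntuples; i++)
		toy_modify_buffer_insert(state, i);
	return toy_modify_end(state, nflushes);
}
```

In this model the last partial batch is written only by the end call, so a caller that skips it would silently lose the buffered remainder. That is the reason each converted caller pairs its begin call with an end call in the shutdown path.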
From d3f0c64e85417e6fcf164656481ea80732b9bd87 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Tue, 23 Apr 2024 04:15:49 +0000 Subject: [PATCH v19 3/6] Optimize INSERT INTO ... SELECT with multi inserts --- contrib/test_decoding/expected/stream.out | 2 +- src/backend/executor/nodeModifyTable.c | 177 +++++++++++++++++++--- src/tools/pgindent/typedefs.list | 1 + 3 files changed, 161 insertions(+), 19 deletions(-) diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index 4ab2d47bf8..c19facb3c9 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -101,10 +101,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl streaming change for transaction streaming change for transaction streaming change for transaction - streaming change for transaction closing a streamed block for transaction opening a streamed block for transaction streaming change for transaction + streaming change for transaction closing a streamed block for transaction committing streamed transaction (17 rows) diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index cee60d3659..434e3f8411 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -114,6 +114,19 @@ typedef struct UpdateContext LockTupleMode lockmode; } UpdateContext; +typedef struct InsertModifyBufferFlushContext +{ + ResultRelInfo *resultRelInfo; + EState *estate; + ModifyTableState *mtstate; +} InsertModifyBufferFlushContext; + +static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL; +static TableModifyState *table_modify_state = NULL; + +static void InsertModifyBufferFlushCallback(void *context, + TupleTableSlot **slots, + int nslots); static void ExecBatchInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, @@ -726,6 +739,61 @@ 
ExecGetUpdateNewTuple(ResultRelInfo *relinfo, return ExecProject(newProj); } +static void +InsertModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots) +{ + InsertModifyBufferFlushContext *ctx = (InsertModifyBufferFlushContext *) context; + ResultRelInfo *resultRelInfo = ctx->resultRelInfo; + EState *estate = ctx->estate; + ModifyTableState *mtstate = ctx->mtstate; + int i; + + if (nslots <= 0) + return; + + /* Quick exit if there are no indexes to update and no triggers to fire */ + if (!(resultRelInfo->ri_NumIndices > 0 || + (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)))) + return; + + /* Caller must take care of opening and closing the indices */ + for (i = 0; i < nslots; i++) + { + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + slots[i], estate, false, + false, NULL, NIL, false); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + mtstate->mt_transition_capture); + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. 
+ */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, + mtstate->mt_transition_capture); + } + } +} + /* ---------------------------------------------------------------- * ExecInsert * @@ -751,7 +819,8 @@ ExecInsert(ModifyTableContext *context, TupleTableSlot *slot, bool canSetTag, TupleTableSlot **inserted_tuple, - ResultRelInfo **insert_destrel) + ResultRelInfo **insert_destrel, + bool canMultiInsert) { ModifyTableState *mtstate = context->mtstate; EState *estate = context->estate; @@ -764,6 +833,7 @@ ExecInsert(ModifyTableContext *context, OnConflictAction onconflict = node->onConflictAction; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; MemoryContext oldContext; + bool ar_insert_triggers_executed = false; /* * If the input result relation is a partitioned table, find the leaf @@ -1126,17 +1196,53 @@ ExecInsert(ModifyTableContext *context, } else { - /* insert the tuple normally */ - table_tuple_insert(resultRelationDesc, slot, - estate->es_output_cid, - 0, NULL); + if (canMultiInsert && + proute == NULL && + resultRelInfo->ri_WithCheckOptions == NIL && + resultRelInfo->ri_projectReturning == NULL) + { + if (insert_modify_buffer_flush_context == NULL) + { + insert_modify_buffer_flush_context = + (InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext)); + insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo; + insert_modify_buffer_flush_context->estate = estate; + insert_modify_buffer_flush_context->mtstate = mtstate; + } - /* insert index entries for tuple */ - if (resultRelInfo->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, false, - false, NULL, NIL, - false); + if (table_modify_state == NULL) + { + table_modify_state = table_modify_begin(resultRelInfo->ri_RelationDesc, + 
TM_FLAG_MULTI_INSERTS, + estate->es_output_cid, + 0, + InsertModifyBufferFlushCallback, + insert_modify_buffer_flush_context); + } + + table_modify_buffer_insert(table_modify_state, slot); + ar_insert_triggers_executed = true; + } + else + { + /* insert the tuple normally */ + table_tuple_insert(resultRelationDesc, slot, + estate->es_output_cid, + 0, NULL); + + /* insert index entries for tuple */ + if (resultRelInfo->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, + false, NULL, NIL, + false); + + ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes, + mtstate->mt_transition_capture); + + list_free(recheckIndexes); + ar_insert_triggers_executed = true; + } } } @@ -1170,10 +1276,12 @@ ExecInsert(ModifyTableContext *context, } /* AFTER ROW INSERT Triggers */ - ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes, - ar_insert_trig_tcs); - - list_free(recheckIndexes); + if (!ar_insert_triggers_executed) + { + ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes, + ar_insert_trig_tcs); + list_free(recheckIndexes); + } /* * Check any WITH CHECK OPTION constraints from parent views. We are @@ -1869,7 +1977,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context, /* Tuple routing starts from the root table. 
*/ context->cpUpdateReturningSlot = ExecInsert(context, mtstate->rootResultRelInfo, slot, canSetTag, - inserted_tuple, insert_destrel); + inserted_tuple, insert_destrel, false); /* * Reset the transition state that may possibly have been written by @@ -3364,7 +3472,7 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, mtstate->mt_merge_action = action; rslot = ExecInsert(context, mtstate->rootResultRelInfo, - newslot, canSetTag, NULL, NULL); + newslot, canSetTag, NULL, NULL, false); mtstate->mt_merge_inserted += 1; break; case CMD_NOTHING: @@ -3749,6 +3857,10 @@ ExecModifyTable(PlanState *pstate) HeapTupleData oldtupdata; HeapTuple oldtuple; ItemPointer tupleid; + bool canMultiInsert = false; + + table_modify_state = NULL; + insert_modify_buffer_flush_context = NULL; CHECK_FOR_INTERRUPTS(); @@ -3844,6 +3956,10 @@ ExecModifyTable(PlanState *pstate) if (TupIsNull(context.planSlot)) break; + if (operation == CMD_INSERT && + nodeTag(subplanstate) == T_SeqScanState) + canMultiInsert = true; + /* * When there are multiple result relations, each tuple contains a * junk column that gives the OID of the rel from which it came. @@ -4057,7 +4173,7 @@ ExecModifyTable(PlanState *pstate) ExecInitInsertProjection(node, resultRelInfo); slot = ExecGetInsertNewTuple(resultRelInfo, context.planSlot); slot = ExecInsert(&context, resultRelInfo, slot, - node->canSetTag, NULL, NULL); + node->canSetTag, NULL, NULL, canMultiInsert); break; case CMD_UPDATE: @@ -4116,6 +4232,17 @@ ExecModifyTable(PlanState *pstate) return slot; } + if (table_modify_state != NULL) + { + Assert(operation == CMD_INSERT); + + table_modify_end(table_modify_state); + table_modify_state = NULL; + + pfree(insert_modify_buffer_flush_context); + insert_modify_buffer_flush_context = NULL; + } + /* * Insert remaining tuples for batch insert. 
*/ @@ -4228,6 +4355,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->mt_merge_updated = 0; mtstate->mt_merge_deleted = 0; + table_modify_state = NULL; + insert_modify_buffer_flush_context = NULL; + /*---------- * Resolve the target relation. This is the same as: * @@ -4681,6 +4811,17 @@ ExecEndModifyTable(ModifyTableState *node) { int i; + if (table_modify_state != NULL) + { + Assert(node->operation == CMD_INSERT); + + table_modify_end(table_modify_state); + table_modify_state = NULL; + + pfree(insert_modify_buffer_flush_context); + insert_modify_buffer_flush_context = NULL; + } + /* * Allow any FDWs to shut down */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ebde07bcde..11c4d99430 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1226,6 +1226,7 @@ InjectionPointEntry InjectionPointSharedState InlineCodeBlock InProgressIO +InsertModifyBufferFlushContext InsertStmt Instrumentation Int128AggState -- 2.34.1
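The flush callback in this patch receives an opaque context pointer plus the array of flushed slots, and does the per-tuple index insertions and AFTER ROW INSERT trigger work. Below is a minimal standalone sketch of that callback-plus-context shape, assuming the callback runs after the batch is written and before the buffered tuples are released. Only the (context, slots, nslots) argument order is taken from the patch; the toy_* names, the ToyFlushContext fields and the int "slots" are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Same argument order as the patch's callback, but with int "slots". */
typedef void (*ToyFlushCallback) (void *context, const int *slots, int nslots);

/* Hypothetical caller-owned context handed back on every flush. */
typedef struct ToyFlushContext
{
	int		num_indexes;			/* indexes on the target table */
	int		has_after_row_trigger;	/* nonzero if an AFTER ROW trigger exists */
	int		index_inserts;			/* counter: index entries created */
	int		triggers_fired;			/* counter: trigger invocations */
} ToyFlushContext;

/* Per-batch work the caller wants done once tuples are in the table. */
static void
toy_flush_callback(void *context, const int *slots, int nslots)
{
	ToyFlushContext *ctx = (ToyFlushContext *) context;
	int		i;

	(void) slots;				/* the toy does not inspect tuple contents */

	if (nslots <= 0)
		return;

	/* Quick exit: nothing to update and nothing to fire. */
	if (ctx->num_indexes == 0 && !ctx->has_after_row_trigger)
		return;

	for (i = 0; i < nslots; i++)
	{
		ctx->index_inserts += ctx->num_indexes;
		if (ctx->has_after_row_trigger)
			ctx->triggers_fired++;
	}
}

/* Stand-in for the AM's flush step; a NULL callback is simply skipped. */
static void
toy_flush(const int *slots, int nslots, ToyFlushCallback callback, void *context)
{
	/* ... the batch would be written to the table here ... */
	if (callback != NULL)
		callback(context, slots, nslots);
	/* ... and only afterwards would the buffered tuples be released ... */
}
```

Passing NULL for both the callback and the context corresponds to how the CTAS, matview and table-rewrite callers invoke table_modify_begin(), since the relations they write to have no indexes or triggers to maintain at that point.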
From 4f4cf2f380a18c7a754b2fcd979af4617c6aff52 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Tue, 23 Apr 2024 04:16:07 +0000 Subject: [PATCH v19 4/6] Optimize Logical Replication apply with multi inserts --- src/backend/executor/execReplication.c | 39 +++ src/backend/replication/logical/proto.c | 24 ++ src/backend/replication/logical/worker.c | 357 ++++++++++++++++++++++- src/include/executor/executor.h | 4 + src/include/replication/logicalproto.h | 2 + src/tools/pgindent/typedefs.list | 2 + 6 files changed, 415 insertions(+), 13 deletions(-) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index d0a89cd577..fae1375537 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -544,6 +544,45 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, } } +void +ExecRelationMultiInsert(TableModifyState *MultiInsertState, + ResultRelInfo *resultRelInfo, + EState *estate, TupleTableSlot *slot) +{ + bool skip_tuple = false; + Relation rel = resultRelInfo->ri_RelationDesc; + + /* For now we support only tables. 
*/ + Assert(rel->rd_rel->relkind == RELKIND_RELATION); + + CheckCmdReplicaIdentity(rel, CMD_INSERT); + + /* BEFORE ROW INSERT Triggers */ + if (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row) + { + if (!ExecBRInsertTriggers(estate, resultRelInfo, slot)) + skip_tuple = true; /* "do nothing" */ + } + + if (!skip_tuple) + { + /* Compute stored generated columns */ + if (rel->rd_att->constr && + rel->rd_att->constr->has_generated_stored) + ExecComputeStoredGenerated(resultRelInfo, estate, slot, + CMD_INSERT); + + /* Check the constraints of the tuple */ + if (rel->rd_att->constr) + ExecConstraints(resultRelInfo, slot, estate); + if (rel->rd_rel->relispartition) + ExecPartitionCheck(resultRelInfo, slot, estate, true); + + table_modify_buffer_insert(MultiInsertState, slot); + } +} + /* * Find the searchslot tuple and update it with data in the slot, * update the indexes, and execute any constraints and per-row triggers. diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 95c09c9516..46d38aebd2 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -427,6 +427,30 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, logicalrep_write_tuple(out, rel, newslot, binary, columns); } +LogicalRepRelId +logicalrep_read_relid(StringInfo in) +{ + LogicalRepRelId relid; + + /* read the relation id */ + relid = pq_getmsgint(in, 4); + + return relid; +} + +void +logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup) +{ + char action; + + action = pq_getmsgbyte(in); + if (action != 'N') + elog(ERROR, "expected new tuple but got %d", + action); + + logicalrep_read_tuple(in, newtup); +} + /* * Read INSERT from stream. 
* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5a80fe3e8..3440883847 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -148,7 +148,6 @@ #include <unistd.h> #include "access/table.h" -#include "access/tableam.h" #include "access/twophase.h" #include "access/xact.h" #include "catalog/indexing.h" @@ -416,6 +415,30 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +typedef enum LRMultiInsertReturnStatus +{ + LR_MULTI_INSERT_NONE = 0, + LR_MULTI_INSERT_REL_SKIPPED, + LR_MULTI_INSERT_DISALLOWED, + LR_MULTI_INSERT_DONE, +} LRMultiInsertReturnStatus; + +static TableModifyState *MultiInsertState = NULL; +static LogicalRepRelMapEntry *LastRel = NULL; +static LogicalRepRelId LastMultiInsertRelId = InvalidOid; +static ApplyExecutionData *LastEData = NULL; +static TupleTableSlot *LastRemoteSlot = NULL; + +typedef struct LRModifyBufferFlushContext +{ + ResultRelInfo *resultRelInfo; + EState *estate; +} LRModifyBufferFlushContext; + +static LRModifyBufferFlushContext *modify_buffer_flush_context = NULL; +static void LRModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots); +static void FinishMultiInserts(void); + /* * Form the origin name for the subscription. * @@ -1017,6 +1040,8 @@ apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + FinishMultiInserts(); + logicalrep_read_commit(s, &commit_data); if (commit_data.commit_lsn != remote_final_lsn) @@ -1043,6 +1068,8 @@ apply_handle_begin_prepare(StringInfo s) { LogicalRepPreparedTxnData begin_data; + FinishMultiInserts(); + /* Tablesync should never receive prepare. 
*/ if (am_tablesync_worker()) ereport(ERROR, @@ -1109,6 +1136,8 @@ apply_handle_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; + FinishMultiInserts(); + logicalrep_read_prepare(s, &prepare_data); if (prepare_data.prepare_lsn != remote_final_lsn) @@ -1171,6 +1200,8 @@ apply_handle_commit_prepared(StringInfo s) LogicalRepCommitPreparedTxnData prepare_data; char gid[GIDSIZE]; + FinishMultiInserts(); + logicalrep_read_commit_prepared(s, &prepare_data); set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn); @@ -1220,6 +1251,8 @@ apply_handle_rollback_prepared(StringInfo s) LogicalRepRollbackPreparedTxnData rollback_data; char gid[GIDSIZE]; + FinishMultiInserts(); + logicalrep_read_rollback_prepared(s, &rollback_data); set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn); @@ -1277,6 +1310,8 @@ apply_handle_stream_prepare(StringInfo s) /* Save the message before it is consumed. */ StringInfoData original_msg = *s; + FinishMultiInserts(); + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1304,6 +1339,8 @@ apply_handle_stream_prepare(StringInfo s) apply_spooled_messages(MyLogicalRepWorker->stream_fileset, prepare_data.xid, prepare_data.prepare_lsn); + FinishMultiInserts(); + /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -1407,6 +1444,8 @@ apply_handle_stream_prepare(StringInfo s) static void apply_handle_origin(StringInfo s) { + FinishMultiInserts(); + /* * ORIGIN message can only come inside streaming transaction or inside * remote transaction and before any actual writes. @@ -1473,6 +1512,8 @@ apply_handle_stream_start(StringInfo s) /* Save the message before it is consumed. 
*/ StringInfoData original_msg = *s; + FinishMultiInserts(); + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1628,6 +1669,8 @@ apply_handle_stream_stop(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + FinishMultiInserts(); + if (!in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1821,6 +1864,8 @@ apply_handle_stream_abort(StringInfo s) StringInfoData original_msg = *s; bool toplevel_xact; + FinishMultiInserts(); + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2138,6 +2183,8 @@ apply_handle_stream_commit(StringInfo s) /* Save the message before it is consumed. */ StringInfoData original_msg = *s; + FinishMultiInserts(); + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2159,6 +2206,8 @@ apply_handle_stream_commit(StringInfo s) apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, commit_data.commit_lsn); + FinishMultiInserts(); + apply_handle_commit_internal(&commit_data); /* Unlink the files with serialized changes and subxact info. */ @@ -2302,6 +2351,8 @@ apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; + FinishMultiInserts(); + if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) return; @@ -2325,6 +2376,8 @@ apply_handle_type(StringInfo s) { LogicalRepTyp typ; + FinishMultiInserts(); + if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s)) return; @@ -2363,16 +2416,132 @@ TargetPrivilegesCheck(Relation rel, AclMode mode) RelationGetRelationName(rel)))); } -/* - * Handle INSERT message. 
- */ +static void +FinishMultiInserts(void) +{ + LogicalRepMsgType saved_command; + + if (MultiInsertState == NULL) + return; + + Assert(OidIsValid(LastMultiInsertRelId)); + Assert(LastEData != NULL); + + /* Set relation for error callback */ + apply_error_callback_arg.rel = LastRel; + + /* Set current command for error callback */ + saved_command = apply_error_callback_arg.command; + apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT; + + ExecDropSingleTupleTableSlot(LastRemoteSlot); + LastRemoteSlot = NULL; + + table_modify_end(MultiInsertState); + MultiInsertState = NULL; + LastMultiInsertRelId = InvalidOid; + + pfree(modify_buffer_flush_context); + modify_buffer_flush_context = NULL; + + ExecCloseIndices(LastEData->targetRelInfo); + + finish_edata(LastEData); + LastEData = NULL; + + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + /* Reset the current command */ + apply_error_callback_arg.command = saved_command; + + logicalrep_rel_close(LastRel, NoLock); + LastRel = NULL; + + end_replication_step(); +} static void -apply_handle_insert(StringInfo s) +LRModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots) +{ + LRModifyBufferFlushContext *ctx = (LRModifyBufferFlushContext *) context; + ResultRelInfo *resultRelInfo = ctx->resultRelInfo; + EState *estate = ctx->estate; + int i; + LogicalRepMsgType saved_command; + + if (nslots <= 0) + return; + + /* Quick exit if there are no indexes to update and no triggers to fire */ + if (!(resultRelInfo->ri_NumIndices > 0 || + (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)))) + return; + + /* Set relation for error callback */ + apply_error_callback_arg.rel = LastRel; + + /* Set current command for error callback */ + saved_command = apply_error_callback_arg.command; + apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT; + + /* Caller must take care of opening and closing the indices */ 
+ for (i = 0; i < nslots; i++) + { + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + slots[i], estate, false, + false, NULL, NIL, false); + + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + NULL); + + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, + NULL); + } + + /* + * XXX we should in theory pass a TransitionCaptureState object to the + * above to capture transition tuples, but after statement triggers + * don't actually get fired by replication yet anyway + */ + } + + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + /* Reset the current command */ + apply_error_callback_arg.command = saved_command; +} + +static LRMultiInsertReturnStatus +do_multi_inserts(StringInfo s, LogicalRepRelId *relid) { LogicalRepRelMapEntry *rel; LogicalRepTupleData newtup; - LogicalRepRelId relid; UserContext ucxt; ApplyExecutionData *edata; EState *estate; @@ -2380,17 +2549,143 @@ apply_handle_insert(StringInfo s) MemoryContext oldctx; bool run_as_owner; + if (MultiInsertState == NULL) + begin_replication_step(); + + *relid = logicalrep_read_relid(s); + + if (MultiInsertState != NULL && + (LastMultiInsertRelId != InvalidOid && + *relid != InvalidOid && + LastMultiInsertRelId != *relid)) + FinishMultiInserts(); + + if (MultiInsertState == NULL) + rel = logicalrep_rel_open(*relid, RowExclusiveLock); + else + rel = LastRel; + + if (!should_apply_changes_for_rel(rel)) + { + Assert(MultiInsertState == NULL); + + /* + * The 
relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); + return LR_MULTI_INSERT_REL_SKIPPED; + } + + /* For a partitioned table, let's not do multi inserts. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + Assert(MultiInsertState == NULL); + + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); + return LR_MULTI_INSERT_DISALLOWED; + } + /* - * Quick return if we are skipping data modification changes or handling - * streamed transactions. + * Make sure that any user-supplied code runs as the table owner, unless + * the user has opted out of that behavior. */ - if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) - return; + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + + /* Set relation for error callback */ + apply_error_callback_arg.rel = rel; + + if (MultiInsertState == NULL) + { + oldctx = MemoryContextSwitchTo(TopTransactionContext); + + /* Initialize the executor state. 
*/ + LastEData = edata = create_edata_for_relation(rel); + estate = edata->estate; + + LastRemoteSlot = remoteslot = MakeTupleTableSlot(RelationGetDescr(rel->localrel), + &TTSOpsVirtual); + + modify_buffer_flush_context = (LRModifyBufferFlushContext *) palloc(sizeof(LRModifyBufferFlushContext)); + modify_buffer_flush_context->resultRelInfo = edata->targetRelInfo; + modify_buffer_flush_context->estate = estate; + + MultiInsertState = table_modify_begin(edata->targetRelInfo->ri_RelationDesc, + TM_FLAG_MULTI_INSERTS | + TM_FLAG_BAS_BULKWRITE, + GetCurrentCommandId(true), + 0, + LRModifyBufferFlushCallback, + modify_buffer_flush_context); + LastRel = rel; + LastMultiInsertRelId = *relid; + + /* We must open indexes here. */ + ExecOpenIndices(edata->targetRelInfo, false); + + MemoryContextSwitchTo(oldctx); + } + else + { + CommandId cid; + + edata = LastEData; + estate = edata->estate; + ResetExprContext(GetPerTupleExprContext(estate)); + ExecClearTuple(LastRemoteSlot); + remoteslot = LastRemoteSlot; + cid = GetCurrentCommandId(true); + MultiInsertState->cid = cid; + estate->es_output_cid = cid; + } + + /* Process and store remote tuple in the slot */ + logicalrep_read_insert_v2(s, &newtup); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot, rel, &newtup); + slot_fill_defaults(rel, estate, remoteslot); + MemoryContextSwitchTo(oldctx); + + TargetPrivilegesCheck(edata->targetRelInfo->ri_RelationDesc, ACL_INSERT); + ExecRelationMultiInsert(MultiInsertState, edata->targetRelInfo, estate, remoteslot); + + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + Assert(MultiInsertState != NULL); + + CommandCounterIncrement(); + + return LR_MULTI_INSERT_DONE; +} + +static bool +do_single_inserts(StringInfo s, LogicalRepRelId relid) +{ + LogicalRepRelMapEntry *rel; + LogicalRepTupleData newtup; + UserContext ucxt; + ApplyExecutionData *edata; + EState 
*estate; + TupleTableSlot *remoteslot; + MemoryContext oldctx; + bool run_as_owner; + + Assert(relid != InvalidOid); begin_replication_step(); - relid = logicalrep_read_insert(s, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); if (!should_apply_changes_for_rel(rel)) { @@ -2400,7 +2695,7 @@ apply_handle_insert(StringInfo s) */ logicalrep_rel_close(rel, RowExclusiveLock); end_replication_step(); - return; + return false; } /* @@ -2422,6 +2717,7 @@ apply_handle_insert(StringInfo s) &TTSOpsVirtual); /* Process and store remote tuple in the slot */ + logicalrep_read_insert_v2(s, &newtup); oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_data(remoteslot, rel, &newtup); slot_fill_defaults(rel, estate, remoteslot); @@ -2446,6 +2742,35 @@ apply_handle_insert(StringInfo s) logicalrep_rel_close(rel, NoLock); end_replication_step(); + + return true; +} + +/* + * Handle INSERT message. + */ +static void +apply_handle_insert(StringInfo s) +{ + LRMultiInsertReturnStatus mi_status; + LogicalRepRelId relid; + + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + return; + + mi_status = do_multi_inserts(s, &relid); + if (mi_status == LR_MULTI_INSERT_REL_SKIPPED || + mi_status == LR_MULTI_INSERT_DONE) + return; + + do_single_inserts(s, relid); + + return; } /* @@ -2532,6 +2857,8 @@ apply_handle_update(StringInfo s) MemoryContext oldctx; bool run_as_owner; + FinishMultiInserts(); + /* * Quick return if we are skipping data modification changes or handling * streamed transactions. @@ -2713,6 +3040,8 @@ apply_handle_delete(StringInfo s) MemoryContext oldctx; bool run_as_owner; + FinishMultiInserts(); + /* * Quick return if we are skipping data modification changes or handling * streamed transactions. 
@@ -3154,6 +3483,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 9770752ea3..8f10ea977b 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "access/tableam.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
@@ -656,6 +657,9 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
+extern void ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+									ResultRelInfo *resultRelInfo,
+									EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 									 EState *estate, EPQState *epqstate,
 									 TupleTableSlot *searchslot, TupleTableSlot *slot);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..3f3a7f0a31 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -226,6 +226,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
 									bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_relid(StringInfo in);
+extern void logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 11c4d99430..70f23808e2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1456,6 +1456,8 @@ LPTHREAD_START_ROUTINE
 LPTSTR
 LPVOID
 LPWSTR
+LRModifyBufferFlushContext
+LRMultiInsertReturnStatus
 LSEG
 LUID
 LVRelState
-- 
2.34.1
From d83bf5bc0bfc5f45d85e38a876a7db94f12803da Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 05:18:58 +0000
Subject: [PATCH v19 5/6] Use new multi insert Table AM for COPY FROM

---
 src/backend/commands/copyfrom.c          | 230 +++++++++++++++--------
 src/include/commands/copyfrom_internal.h |   4 +-
 src/tools/pgindent/typedefs.list         |   1 +
 3 files changed, 153 insertions(+), 82 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ce4d62e707..8572c5b730 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -71,13 +71,21 @@
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
+typedef struct CopyModifyBufferFlushContext
+{
+	CopyFromState cstate;
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} CopyModifyBufferFlushContext;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableModifyState *mstate;	/* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
+	TupleTableSlot *multislot;
+	CopyModifyBufferFlushContext *modify_buffer_flush_context;
 	int			nused;			/* number of 'slots' containing tuples */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
@@ -99,6 +107,7 @@ typedef struct CopyMultiInsertInfo
 	int			ti_options;		/* table insert options */
 } CopyMultiInsertInfo;
 
+static void CopyModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots);
 
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
@@ -218,14 +227,38 @@ CopyLimitPrintoutLength(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  CopyFromState cstate, EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		buffer->modify_buffer_flush_context = (CopyModifyBufferFlushContext *) palloc(sizeof(CopyModifyBufferFlushContext));
+		buffer->modify_buffer_flush_context->cstate = cstate;
+		buffer->modify_buffer_flush_context->resultRelInfo = rri;
+		buffer->modify_buffer_flush_context->estate = estate;
+
+		buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+											TM_FLAG_MULTI_INSERTS |
+											TM_FLAG_BAS_BULKWRITE,
+											miinfo->mycid,
+											miinfo->ti_options,
+											CopyModifyBufferFlushCallback,
+											buffer->modify_buffer_flush_context);
+		buffer->slots = NULL;
+		buffer->multislot = NULL;
+	}
+	else
+	{
+		buffer->mstate = NULL;
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		buffer->multislot = NULL;
+	}
+
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -236,11 +269,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
  */
 static inline void
 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+							   ResultRelInfo *rri, CopyFromState cstate,
+							   EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -273,7 +307,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	 * tuples their way for the first time.
 	 */
 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+		CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
 }
 
 /*
@@ -317,8 +351,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -390,13 +422,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -404,56 +431,12 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
+		table_modify_buffer_flush(buffer->mstate);
+
 		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
+		 * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+		 * in the flush callback CopyModifyBufferFlushCallback.
 		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
-
-		for (i = 0; i < nused; i++)
-		{
-			/*
-			 * If there are any indexes, update them for all the inserted
-			 * tuples, and run AFTER ROW INSERT triggers.
-			 */
-			if (resultRelInfo->ri_NumIndices > 0)
-			{
-				List	   *recheckIndexes;
-
-				cstate->cur_lineno = buffer->linenos[i];
-				recheckIndexes =
-					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
-										  false, NULL, NIL, false);
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], recheckIndexes,
-									 cstate->transition_capture);
-				list_free(recheckIndexes);
-			}
-
-			/*
-			 * There's no indexes, but see if we need to run AFTER ROW INSERT
-			 * triggers anyway.
-			 */
-			else if (resultRelInfo->ri_TrigDesc != NULL &&
-					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-			{
-				cstate->cur_lineno = buffer->linenos[i];
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], NIL,
-									 cstate->transition_capture);
-			}
-
-			ExecClearTuple(slots[i]);
-		}
 
 		/* Update the row counter and progress of the COPY command */
 		*processed += nused;
@@ -469,6 +452,64 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	buffer->nused = 0;
 }
 
+static void
+CopyModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots)
+{
+	CopyModifyBufferFlushContext *ctx = (CopyModifyBufferFlushContext *) context;
+	CopyFromState cstate = ctx->cstate;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+	int			i;
+
+	if (nslots <= 0)
+		return;
+
+	/* Quick exit if no indexes or no triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+	for (i = 0; i < nslots; i++)
+	{
+		/*
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			List	   *recheckIndexes;
+
+			cstate->cur_lineno = buffer->linenos[i];
+			recheckIndexes =
+				ExecInsertIndexTuples(resultRelInfo,
+									  slots[i], estate, false,
+									  false, NULL, NIL, false);
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 cstate->transition_capture);
+			list_free(recheckIndexes);
+		}
+
+		/*
+		 * There's no indexes, but see if we need to run AFTER ROW INSERT
+		 * triggers anyway.
+		 */
+		else if (resultRelInfo->ri_TrigDesc != NULL &&
+				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		{
+			cstate->cur_lineno = buffer->linenos[i];
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL,
+								 cstate->transition_capture);
+		}
+	}
+}
+
 /*
  * Drop used slots and free member for this buffer.
  *
@@ -489,19 +530,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
 	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
+		table_modify_end(buffer->mstate);
+		ExecDropSingleTupleTableSlot(buffer->multislot);
+		pfree(buffer->modify_buffer_flush_context);
 	}
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -588,13 +628,32 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused = buffer->nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(nused < MAX_BUFFERED_TUPLES);
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		if (buffer->multislot == NULL)
+			buffer->multislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+												   &TTSOpsVirtual);
+
+		/* Caller must clear the slot */
+		slot = buffer->multislot;
+	}
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -608,7 +667,17 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 
 	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		Assert(slot == buffer->multislot);
+		table_modify_buffer_insert(buffer->mstate, slot);
+	}
+
+#ifdef USE_ASSERT_CHECKING
+	if (rri->ri_FdwRoutine != NULL)
+		Assert(slot == buffer->slots[buffer->nused]);
+#endif
 
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
@@ -830,7 +899,7 @@ CopyFrom(CopyFromState cstate)
 	/*
 	 * It's generally more efficient to prepare a bunch of tuples for
 	 * insertion, and insert them in one
-	 * table_multi_insert()/ExecForeignBatchInsert() call, than call
+	 * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
 	 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 	 * However, there are a number of reasons why we might not be able to do
 	 * this. These are explained below.
@@ -1080,7 +1149,8 @@ CopyFrom(CopyFromState cstate)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+													   resultRelInfo, cstate,
+													   estate);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
 typedef enum CopyInsertMethod
 {
 	CIM_SINGLE,					/* use table_tuple_insert or ExecForeignInsert */
-	CIM_MULTI,					/* always use table_multi_insert or
+	CIM_MULTI,					/* always use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert */
-	CIM_MULTI_CONDITIONAL,		/* use table_multi_insert or
+	CIM_MULTI_CONDITIONAL,		/* use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 70f23808e2..bd8c87be33 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -486,6 +486,7 @@ CopyHeaderChoice
 CopyInsertMethod
 CopyMethod
 CopyLogVerbosityChoice
+CopyModifyBufferFlushContext
 CopyMultiInsertBuffer
 CopyMultiInsertInfo
 CopyOnErrorChoice
-- 
2.34.1
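To illustrate the calling pattern that the COPY FROM patch above relies on (begin with a callback and context, insert into a buffer, let the AM flush and invoke the callback before it releases the rows, then end), here is a minimal standalone sketch. It is not PostgreSQL code: every `demo_`/`Demo` name is hypothetical, plain `int` values stand in for `TupleTableSlot`s, and the fixed capacity of 4 is an arbitrary assumption, since in the patch the AM decides when to flush.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_BUFFER_CAPACITY 4	/* hypothetical; a real AM picks its own limit */

/* Invoked after buffered rows are "written" but before the buffer is reused. */
typedef void (*DemoFlushCallback) (void *context, const int *rows, int nrows);

/* Hypothetical stand-in for the patch's TableModifyState. */
typedef struct DemoModifyState
{
	int			rows[DEMO_BUFFER_CAPACITY]; /* stand-in for tuple slots */
	int			nused;
	DemoFlushCallback callback; /* may be NULL if the caller needs no hook */
	void	   *context;		/* opaque caller data handed to the callback */
} DemoModifyState;

static void
demo_modify_begin(DemoModifyState *state, DemoFlushCallback callback,
				  void *context)
{
	state->nused = 0;
	state->callback = callback;
	state->context = context;
}

static void
demo_modify_buffer_flush(DemoModifyState *state)
{
	if (state->nused == 0)
		return;

	/* A real AM would write the buffered rows to storage here. */

	/* Give the caller a chance to do per-row follow-up work. */
	if (state->callback != NULL)
		state->callback(state->context, state->rows, state->nused);

	state->nused = 0;
}

static void
demo_modify_buffer_insert(DemoModifyState *state, int row)
{
	assert(state->nused < DEMO_BUFFER_CAPACITY);
	state->rows[state->nused++] = row;

	/* Flush as soon as the buffer is full. */
	if (state->nused == DEMO_BUFFER_CAPACITY)
		demo_modify_buffer_flush(state);
}

static void
demo_modify_end(DemoModifyState *state)
{
	/* Flush whatever is still buffered. */
	demo_modify_buffer_flush(state);
}

/* Caller-side context, playing the role of CopyModifyBufferFlushContext. */
typedef struct DemoFlushContext
{
	int			flushes;		/* number of callback invocations */
	int			rows_seen;		/* total rows passed to the callback */
} DemoFlushContext;

static void
demo_flush_callback(void *context, const int *rows, int nrows)
{
	DemoFlushContext *ctx = (DemoFlushContext *) context;

	(void) rows;				/* a real caller would update indexes here */
	ctx->flushes++;
	ctx->rows_seen += nrows;
}
```

In this sketch the caller never sees the buffer directly; it only learns about flushed rows through the callback, which is where the patch runs index insertions and AFTER ROW INSERT triggers.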
From 1b5d3c04f21e764756f89f0456d0f96e2b2350de Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 23 Apr 2024 05:25:21 +0000
Subject: [PATCH v19 6/6] Remove table_multi_insert and table_finish_bulk_insert

---
 src/backend/access/heap/heapam_handler.c |  1 -
 src/backend/access/table/tableamapi.c    |  1 -
 src/backend/commands/tablecmds.c         |  4 --
 src/include/access/tableam.h             | 56 +-----------------------
 4 files changed, 1 insertion(+), 61 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index eda0c73a16..fe9701773a 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2614,7 +2614,6 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert = heapam_tuple_insert,
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
-	.multi_insert = heap_multi_insert,
 
 	.tuple_modify_begin = heap_modify_begin,
 	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 96ac951af6..0af8f1ac1f 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -71,7 +71,6 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->tuple_insert_speculative != NULL);
 	Assert(routine->tuple_complete_speculative != NULL);
 
-	Assert(routine->multi_insert != NULL);
 	Assert(routine->tuple_delete != NULL);
 	Assert(routine->tuple_update != NULL);
 	Assert(routine->tuple_lock != NULL);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 0c984aa656..22bcb12abb 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -20969,8 +20969,6 @@ deleteSplitPartitionContext(SplitPartitionContext *pc, int ti_options)
 	ExecDropSingleTupleTableSlot(pc->dstslot);
 	FreeBulkInsertState(pc->bistate);
 
-	table_finish_bulk_insert(pc->partRel, ti_options);
-
 	pfree(pc);
 }
 
@@ -21453,8 +21451,6 @@ moveMergedTablesRows(Relation rel, List *mergingPartitionsList,
 
 	ExecDropSingleTupleTableSlot(dstslot);
 	FreeBulkInsertState(bistate);
-
-	table_finish_bulk_insert(newPartRel, ti_options);
 }
 
 /*
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index ddb6e6f3a5..82798fd641 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -563,10 +563,6 @@ typedef struct TableAmRoutine
 											   uint32 specToken,
 											   bool succeeded);
 
-	/* see table_multi_insert() for reference about parameters */
-	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
-								 CommandId cid, int options, struct BulkInsertStateData *bistate);
-
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -600,21 +596,6 @@ typedef struct TableAmRoutine
 							   uint8 flags,
 							   TM_FailureData *tmfd);
 
-	/*
-	 * Perform operations necessary to complete insertions made via
-	 * tuple_insert and multi_insert with a BulkInsertState specified. In-tree
-	 * access methods ceased to use this.
-	 *
-	 * Typically callers of tuple_insert and multi_insert will just pass all
-	 * the flags that apply to them, and each AM has to decide which of them
-	 * make sense for it, and then only take actions in finish_bulk_insert for
-	 * those flags, and ignore others.
-	 *
-	 * Optional callback.
-	 */
-	void		(*finish_bulk_insert) (Relation rel, int options);
-
-
 	/* ------------------------------------------------------------------------
 	 * Table Modify related functions.
 	 * ------------------------------------------------------------------------
@@ -1453,8 +1434,7 @@ table_index_delete_tuples(Relation rel, TM_IndexDeleteOp *delstate)
  * heap's TOAST table, too, if the tuple requires any out-of-line data.
  *
  * The BulkInsertState object (if any; bistate can be NULL for default
- * behavior) is also just passed through to RelationGetBufferForTuple. If
- * `bistate` is provided, table_finish_bulk_insert() needs to be called.
+ * behavior) is also just passed through to RelationGetBufferForTuple.
  *
  * On return the slot's tts_tid and tts_tableOid are updated to reflect the
  * insertion. But note that any toasting of fields within the slot is NOT
@@ -1501,28 +1481,6 @@ table_tuple_complete_speculative(Relation rel, TupleTableSlot *slot,
 												succeeded);
 }
 
-/*
- * Insert multiple tuples into a table.
- *
- * This is like table_tuple_insert(), but inserts multiple tuples in one
- * operation. That's often faster than calling table_tuple_insert() in a loop,
- * because e.g. the AM can reduce WAL logging and page locking overhead.
- *
- * Except for taking `nslots` tuples as input, and an array of TupleTableSlots
- * in `slots`, the parameters for table_multi_insert() are the same as for
- * table_tuple_insert().
- *
- * Note: this leaks memory into the current memory context. You can create a
- * temporary context before calling this, if that's a problem.
- */
-static inline void
-table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
-				   CommandId cid, int options, struct BulkInsertStateData *bistate)
-{
-	rel->rd_tableam->multi_insert(rel, slots, nslots,
-								  cid, options, bistate);
-}
-
 /*
  * Delete a tuple.
  *
@@ -1649,18 +1607,6 @@ table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot,
 									   flags, tmfd);
 }
 
-/*
- * Perform operations necessary to complete insertions made via
- * tuple_insert and multi_insert with a BulkInsertState specified.
- */
-static inline void
-table_finish_bulk_insert(Relation rel, int options)
-{
-	/* optional callback */
-	if (rel->rd_tableam && rel->rd_tableam->finish_bulk_insert)
-		rel->rd_tableam->finish_bulk_insert(rel, options);
-}
-
 /* ------------------------------------------------------------------------
  * Table Modify related functions.
  * ------------------------------------------------------------------------
-- 
2.34.1
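With `multi_insert` no longer a required callback, an AM that does not supply the new buffering callbacks depends on the default implementation mentioned in the cover note, which falls back to single inserts. The sketch below shows one way such a dispatch could look. It is a simplified illustration under stated assumptions, not the code in v19-0001: the `Demo`/`demo_` names, the `batch_insert` member, and the counters are all hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical table that only counts what happened to it. */
typedef struct DemoTable
{
	int			nrows;			/* rows stored so far */
	int			ncalls;			/* number of AM callback invocations */
} DemoTable;

/* Hypothetical AM routine table with one required and one optional callback. */
typedef struct DemoTableAm
{
	/* required: insert a single row */
	void		(*tuple_insert) (DemoTable *table, int row);
	/* optional: insert a batch in one call; NULL when the AM has none */
	void		(*batch_insert) (DemoTable *table, const int *rows, int nrows);
} DemoTableAm;

static void
demo_tuple_insert(DemoTable *table, int row)
{
	(void) row;
	table->nrows++;
	table->ncalls++;
}

static void
demo_batch_insert(DemoTable *table, const int *rows, int nrows)
{
	(void) rows;
	table->nrows += nrows;
	table->ncalls++;
}

/*
 * Use the AM's batch callback when it exists; otherwise fall back to
 * calling the required single-row callback once per row.
 */
static void
demo_insert_rows(const DemoTableAm *am, DemoTable *table,
				 const int *rows, int nrows)
{
	/* Required callbacks are checked up front, as GetTableAmRoutine does. */
	assert(am->tuple_insert != NULL);

	if (am->batch_insert != NULL)
	{
		am->batch_insert(table, rows, nrows);
		return;
	}

	for (int i = 0; i < nrows; i++)
		am->tuple_insert(table, rows[i]);
}
```

Callers see the same result either way; only the number of AM calls differs, which is why the fallback can stay invisible to code such as COPY FROM.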