On Wed, Mar 27, 2024 at 1:42 PM Jeff Davis <pg...@j-davis.com> wrote: > > On Wed, 2024-03-27 at 01:19 +0530, Bharath Rupireddy wrote: > > > > Similarly, with this new AM, the onus lies on the table AM > > implementers to provide an implementation for these new AMs even if > > they just do single inserts. > > Why not fall back to using the plain tuple_insert? Surely some table > AMs might be simple and limited, and we shouldn't break them just > because they don't implement the new APIs.
Hm. That might complicate table_modify_begin, table_modify_buffer_insert and table_modify_end a bit. What do we put in TableModifyState then? Do we create the bulk insert state (BulkInsertStateData) outside? I think to give a better interface, can we let TAM implementers support these new APIs in their own way? If this sounds rather intrusive, we can just implement the fallback to tuple_insert if these new API are not supported in the caller, for example, do something like below in createas.c and matview.c. Thoughts? if (table_modify_buffer_insert() is defined) table_modify_buffer_insert(...); else { myState->bistate = GetBulkInsertState(); table_tuple_insert(...); } > > table_multi_insert needs to be there for sure as COPY ... FROM uses > > it. > > After we have these new APIs fully in place and used by COPY, what will > happen to those other APIs? Will they be deprecated or will there be a > reason to keep them? Deprecated perhaps? Please find the attached v16 patches for further review. -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com
From 85410b429917cf388c4b58883ddc304118c73143 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Sun, 31 Mar 2024 15:34:16 +0000 Subject: [PATCH v16 1/2] Introduce new table modify access methods --- src/backend/access/heap/heapam.c | 189 ++++++++++++++++++++++- src/backend/access/heap/heapam_handler.c | 5 + src/backend/access/table/tableamapi.c | 4 + src/include/access/heapam.h | 41 +++++ src/include/access/tableam.h | 58 +++++++ src/tools/pgindent/typedefs.list | 3 + 6 files changed, 299 insertions(+), 1 deletion(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index b661d9811e..69f8c597d8 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -64,6 +64,7 @@ #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -107,7 +108,8 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate); static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy); - +static void heap_modify_buffer_flush(TableModifyState *state); +static void heap_modify_insert_end(TableModifyState *state); /* * Each tuple lock mode has a corresponding heavyweight lock, and one or two @@ -2441,6 +2443,191 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, *insert_indexes = true; } +/* + * Initialize heap modify state. + */ +TableModifyState * +heap_modify_begin(Relation rel, int modify_flags, CommandId cid, + int options) +{ + TableModifyState *state; + MemoryContext context; + MemoryContext oldcontext; + + context = AllocSetContextCreate(CurrentMemoryContext, + "heap_modify memory context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + state = palloc0(sizeof(TableModifyState)); + state->rel = rel; + state->modify_flags = modify_flags; + state->mctx = context; + state->cid = cid; + state->options = options; + state->insert_indexes = false; + state->modify_end_cb = NULL; /* To be installed lazily */ + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(state->mctx); + + /* First time through, initialize heap insert state */ + if (state->data == NULL) + { + istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState)); + istate->bistate = NULL; + istate->mistate = NULL; + state->data = istate; + + if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0) + { + mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + istate->mistate = mistate; + } + + if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0) + istate->bistate = GetBulkInsertState(); + + state->modify_end_cb = heap_modify_insert_end; + } + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + Assert(istate->bistate != NULL); + + dstslot = mistate->slots[mistate->cur_slots]; + if (dstslot == NULL) + { + /* + * We use virtual tuple slots buffered slots for leveraging the + * optimization it provides to minimize physical data copying. The + * virtual slot gets materialized when we copy (via below + * ExecCopySlot) the tuples from the source slot which can be of any + * type. This way, it is ensured that the tuple storage doesn't depend + * on external memory, because all the datums that aren't passed by + * value are copied into the slot's memory context. + */ + dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel), + &TTSOpsVirtual); + mistate->slots[mistate->cur_slots] = dstslot; + } + + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + + mistate->cur_slots++; + + /* + * Memory allocated for the whole tuple is in slot's memory context, so + * use it keep track of the total space occupied by all buffered tuples. + */ + if (TTS_SHOULDFREE(dstslot)) + mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false); + + if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES) + heap_modify_buffer_flush(state); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +static void +heap_modify_buffer_flush(TableModifyState *state) +{ + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + Assert(istate->bistate != NULL); + + if (mistate->cur_slots == 0) + return; + + oldcontext = MemoryContextSwitchTo(state->mctx); + + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->options, istate->bistate, + &state->insert_indexes); + + mistate->cur_slots = 0; + mistate->cur_size = 0; + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Heap insert specific callback used for performing work at the end like + * flushing buffered tuples if any, cleaning up the insert state and buffered + * slots. + */ +static void +heap_modify_insert_end(TableModifyState *state) +{ + HeapInsertState *istate; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + + if (istate->mistate != NULL) + { + HeapMultiInsertState *mistate = istate->mistate; + + heap_modify_buffer_flush(state); + + Assert(mistate->cur_slots == 0 && + mistate->cur_size == 0); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + } + + if (istate->bistate != NULL) + FreeBulkInsertState(istate->bistate); +} + +/* + * Clean heap modify state. + */ +void +heap_modify_end(TableModifyState *state) +{ + if (state->modify_end_cb != NULL) + state->modify_end_cb(state); + + MemoryContextDelete(state->mctx); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 41a4bb0981..3d38da635d 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2649,6 +2649,11 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_modify_begin = heap_modify_begin, + .tuple_modify_buffer_insert = heap_modify_buffer_insert, + .tuple_modify_end = heap_modify_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index d9e23ef317..80d923bbdc 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -66,6 +66,10 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->tuple_insert != NULL); + Assert(routine->tuple_modify_begin != NULL); + Assert(routine->tuple_modify_buffer_insert != NULL); + Assert(routine->tuple_modify_end != NULL); + /* * Could be made optional, but would require throwing error during * parse-analysis. diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 32a3fbce96..4adfc1fb35 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -233,6 +233,36 @@ htsv_get_valid_status(int status) return (HTSV_Result) status; } +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of buffered slots currently held */ + int cur_slots; + + /* Approximate size of all tuples currently held in buffered slots */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + + /* ---------------- * function prototypes for heap access method * @@ -283,6 +313,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate, bool *insert_indexes); + +extern TableModifyState *heap_modify_begin(Relation rel, + int modify_flags, + CommandId cid, + int options); + +extern void heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot); + +extern void heap_modify_end(TableModifyState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, int options, struct TM_FailureData *tmfd, bool changingPart, diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index cf76fc29d4..de50f51078 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -248,6 +248,35 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Table modify flags */ + +/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */ +#define TM_FLAG_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TM_FLAG_BAS_BULKWRITE 0x000002 + +struct TableModifyState; + +/* Table AM specific callback that gets called in table_modify_end() */ +typedef void (*TableModifyEndCP) (struct TableModifyState *state); + +/* Holds table modify state */ +typedef struct TableModifyState +{ + Relation rel; + int modify_flags; + MemoryContext mctx; + CommandId cid; + int options; + bool insert_indexes; + + /* Table AM specific data starts here */ + void *data; + + TableModifyEndCP modify_end_cb; +} TableModifyState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -534,6 +563,16 @@ typedef struct TableAmRoutine CommandId cid, int options, struct BulkInsertStateData *bistate, bool *insert_indexes); + TableModifyState *(*tuple_modify_begin) (Relation rel, + int modify_flags, + CommandId cid, + int options); + + void (*tuple_modify_buffer_insert) (TableModifyState *state, + TupleTableSlot *slot); + + void (*tuple_modify_end) (TableModifyState *state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1464,6 +1503,25 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate, insert_indexes); } +static inline TableModifyState * +table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options) +{ + return rel->rd_tableam->tuple_modify_begin(rel, modify_flags, + cid, options); +} + +static inline void +table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot) +{ + state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot); +} + +static inline void +table_modify_end(TableModifyState *state) +{ + state->rel->rd_tableam->tuple_modify_end(state); +} + /* * Delete a tuple (and optionally lock the last tuple version). * diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index a8d7bed411..f77c322709 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1122,6 +1122,8 @@ HeadlineJsonState HeadlineParsedText HeadlineWordEntry HeapCheckContext +HeapInsertState +HeapMultiInsertState HeapPageFreeze HeapScanDesc HeapTuple @@ -2809,6 +2811,7 @@ TableFuncScan TableFuncScanState TableInfo TableLikeClause +TableModifyState TableSampleClause TableScanDesc TableScanDescData -- 2.34.1
From 32050367825d5f8dbb1330cc4f8ef7818eb544ed Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Sun, 31 Mar 2024 15:34:57 +0000 Subject: [PATCH v16 2/2] Optimize CTAS, CMV, RMV with multi inserts --- src/backend/commands/createas.c | 27 +++++++++------------------ src/backend/commands/matview.c | 26 +++++++++----------------- 2 files changed, 18 insertions(+), 35 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index afd3dace07..00c1271f93 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -53,9 +53,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableModifyState *mstate; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -552,17 +550,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. */ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + myState->mstate = table_modify_begin(intoRelationDesc, + TM_FLAG_MULTI_INSERTS | + TM_FLAG_BAS_BULKWRITE, + GetCurrentCommandId(true), + TABLE_INSERT_SKIP_FSM); else - myState->bistate = NULL; + myState->mstate = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. @@ -578,7 +578,6 @@ static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self) { DR_intorel *myState = (DR_intorel *) self; - bool insertIndexes; /* Nothing to insert if WITH NO DATA is specified. */ if (!myState->into->skipData) @@ -591,12 +590,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate, - &insertIndexes); + table_modify_buffer_insert(myState->mstate, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -614,10 +608,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_modify_end(myState->mstate); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 9ec13d0984..f03aa1cff3 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -48,9 +48,7 @@ typedef struct Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableModifyState *mstate; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -458,9 +456,12 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) * Fill private fields of myState for use by later routines */ myState->transientrel = transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + myState->mstate = table_modify_begin(transientrel, + TM_FLAG_MULTI_INSERTS | + TM_FLAG_BAS_BULKWRITE, + GetCurrentCommandId(true), + TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -476,7 +477,6 @@ static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; - bool insertIndexes; /* * Note that the input slot might not be of the type of the target @@ -486,13 +486,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... */ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate, - &insertIndexes); + table_modify_buffer_insert(myState->mstate, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -507,9 +501,7 @@ transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_modify_end(myState->mstate); /* close transientrel, but keep lock until commit */ table_close(myState->transientrel, NoLock); -- 2.34.1