Here is a new version of the patch, which:
1. Fixes some minor stylistic issues.
2. Uses binaryheap (instead of a custom ugly stack) for merging.
Regards,
Constantin S. Pan
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
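
To illustrate the merging part for reviewers: below is a minimal standalone C
sketch (not part of the patch; it uses a hand-rolled min-heap and plain
integers instead of lib/binaryheap and GIN entries) of the k-way merge that
mergeTrees() performs. Each sorted stream stands in for the tree dumped by one
worker; the heap always yields the globally smallest head, which is then
refilled from the stream it came from. In the real code, equal keys coming
from different workers are not emitted twice but have their posting lists
combined with ginMergeItemPointers().

#include <stdio.h>

typedef struct HeapItem
{
	int		value;		/* current head of this stream */
	int		source;		/* which sorted stream it came from */
	int		pos;		/* next unread position within that stream */
} HeapItem;

static HeapItem heap[16];
static int	heap_size = 0;

/* sift the new item up; the smallest value ends up at heap[0] */
static void
heap_push(HeapItem item)
{
	int		i = heap_size++;

	while (i > 0 && heap[(i - 1) / 2].value > item.value)
	{
		heap[i] = heap[(i - 1) / 2];
		i = (i - 1) / 2;
	}
	heap[i] = item;
}

/* pop the smallest item and restore the heap property */
static HeapItem
heap_pop(void)
{
	HeapItem	top = heap[0];
	HeapItem	last = heap[--heap_size];
	int			i = 0;

	while (2 * i + 1 < heap_size)
	{
		int		child = 2 * i + 1;

		if (child + 1 < heap_size && heap[child + 1].value < heap[child].value)
			child++;
		if (last.value <= heap[child].value)
			break;
		heap[i] = heap[child];
		i = child;
	}
	heap[i] = last;
	return top;
}

int
main(void)
{
	/* three already-sorted "worker" streams */
	int			streams[3][4] = {{1, 4, 9, 12}, {2, 4, 8, 100}, {3, 5, 9, 11}};
	int			s;

	/* seed the heap with the head entry of every stream */
	for (s = 0; s < 3; s++)
		heap_push((HeapItem) {streams[s][0], s, 1});

	while (heap_size > 0)
	{
		HeapItem	min = heap_pop();

		printf("%d ", min.value);	/* "insert" the globally smallest entry */

		/* refill the heap from the same stream the entry came from */
		if (min.pos < 4)
			heap_push((HeapItem) {streams[min.source][min.pos],
								  min.source, min.pos + 1});
	}
	printf("\n");					/* prints: 1 2 3 4 4 5 8 9 9 11 12 100 */
	return 0;
}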
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index d48a13f..c9ff58d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -6844,6 +6844,35 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir'
</listitem>
</varlistentry>
+ <varlistentry id="guc-gin-parallel-workers" xreflabel="gin_parallel_workers">
+ <term><varname>gin_parallel_workers</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>gin_parallel_workers</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ The maximum number of parallel workers to use when building a GIN index.
+ Set to 0 to disable parallel GIN build.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-gin-shared-mem" xreflabel="gin_shared_mem">
+ <term><varname>gin_shared_mem</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>gin_shared_mem</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ The size of the shared memory segment used for parallel GIN building.
+ The segment is used to transfer partial results from the workers to the
+ backend for final merging. Ignored if parallel GIN build is disabled.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</sect2>
</sect1>
diff --git a/src/backend/access/gin/README b/src/backend/access/gin/README
index fade0cb..4325d88 100644
--- a/src/backend/access/gin/README
+++ b/src/backend/access/gin/README
@@ -49,7 +49,7 @@ Features
* Write-Ahead Logging (WAL). (Recoverability from crashes.)
* User-defined opclasses. (The scheme is similar to GiST.)
* Optimized index creation (Makes use of maintenance_work_mem to accumulate
- postings in memory.)
+ postings in memory and gin_parallel_workers to speed the process up.)
* Text search support via an opclass
* Soft upper limit on the returned results set using a GUC variable:
gin_fuzzy_search_limit
@@ -324,6 +324,24 @@ page-deletions safe; it stamps the deleted pages with an XID and keeps the
deleted pages around with the right-link intact until all concurrent scans
have finished.)
+Parallel Building
+-----------------
+
+The most expensive part of GIN index building is accumulating the rbtree of
+entries. GIN supports parallel workers that divide this work among
+themselves. This makes the accumulation stage scale almost linearly with the
+number of workers, but slows down the final dumping a little, since the
+backend now has to merge posting lists that correspond to the same key but
+come from different workers.
+
+When it is time to build a GIN index on a relation, the backend checks the
+gin_parallel_workers configuration variable and tries to launch that many
+parallel workers. The backend also sets up a shared memory segment of
+configurable size (gin_shared_mem). The workers scan the relation, dividing
+it by blocks, until they fill maintenance_work_mem, then send the accumulated
+tree to the backend through the shared memory segment. The backend merges the
+trees and inserts the entries into the index. The process repeats until the
+relation is fully scanned.
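+
+Each worker sends its accumulated tree through its message queue as a
+sequence of framed messages, one per key:
+
+    GinShmemEntry header (category, attnum, nlist)
+    serialized key datum (present only for GIN_CAT_NORM_KEY)
+    nlist ItemPointerData items
+
+An empty message marks the end of one tree ("end-of-tree"); detaching from
+the queue tells the backend that the worker has finished its part of the
+scan ("end-of-forest").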
+
Compatibility
-------------
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index cd21e0e..d5cbf6c 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -16,26 +16,95 @@
#include "access/gin_private.h"
#include "access/xloginsert.h"
+#include "access/parallel.h"
+#include "access/xact.h"
#include "catalog/index.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "storage/indexfsm.h"
+#include "storage/spin.h"
+#include "utils/datum.h"
#include "utils/memutils.h"
#include "utils/rel.h"
+#include "lib/binaryheap.h"
+/* GUC parameter */
+int gin_parallel_workers = 0;
+int gin_shared_mem = 0;
-typedef struct
+#define KEY_TASK 42
+#define KEY_SHM_ORIGIN 43
+#define KEY_SHM_PER_WORKER 44
+#define GIN_BLOCKS_PER_WORKER 4
+
+/*
+ * The results are passed through the shared message queue as a sequence of
+ * messages, one per entry, each laid out as: GinShmemEntry header, then the
+ * serialized key (normal keys only), then the posting list.
+ */
+typedef struct GinShmemEntry
+{
+ GinNullCategory category;
+ OffsetNumber attnum;
+ int nlist;
+} GinShmemEntry;
+
+/*
+ * This structure is used by the backend to track the current state of the
+ * workers.
+ */
+typedef struct WorkerResult
+{
+ void *mq;
+ shm_mq_handle *mqh;
+ bool end_of_tree;
+ bool end_of_forest;
+} WorkerResult;
+
+typedef struct GinEntry
+{
+ int worker;
+ Datum key;
+ GinNullCategory category;
+ OffsetNumber attnum;
+ ItemPointerData *list;
+ uint32 nlist;
+} GinEntry;
+
+/*
+ * This shared structure describes the GIN build task for the parallel workers.
+ * We use OIDs here because workers are separate processes and pointers may
+ * become meaningless for them. The "lock" is used to protect the "scanned" and
+ * "reltuples" fields as the workers modify them.
+ */
+typedef struct GinBuildTask
+{
+ int to_scan;
+ int scanned;
+ slock_t lock;
+ Oid heap_oid;
+ Oid index_oid;
+ double reltuples;
+} GinBuildTask;
+
+typedef struct GinBuildState
{
- GinState ginstate;
- double indtuples;
- GinStatsData buildStats;
- MemoryContext tmpCtx;
- MemoryContext funcCtx;
+ GinState ginstate;
+ double indtuples;
+ GinStatsData buildStats;
+ MemoryContext tmpCtx;
+ MemoryContext funcCtx;
BuildAccumulator accum;
+
+ ParallelContext *pcxt;
+ volatile GinBuildTask *task;
+ WorkerResult *results; /* NULL in workers, array in backend */
+
+ /* these only get used by workers */
+ shm_mq *mq;
+ shm_mq_handle *mqh;
} GinBuildState;
-
/*
* Adds array of item pointers to tuple's posting list, or
* creates posting tree and tuple pointing to tree in case
@@ -265,6 +334,334 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
MemoryContextReset(buildstate->funcCtx);
}
+/*
+ * Dump one GIN entry from the worker to the backend (through the shared
+ * message queue).
+ */
+static void
+ginDumpEntry(GinState *ginstate,
+ shm_mq_handle *mqh,
+ OffsetNumber attnum,
+ Datum key,
+ GinNullCategory category,
+ ItemPointerData *list,
+ int nlist)
+{
+ int keylen, listlen;
+
+ bool isnull;
+ Form_pg_attribute att;
+ GinShmemEntry e;
+
+ /*
+ * The message consists of 2 or 3 parts. The IO vector allows us to send
+ * them as one message though the parts are located at unrelated addresses.
+ */
+ shm_mq_iovec iov[3];
+ int iovlen = 0;
+
+ char *buf = NULL;
+
+ e.category = category;
+ e.attnum = attnum;
+ e.nlist = nlist;
+
+ Assert(nlist > 0);
+
+ isnull = category == GIN_CAT_NULL_KEY;
+ att = ginstate->origTupdesc->attrs[attnum - 1];
+
+ if (e.category == GIN_CAT_NORM_KEY)
+ {
+ keylen = datumEstimateSpace(key, isnull, att->attbyval, att->attlen);
+ Assert(keylen > 0);
+ }
+ else
+ {
+ keylen = 0;
+ }
+
+ listlen = e.nlist * sizeof(ItemPointerData);
+
+ iov[iovlen].data = (char *)&e;
+ iov[iovlen++].len = sizeof(e);
+
+ if (keylen > 0)
+ {
+ char *cursor;
+ buf = palloc(keylen);
+ cursor = buf;
+ datumSerialize(key, isnull, att->attbyval, att->attlen, &cursor);
+ iov[iovlen].data = buf;
+ iov[iovlen++].len = keylen;
+ }
+
+ iov[iovlen].data = (char *)list;
+ iov[iovlen++].len = listlen;
+
+ if (shm_mq_sendv(mqh, iov, iovlen, false) != SHM_MQ_SUCCESS)
+ elog(ERROR,
+ "worker %d failed to send a result entry to the backend",
+ ParallelWorkerNumber);
+
+ if (buf)
+ pfree(buf);
+}
+
+/*
+ * Send the accumulated tree from the worker to the backend (through the shared
+ * message queue).
+ */
+static void
+ginSendTree(GinBuildState *buildstate)
+{
+ ItemPointerData *list;
+ Datum key;
+ GinNullCategory category;
+ uint32 nlist;
+ OffsetNumber attnum;
+ MemoryContext oldCtx;
+
+ Assert(IsParallelWorker());
+ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
+
+ ginBeginBAScan(&buildstate->accum);
+ while ((list = ginGetBAEntry(&buildstate->accum,
+ &attnum, &key, &category, &nlist)) != NULL)
+ {
+ /* there could be many entries, so be willing to abort here */
+ CHECK_FOR_INTERRUPTS();
+
+ ginDumpEntry(&buildstate->ginstate,
+ buildstate->mqh, attnum, key,
+ category, list, nlist);
+ }
+
+ MemoryContextReset(buildstate->tmpCtx);
+ ginInitBA(&buildstate->accum);
+
+ /* send an empty message as an "end-of-tree" marker */
+ if (shm_mq_send(buildstate->mqh, 0, NULL, false) != SHM_MQ_SUCCESS)
+ {
+ elog(ERROR,
+ "worker %d failed to send the end-of-tree marker to the backend",
+ ParallelWorkerNumber);
+ }
+
+ MemoryContextSwitchTo(oldCtx);
+}
+
+/*
+ * Resets the 'end-of-tree' marker for all workers.
+ */
+static void
+resetEndOfTree(GinBuildState *buildstate)
+{
+ int i;
+ int wnum = buildstate->pcxt ? buildstate->pcxt->nworkers_launched : 0;
+ for (i = 0; i < wnum; ++i)
+ {
+ WorkerResult *r = buildstate->results + i;
+ r->end_of_tree = false;
+ }
+}
+
+/*
+ * Get the next entry from the message queue (or from the rbtree directly, if
+ * backend, i.e. worker < 0).
+ */
+static GinEntry *
+getNextEntry(GinBuildState *buildstate, int worker)
+{
+ GinEntry e, *ecopy;
+ e.worker = worker;
+
+ if (worker < 0)
+ {
+ e.list = ginGetBAEntry(&buildstate->accum,
+ &e.attnum,
+ &e.key,
+ &e.category,
+ &e.nlist);
+ if (!e.list)
+ return NULL;
+ }
+ else
+ {
+ GinShmemEntry *shentry;
+ char *cursor;
+ Size msglen;
+ void *msg;
+ WorkerResult *r = buildstate->results + worker;
+
+ if (r->end_of_forest || r->end_of_tree)
+ return NULL;
+
+ if (shm_mq_receive(r->mqh, &msglen, &msg, false) != SHM_MQ_SUCCESS)
+ {
+ r->end_of_forest = true;
+ return NULL;
+ }
+
+ if (msglen == 0) /* end-of-tree */
+ {
+ r->end_of_tree = true;
+ return NULL;
+ }
+
+ cursor = msg;
+ Assert(cursor != NULL);
+ shentry = (GinShmemEntry*)cursor;
+ cursor += sizeof(GinShmemEntry); /* skip the fixed-size header */
+
+ e.key = 0;
+ if (shentry->category == GIN_CAT_NORM_KEY)
+ {
+ bool isnull;
+ e.key = datumRestore(&cursor, &isnull);
+ Assert(!isnull);
+ }
+ e.nlist = shentry->nlist;
+ e.list = palloc(sizeof(ItemPointerData) * e.nlist);
+ memcpy(e.list, cursor, sizeof(ItemPointerData) * e.nlist);
+ e.attnum = shentry->attnum;
+ e.category = shentry->category;
+ }
+
+ ecopy = palloc(sizeof(e));
+ memcpy(ecopy, &e, sizeof(e));
+ return ecopy;
+}
+
+static int
+compare(GinBuildState *buildstate, GinEntry *a, GinEntry *b)
+{
+ return ginCompareAttEntries(&buildstate->ginstate,
+ a->attnum, a->key, a->category,
+ b->attnum, b->key, b->category);
+}
+
+static int
+binheap_compare(Datum a, Datum b, void *arg)
+{
+ return -compare((GinBuildState *)arg, (GinEntry *)a, (GinEntry *)b);
+}
+
+/*
+ * Merge the trees from all ready (but unfinished) workers (and from myself).
+ * Return the number of entries inserted.
+ */
+static int
+mergeTrees(GinBuildState *buildstate)
+{
+ GinEntry *minentry = NULL;
+ binaryheap *binheap = NULL;
+ int i;
+ int inserted = 0;
+ int wnum = buildstate->pcxt ? buildstate->pcxt->nworkers_launched : 0;
+
+ ginBeginBAScan(&buildstate->accum);
+
+ binheap = binaryheap_allocate(wnum + 1, binheap_compare, buildstate);
+
+ /* Populate the binheap */
+ for (i = -1; i < wnum; i++)
+ {
+ GinEntry *e = getNextEntry(buildstate, i);
+ if (e)
+ binaryheap_add(binheap, PointerGetDatum(e));
+ }
+
+ while (!binaryheap_empty(binheap))
+ {
+ GinEntry *candidate = (GinEntry *)DatumGetPointer(binaryheap_remove_first(binheap));
+
+ {
+ /* refill from the same message queue */
+ GinEntry *e;
+ e = getNextEntry(buildstate, candidate->worker);
+ if (e)
+ binaryheap_add(binheap, PointerGetDatum(e));
+ }
+
+ if (minentry)
+ {
+ int cmp = compare(buildstate, candidate, minentry);
+ if (cmp)
+ {
+ /* Merge finished, insert the entry into the index. */
+ Assert(cmp > 0);
+ ginEntryInsert(&buildstate->ginstate,
+ minentry->attnum, minentry->key, minentry->category,
+ minentry->list, minentry->nlist,
+ &buildstate->buildStats);
+ inserted++;
+ pfree(minentry->list);
+ pfree(minentry);
+ minentry = candidate;
+ }
+ else
+ {
+ /* Merge the candidate with minentry. */
+ int newnlist;
+
+ ItemPointerData *oldlist = minentry->list;
+ minentry->list = ginMergeItemPointers(minentry->list, minentry->nlist,
+ candidate->list, candidate->nlist,
+ &newnlist);
+ minentry->nlist = newnlist;
+ pfree(candidate->list);
+ pfree(candidate);
+ pfree(oldlist);
+ }
+ }
+ else
+ {
+ minentry = candidate;
+ }
+ }
+
+ /* Insert the last entry left over from the merge, if any. */
+ if (minentry)
+ {
+ ginEntryInsert(&buildstate->ginstate,
+ minentry->attnum, minentry->key, minentry->category,
+ minentry->list, minentry->nlist,
+ &buildstate->buildStats);
+ inserted++;
+ }
+
+ Assert(binaryheap_empty(binheap));
+
+ binaryheap_free(binheap);
+ ginInitBA(&buildstate->accum);
+ return inserted;
+}
+
+/*
+ * The common function used by both backend and worker to dump the accumulator
+ * when it gets full. 'last' should be true on the last call, to make the
+ * backend merge everything from the workers even if the backend has no more
+ * results from itself.
+ */
+static void
+ginDumpCommon(GinBuildState *buildstate, bool last)
+{
+ if (IsParallelWorker())
+ ginSendTree(buildstate);
+ else
+ {
+ int inserted;
+
+ /*
+ * If it is not the 'last' dump, the backend can merge the next
+ * incoming trees on the next dump. But if it is the 'last' dump, the
+ * backend has to repeat merging until all workers are finished.
+ */
+ do
+ {
+ resetEndOfTree(buildstate);
+ inserted = mergeTrees(buildstate);
+ } while (last && (inserted > 0));
+ }
+}
+
static void
ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
bool *isnull, bool tupleIsAlive, void *state)
@@ -282,53 +679,244 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
/* If we've maxed out our available memory, dump everything to the index */
if (buildstate->accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)
- {
- ItemPointerData *list;
- Datum key;
- GinNullCategory category;
- uint32 nlist;
- OffsetNumber attnum;
-
- ginBeginBAScan(&buildstate->accum);
- while ((list = ginGetBAEntry(&buildstate->accum,
- &attnum, &key, &category, &nlist)) != NULL)
- {
- /* there could be many entries, so be willing to abort here */
- CHECK_FOR_INTERRUPTS();
- ginEntryInsert(&buildstate->ginstate, attnum, key, category,
- list, nlist, &buildstate->buildStats);
- }
-
- MemoryContextReset(buildstate->tmpCtx);
- ginInitBA(&buildstate->accum);
- }
+ ginDumpCommon(buildstate, false);
MemoryContextSwitchTo(oldCtx);
}
+/*
+ * Claim "max_blocks" or less blocks. Return the actual number of claimed
+ * blocks and set "first" to point to the first block of the claimed range.
+ * 0 return value means the task has been finished.
+ */
+static int
+claimSomeBlocks(volatile GinBuildTask *task, int max_blocks, int *first)
+{
+ int blocks = 0;
+
+ SpinLockAcquire(&task->lock);
+
+ if (task->scanned >= task->to_scan)
+ {
+ SpinLockRelease(&task->lock);
+ return 0;
+ }
+
+ *first = task->scanned;
+ blocks = max_blocks;
+ if (blocks > task->to_scan - task->scanned)
+ blocks = task->to_scan - task->scanned;
+ task->scanned += blocks;
+
+ SpinLockRelease(&task->lock);
+ return blocks;
+}
+
+static void
+reportReltuples(volatile GinBuildTask *task, double reltuples)
+{
+ SpinLockAcquire(&task->lock);
+ task->reltuples += reltuples;
+ SpinLockRelease(&task->lock);
+}
+
+static double
+ginbuildCommon(GinBuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo)
+{
+ double reltuples = 0;
+
+ /*
+ * create a temporary memory context that is used to hold data not yet
+ * dumped out to the index
+ */
+ buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin build temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /*
+ * create a temporary memory context that is used for calling
+ * ginExtractEntries(), and can be reset after each tuple
+ */
+ buildstate->funcCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin build temporary context for user-defined function",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ buildstate->accum.ginstate = &buildstate->ginstate;
+ ginInitBA(&buildstate->accum);
+
+ /*
+ * Do the heap scan. We disallow sync scan here because dataPlaceToPage
+ * prefers to receive tuples in TID order.
+ */
+ while (true)
+ {
+ double subtuples;
+ int first, blocks;
+
+ blocks = claimSomeBlocks(buildstate->task, GIN_BLOCKS_PER_WORKER, &first);
+ if (blocks == 0)
+ break;
+
+ subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false,
+ first, blocks,
+ ginBuildCallback, (void *)buildstate);
+ reltuples += subtuples;
+ }
+
+ /* dump remaining entries to the index */
+ ginDumpCommon(buildstate, true);
+
+ MemoryContextDelete(buildstate->funcCtx);
+ MemoryContextDelete(buildstate->tmpCtx);
+
+ /*
+ * Update metapage stats
+ */
+ buildstate->buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
+ ginUpdateStats(index, &buildstate->buildStats);
+
+ return reltuples;
+}
+
+/*
+ * The idea behind parallel GIN build is to let the parallel workers scan the
+ * relation and build rbtrees in parallel. Each block is scanned by exactly
+ * one of the workers.
+ */
+static void
+ginbuildWorker(dsm_segment *seg, shm_toc *toc)
+{
+ GinBuildState buildstate;
+
+ Relation heap;
+ Relation index;
+ IndexInfo *indexInfo;
+
+ double reltuples;
+
+ char *shm_origin;
+ int mqsize;
+
+ buildstate.task = (GinBuildTask*)shm_toc_lookup(toc, KEY_TASK);
+
+ shm_origin = (char *)shm_toc_lookup(toc, KEY_SHM_ORIGIN);
+ mqsize = *(int*)shm_toc_lookup(toc, KEY_SHM_PER_WORKER);
+ buildstate.mq = (shm_mq *)(shm_origin + ParallelWorkerNumber * mqsize);
+ shm_mq_set_sender(buildstate.mq, MyProc);
+ buildstate.mqh = shm_mq_attach(buildstate.mq, seg, NULL);
+ shm_mq_wait_for_attach(buildstate.mqh);
+
+ /*
+ * NoLock here because the backend has already opened the heap and the
+ * index. We reopen them in the worker because we cannot pass Relation
+ * pointers through shared memory, so we rely on OIDs instead.
+ */
+ heap = heap_open(buildstate.task->heap_oid, NoLock);
+ index = index_open(buildstate.task->index_oid, NoLock);
+ indexInfo = BuildIndexInfo(index);
+
+ initGinState(&buildstate.ginstate, index);
+ buildstate.indtuples = 0;
+ memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
+ reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+
+ index_close(index, NoLock);
+ heap_close(heap, NoLock);
+
+ reportReltuples(buildstate.task, reltuples);
+
+ shm_mq_detach(buildstate.mq);
+}
+
+/* Initialize the parallel GIN build task in shared memory. */
+static void
+initTask(volatile GinBuildTask *task, Relation heap, Relation index)
+{
+ task->to_scan = RelationGetNumberOfBlocks(heap);
+ task->scanned = 0;
+ SpinLockInit(&task->lock);
+ task->heap_oid = RelationGetRelid(heap);
+ task->index_oid = RelationGetRelid(index);
+ task->reltuples = 0;
+}
+
+/* Launch 'wnum' parallel workers to build 'index' on 'heap'. */
+static void
+launchWorkers(GinBuildState *buildstate, int wnum, Relation heap, Relation index)
+{
+ int i;
+ void *origin;
+ int *mqsize;
+
+ EnterParallelMode();
+ buildstate->pcxt = CreateParallelContext(ginbuildWorker, wnum);
+ {
+ int size = 0, keys = 0;
+ keys++; size += sizeof(GinBuildTask);
+ keys++; size += gin_shared_mem * 1024;
+ keys++; size += sizeof(int); /* for mqsize */
+
+ shm_toc_estimate_chunk(&buildstate->pcxt->estimator, size);
+ shm_toc_estimate_keys(&buildstate->pcxt->estimator, keys);
+ }
+ InitializeParallelDSM(buildstate->pcxt);
+
+ buildstate->task = (GinBuildTask*)shm_toc_allocate(buildstate->pcxt->toc, sizeof(GinBuildTask));
+ shm_toc_insert(buildstate->pcxt->toc, KEY_TASK, (GinBuildTask*)buildstate->task);
+
+ origin = shm_toc_allocate(buildstate->pcxt->toc, gin_shared_mem * 1024);
+ shm_toc_insert(buildstate->pcxt->toc, KEY_SHM_ORIGIN, origin);
+
+ mqsize = (int *)shm_toc_allocate(buildstate->pcxt->toc, sizeof(int));
+ *mqsize = gin_shared_mem * 1024 / buildstate->pcxt->nworkers;
+ shm_toc_insert(buildstate->pcxt->toc, KEY_SHM_PER_WORKER, mqsize);
+
+ initTask(buildstate->task, heap, index);
+
+ buildstate->results = palloc(buildstate->pcxt->nworkers * sizeof(WorkerResult));
+ for (i = 0; i < buildstate->pcxt->nworkers; i++)
+ {
+ WorkerResult *r = buildstate->results + i;
+ r->mq = shm_mq_create((char *)origin + i * (*mqsize), *mqsize);
+ shm_mq_set_receiver(r->mq, MyProc);
+ r->mqh = shm_mq_attach(r->mq, buildstate->pcxt->seg, NULL);
+ r->end_of_tree = false;
+ r->end_of_forest = false;
+ }
+
+ LaunchParallelWorkers(buildstate->pcxt);
+}
+
+static void
+finishWorkers(GinBuildState *buildstate, GinBuildTask *task)
+{
+ WaitForParallelWorkersToFinish(buildstate->pcxt);
+ /* copy the task out of shared memory before destroying the parallel context */
+ memcpy(task, (GinBuildTask *)buildstate->task, sizeof(GinBuildTask));
+ DestroyParallelContext(buildstate->pcxt);
+ ExitParallelMode();
+}
+
IndexBuildResult *
ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
+ GinBuildState buildstate;
+
IndexBuildResult *result;
- double reltuples;
- GinBuildState buildstate;
Buffer RootBuffer,
MetaBuffer;
- ItemPointerData *list;
- Datum key;
- GinNullCategory category;
- uint32 nlist;
- MemoryContext oldCtx;
- OffsetNumber attnum;
+ double reltuples = 0;
+ int wnum = gin_parallel_workers;
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "index \"%s\" already contains data",
RelationGetRelationName(index));
- initGinState(&buildstate.ginstate, index);
- buildstate.indtuples = 0;
- memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
-
/* initialize the meta page */
MetaBuffer = GinNewBuffer(index);
@@ -363,60 +951,40 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
UnlockReleaseBuffer(RootBuffer);
END_CRIT_SECTION();
+ initGinState(&buildstate.ginstate, index);
+ buildstate.indtuples = 0;
+ memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
/* count the root as first entry page */
buildstate.buildStats.nEntryPages++;
- /*
- * create a temporary memory context that is used to hold data not yet
- * dumped out to the index
- */
- buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
- "Gin build temporary context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
- /*
- * create a temporary memory context that is used for calling
- * ginExtractEntries(), and can be reset after each tuple
- */
- buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
- "Gin build temporary context for user-defined function",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
- buildstate.accum.ginstate = &buildstate.ginstate;
- ginInitBA(&buildstate.accum);
+ if ((wnum > 0) && RelationUsesLocalBuffers(heap))
+ {
+ elog(DEBUG1, "not using parallel GIN build on temporary table %s\n", RelationGetRelationName(heap));
+ wnum = 0;
+ }
- /*
- * Do the heap scan. We disallow sync scan here because dataPlaceToPage
- * prefers to receive tuples in TID order.
- */
- reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
- ginBuildCallback, (void *) &buildstate);
+ buildstate.pcxt = NULL;
- /* dump remaining entries to the index */
- oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
- ginBeginBAScan(&buildstate.accum);
- while ((list = ginGetBAEntry(&buildstate.accum,
- &attnum, &key, &category, &nlist)) != NULL)
{
- /* there could be many entries, so be willing to abort here */
- CHECK_FOR_INTERRUPTS();
- ginEntryInsert(&buildstate.ginstate, attnum, key, category,
- list, nlist, &buildstate.buildStats);
- }
- MemoryContextSwitchTo(oldCtx);
+ GinBuildTask task;
+ double backend_reltuples = 0;
- MemoryContextDelete(buildstate.funcCtx);
- MemoryContextDelete(buildstate.tmpCtx);
+ if (wnum > 0)
+ {
+ launchWorkers(&buildstate, wnum, heap, index);
+ backend_reltuples += ginbuildCommon(&buildstate, heap, index, indexInfo);
+ finishWorkers(&buildstate, &task);
+ }
+ else
+ {
+ buildstate.task = &task;
+ initTask(buildstate.task, heap, index);
+ backend_reltuples += ginbuildCommon(&buildstate, heap, index, indexInfo);
+ }
- /*
- * Update metapage stats
- */
- buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
- ginUpdateStats(index, &buildstate.buildStats);
+ reltuples = backend_reltuples + task.reltuples;
+ }
/*
* Return statistics
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 65a6cd4..6afbb8a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2775,6 +2775,31 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"gin_parallel_workers",
+ PGC_USERSET,
+ RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Maximum number of parallel workers for GIN buiding."),
+ NULL,
+ },
+ &gin_parallel_workers,
+ 0, 0, MAX_BACKENDS,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"gin_shared_mem",
+ PGC_USERSET,
+ RESOURCES_ASYNCHRONOUS,
+ gettext_noop("The size of shared memory segment for parallel GIN buiding."),
+ NULL,
+ GUC_UNIT_KB
+ },
+ &gin_shared_mem,
+ 16 * 1024, 1024, MAX_KILOBYTES,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 5536012..15bf5ca 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -541,6 +541,8 @@
#xmloption = 'content'
#gin_fuzzy_search_limit = 0
#gin_pending_list_limit = 4MB
+#gin_parallel_workers = 0 # 0 disables parallel GIN build
+#gin_shared_mem = 16MB
# - Locale and Formatting -
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index e5b2e10..91e5b27 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -68,6 +68,8 @@ typedef char GinTernaryValue;
/* GUC parameters */
extern PGDLLIMPORT int GinFuzzySearchLimit;
extern int gin_pending_list_limit;
+extern int gin_parallel_workers;
+extern int gin_shared_mem;
/* ginutil.c */
extern void ginGetStats(Relation index, GinStatsData *stats);
diff --git a/src/test/regress/expected/gin.out b/src/test/regress/expected/gin.out
index a3911a6..fc6bac0 100644
--- a/src/test/regress/expected/gin.out
+++ b/src/test/regress/expected/gin.out
@@ -23,6 +23,88 @@ select gin_clean_pending_list('gin_test_idx'); -- nothing to flush
0
(1 row)
+-- Test parallel building
+drop index gin_test_idx;
+select count(*) from gin_test_tbl where i @> array[1,2];
+ count
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[2];
+ count
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[8];
+ count
+-------
+ 3
+(1 row)
+
+set gin_parallel_workers = 1;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+ count
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[2];
+ count
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[8];
+ count
+-------
+ 3
+(1 row)
+
+drop index gin_test_idx;
+set gin_parallel_workers = 2;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+ count
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[2];
+ count
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[8];
+ count
+-------
+ 3
+(1 row)
+
+drop index gin_test_idx;
+set gin_parallel_workers = 8;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+ count
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[2];
+ count
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[8];
+ count
+-------
+ 3
+(1 row)
+
-- Test vacuuming
delete from gin_test_tbl where i @> array[2];
vacuum gin_test_tbl;
diff --git a/src/test/regress/sql/gin.sql b/src/test/regress/sql/gin.sql
index c566e9b..8b0eb45 100644
--- a/src/test/regress/sql/gin.sql
+++ b/src/test/regress/sql/gin.sql
@@ -19,6 +19,33 @@ vacuum gin_test_tbl; -- flush the fastupdate buffers
select gin_clean_pending_list('gin_test_idx'); -- nothing to flush
+-- Test parallel building
+
+drop index gin_test_idx;
+select count(*) from gin_test_tbl where i @> array[1,2];
+select count(*) from gin_test_tbl where i @> array[2];
+select count(*) from gin_test_tbl where i @> array[8];
+
+set gin_parallel_workers = 1;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+select count(*) from gin_test_tbl where i @> array[2];
+select count(*) from gin_test_tbl where i @> array[8];
+
+drop index gin_test_idx;
+set gin_parallel_workers = 2;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+select count(*) from gin_test_tbl where i @> array[2];
+select count(*) from gin_test_tbl where i @> array[8];
+
+drop index gin_test_idx;
+set gin_parallel_workers = 8;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+select count(*) from gin_test_tbl where i @> array[2];
+select count(*) from gin_test_tbl where i @> array[8];
+
-- Test vacuuming
delete from gin_test_tbl where i @> array[2];
vacuum gin_test_tbl;