diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 9fd7b4e019b..b72f5363045 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -343,11 +343,68 @@ DecrTupleDescRefCount(TupleDesc tupdesc)
 }
 
 /*
- * Compare two TupleDesc structures for logical equality
+ * Compare two TupleDesc attributes for logical equality
  *
  * Note: we deliberately do not check the attrelid and tdtypmod fields.
  * This allows typcache.c to use this routine to see if a cached record type
  * matches a requested type, and is harmless for relcache.c's uses.
+ */
+bool
+equalTupleDescAttrs(Form_pg_attribute attr1, Form_pg_attribute attr2)
+{
+	/*
+	 * We do not need to check every single field here: we can disregard
+	 * attrelid and attnum (which were used to place the row in the attrs
+	 * array in the first place).  It might look like we could dispense
+	 * with checking attlen/attbyval/attalign, since these are derived
+	 * from atttypid; but in the case of dropped columns we must check
+	 * them (since atttypid will be zero for all dropped columns) and in
+	 * general it seems safer to check them always.
+	 *
+	 * attcacheoff must NOT be checked since it's possibly not set in both
+	 * copies.
+	 */
+	if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
+		return false;
+	if (attr1->atttypid != attr2->atttypid)
+		return false;
+	if (attr1->attstattarget != attr2->attstattarget)
+		return false;
+	if (attr1->attlen != attr2->attlen)
+		return false;
+	if (attr1->attndims != attr2->attndims)
+		return false;
+	if (attr1->atttypmod != attr2->atttypmod)
+		return false;
+	if (attr1->attbyval != attr2->attbyval)
+		return false;
+	if (attr1->attstorage != attr2->attstorage)
+		return false;
+	if (attr1->attalign != attr2->attalign)
+		return false;
+	if (attr1->attnotnull != attr2->attnotnull)
+		return false;
+	if (attr1->atthasdef != attr2->atthasdef)
+		return false;
+	if (attr1->attidentity != attr2->attidentity)
+		return false;
+	if (attr1->attisdropped != attr2->attisdropped)
+		return false;
+	if (attr1->attislocal != attr2->attislocal)
+		return false;
+	if (attr1->attinhcount != attr2->attinhcount)
+		return false;
+	if (attr1->attcollation != attr2->attcollation)
+		return false;
+	/* attacl, attoptions and attfdwoptions are not even present... */
+
+	return true;
+}
+
+/*
+ * Compare two TupleDesc structures for logical equality
+ *
+ * Note: see equalTupleDescAttrs for the note on fields that we don't compare.
  * We don't compare tdrefcount, either.
  */
 bool
@@ -369,51 +426,8 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
 		Form_pg_attribute attr1 = tupdesc1->attrs[i];
 		Form_pg_attribute attr2 = tupdesc2->attrs[i];
 
-		/*
-		 * We do not need to check every single field here: we can disregard
-		 * attrelid and attnum (which were used to place the row in the attrs
-		 * array in the first place).  It might look like we could dispense
-		 * with checking attlen/attbyval/attalign, since these are derived
-		 * from atttypid; but in the case of dropped columns we must check
-		 * them (since atttypid will be zero for all dropped columns) and in
-		 * general it seems safer to check them always.
-		 *
-		 * attcacheoff must NOT be checked since it's possibly not set in both
-		 * copies.
-		 */
-		if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
-			return false;
-		if (attr1->atttypid != attr2->atttypid)
-			return false;
-		if (attr1->attstattarget != attr2->attstattarget)
-			return false;
-		if (attr1->attlen != attr2->attlen)
-			return false;
-		if (attr1->attndims != attr2->attndims)
-			return false;
-		if (attr1->atttypmod != attr2->atttypmod)
-			return false;
-		if (attr1->attbyval != attr2->attbyval)
-			return false;
-		if (attr1->attstorage != attr2->attstorage)
-			return false;
-		if (attr1->attalign != attr2->attalign)
-			return false;
-		if (attr1->attnotnull != attr2->attnotnull)
-			return false;
-		if (attr1->atthasdef != attr2->atthasdef)
-			return false;
-		if (attr1->attidentity != attr2->attidentity)
-			return false;
-		if (attr1->attisdropped != attr2->attisdropped)
-			return false;
-		if (attr1->attislocal != attr2->attislocal)
-			return false;
-		if (attr1->attinhcount != attr2->attinhcount)
-			return false;
-		if (attr1->attcollation != attr2->attcollation)
+		if (!equalTupleDescAttrs(attr1, attr2))
 			return false;
-		/* attacl, attoptions and attfdwoptions are not even present... */
 	}
 
 	if (tupdesc1->constr != NULL)
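
Illustration (not part of the patch): as the comment inside equalTupleDescAttrs explains, attcacheoff is deliberately ignored, so two copies of an attribute that differ only in their cached offset still compare equal, while any of the compared fields breaks equality.  A minimal hypothetical check, assuming a valid TupleDesc named tupdesc with at least one attribute is in scope:

    FormData_pg_attribute a = *tupdesc->attrs[0];
    FormData_pg_attribute b = a;

    a.attcacheoff = -1;         /* offset not cached in this copy */
    b.attcacheoff = 0;          /* pretend the other copy cached it */
    Assert(equalTupleDescAttrs(&a, &b));    /* attcacheoff is ignored */

    b.attnotnull = !b.attnotnull;
    Assert(!equalTupleDescAttrs(&a, &b));   /* attnotnull is compared */
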
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 2dad3e8a655..54acd989101 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -36,6 +36,7 @@
 #include "utils/memutils.h"
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
+#include "utils/typcache.h"
 
 
 /*
@@ -47,6 +48,14 @@
  */
 #define PARALLEL_ERROR_QUEUE_SIZE			16384
 
+/*
+ * We want to create a DSA area to store shared state that has the same extent
+ * as a parallel context, to hold the record type registry.  We don't want it
+ * to have to create any extra DSM segments in common cases, so we give it
+ * enough in-place space to hold an empty SharedRecordTypmodRegistry.
+ */
+#define PARALLEL_CONTEXT_DSA_SIZE			0x30000
+
 /* Magic number for parallel context TOC. */
 #define PARALLEL_MAGIC						0x50477c7c
 
@@ -63,6 +72,9 @@
 #define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
 #define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
 #define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_EXTENSION_TRAMPOLINE	UINT64CONST(0xFFFFFFFFFFFF000A)
+#define PARALLEL_KEY_CONTEXT_DSA			UINT64CONST(0xFFFFFFFFFFFF000B)
+#define PARALLEL_KEY_RECORD_TYPMOD_REGISTRY	UINT64CONST(0xFFFFFFFFFFFF000C)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -191,6 +203,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	Size		library_len = 0;
 	Size		guc_len = 0;
 	Size		combocidlen = 0;
+	Size		typmod_registry_size = 0;
 	Size		tsnaplen = 0;
 	Size		asnaplen = 0;
 	Size		tstatelen = 0;
@@ -226,8 +239,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
 		tstatelen = EstimateTransactionStateSpace();
 		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
+		shm_toc_estimate_chunk(&pcxt->estimator, PARALLEL_CONTEXT_DSA_SIZE);
+		typmod_registry_size = SharedRecordTypmodRegistryEstimate();
+		shm_toc_estimate_chunk(&pcxt->estimator, typmod_registry_size);
 		/* If you add more chunks here, you probably need to add keys. */
-		shm_toc_estimate_keys(&pcxt->estimator, 6);
+		shm_toc_estimate_keys(&pcxt->estimator, 8);
 
 		/* Estimate space need for error queues. */
 		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -295,6 +311,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		char	   *asnapspace;
 		char	   *tstatespace;
 		char	   *error_queue_space;
+		char	   *typemod_registry_space;
+		char	   *context_dsa_space;
 		char	   *entrypointstate;
 		Size		lnamelen;
 
@@ -313,6 +331,27 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		SerializeComboCIDState(combocidlen, combocidspace);
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
 
+		/*
+		 * Make a DSA area for dynamically-sized shared state that has the
+		 * same scope as this ParallelContext.
+		 */
+		context_dsa_space = shm_toc_allocate(pcxt->toc,
+											 PARALLEL_CONTEXT_DSA_SIZE);
+		pcxt->context_dsa = dsa_create_in_place(context_dsa_space,
+												PARALLEL_CONTEXT_DSA_SIZE,
+												LWTRANCHE_PARALLEL_CONTEXT_DSA,
+												pcxt->seg);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_CONTEXT_DSA, context_dsa_space);
+
+		/* Set up shared record type registry. */
+		typemod_registry_space = shm_toc_allocate(pcxt->toc,
+												  typmod_registry_size);
+		SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *)
+									   typemod_registry_space,
+									   pcxt->context_dsa, pcxt->seg);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY,
+					   typemod_registry_space);
+
 		/* Serialize transaction snapshot and active snapshot. */
 		tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
 		SerializeSnapshot(transaction_snapshot, tsnapspace);
@@ -618,6 +657,13 @@ DestroyParallelContext(ParallelContext *pcxt)
 		}
 	}
 
+	/* Detach from the context-scoped DSA area, if there is one. */
+	if (pcxt->context_dsa != NULL)
+	{
+		dsa_detach(pcxt->context_dsa);
+		pcxt->context_dsa = NULL;
+	}
+
 	/*
 	 * If we have allocated a shared memory segment, detach it.  This will
 	 * implicitly detach the error queues, and any other shared memory queues,
@@ -938,6 +984,9 @@ ParallelWorkerMain(Datum main_arg)
 	char	   *asnapspace;
 	char	   *tstatespace;
 	StringInfoData msgbuf;
+	char	   *typmod_registry_space;
+	char	   *context_dsa_space;
+	dsa_area   *context_dsa;
 
 	/* Set flag to indicate that we're initializing a parallel worker. */
 	InitializingParallelWorker = true;
@@ -1069,6 +1118,20 @@ ParallelWorkerMain(Datum main_arg)
 	Assert(combocidspace != NULL);
 	RestoreComboCIDState(combocidspace);
 
+	/* Attach to the DSA area. */
+	context_dsa_space = shm_toc_lookup(toc, PARALLEL_KEY_CONTEXT_DSA);
+	Assert(context_dsa_space != NULL);
+	context_dsa = dsa_attach_in_place(context_dsa_space, seg);
+
+	/* Attach to shared record type registry. */
+	typmod_registry_space =
+		shm_toc_lookup(toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY);
+	Assert(typmod_registry_space != NULL);
+	SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *)
+									 typmod_registry_space,
+									 context_dsa,
+									 seg);
+
 	/* Restore transaction snapshot. */
 	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
 	Assert(tsnapspace != NULL);
@@ -1114,6 +1177,9 @@ ParallelWorkerMain(Datum main_arg)
 	/* Must pop active snapshot so resowner.c doesn't complain. */
 	PopActiveSnapshot();
 
+	/* Detach from context-scoped DSA area. */
+	dsa_detach(context_dsa);
+
 	/* Shut down the parallel-worker transaction. */
 	EndParallelWorkerTransaction();
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 35536e47894..0d7996b5205 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
 
 	if (LWLockTrancheArray == NULL)
 	{
-		LWLockTranchesAllocated = 64;
+		LWLockTranchesAllocated = 128;
 		LWLockTrancheArray = (char **)
 			MemoryContextAllocZero(TopMemoryContext,
 								   LWLockTranchesAllocated * sizeof(char *));
@@ -510,7 +510,13 @@ RegisterLWLockTranches(void)
 						  "predicate_lock_manager");
 	LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
 						  "parallel_query_dsa");
+	LWLockRegisterTranche(LWTRANCHE_PARALLEL_CONTEXT_DSA,
+						  "parallel_context_dsa");
 	LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
+	LWLockRegisterTranche(LWTRANCHE_SHARED_RECORD_ATTS_INDEX,
+						  "shared_record_atts_index");
+	LWLockRegisterTranche(LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX,
+						  "shared_record_typmods_index");
 
 	/* Register named tranches. */
 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c
index 0cf5001a758..f26b0a7a58f 100644
--- a/src/backend/utils/cache/typcache.c
+++ b/src/backend/utils/cache/typcache.c
@@ -55,7 +55,9 @@
 #include "catalog/pg_type.h"
 #include "commands/defrem.h"
 #include "executor/executor.h"
+#include "lib/dht.h"
 #include "optimizer/planner.h"
+#include "storage/lwlock.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
 #include "utils/fmgroids.h"
@@ -67,7 +69,6 @@
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
-
 /* The main type cache hashtable searched by lookup_type_cache */
 static HTAB *TypeCacheHash = NULL;
 
@@ -148,12 +149,95 @@ typedef struct RecordCacheEntry
 	List	   *tupdescs;
 } RecordCacheEntry;
 
+/*
+ * A mechanism for sharing record typmods between backends.
+ */
+struct SharedRecordTypmodRegistry
+{
+	dht_hash_table_handle atts_index_handle;
+	dht_hash_table_handle typmod_index_handle;
+	pg_atomic_uint32 next_typmod;
+};
+
+/*
+ * A flattened/serialized representation of a TupleDesc for use in shared
+ * memory.  Can be converted to and from regular TupleDesc format.  Doesn't
+ * support constraints and doesn't store the actual type OID, because this is
+ * only for use with RECORD types as created by CreateTupleDesc().  These are
+ * chained into a linked list hanging off the hash table entry for the type
+ * OIDs of their first REC_HASH_KEYS attributes, so one entry's list can
+ * hold several descriptors that differ only in names or other properties.
+ */
+typedef struct SerializedTupleDesc
+{
+	dsa_pointer next;			/* next with the same leading attribute type OIDs */
+	int			natts;			/* number of attributes in the tuple */
+	int32		typmod;			/* typmod for tuple type */
+	bool		hasoid;			/* tuple has oid attribute in its header */
+
+	/*
+	 * The attributes follow.  We only ever access the first
+	 * ATTRIBUTE_FIXED_PART_SIZE bytes of each element, like the code in
+	 * tupdesc.c.
+	 */
+	FormData_pg_attribute attributes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedTupleDesc;
+
+/*
+ * An entry in SharedRecordTypmodRegistry's attribute index.  The key is the
+ * type OIDs of the first REC_HASH_KEYS attributes.  That means collisions are
+ * possible, but that's OK because SerializedTupleDesc objects sharing a key
+ * are chained into a list.
+ */
+typedef struct SRTRAttsIndexEntry
+{
+	Oid			leading_attr_oids[REC_HASH_KEYS];
+	dsa_pointer serialized_tupdesc;
+} SRTRAttsIndexEntry;
+
+/*
+ * An entry in SharedRecordTypmodRegistry's typmod index.  Points to a single
+ * SerializedTupleDesc in shared memory.
+ */
+typedef struct SRTRTypmodIndexEntry
+{
+	uint32		typmod;
+	dsa_pointer serialized_tupdesc;
+} SRTRTypmodIndexEntry;
+
+/* Parameters for SharedRecordTypmodRegistry's attributes hash table. */
+static const dht_parameters srtr_atts_index_params = {
+	sizeof(Oid) * REC_HASH_KEYS,
+	sizeof(SRTRAttsIndexEntry),
+	memcmp,
+	tag_hash,
+	LWTRANCHE_SHARED_RECORD_ATTS_INDEX
+};
+
+/* Parameters for SharedRecordTypmodRegistry's typmod hash table. */
+static const dht_parameters srtr_typmod_index_params = {
+	sizeof(uint32),
+	sizeof(SRTRTypmodIndexEntry),
+	memcmp,
+	tag_hash,
+	LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX
+};
+
 static HTAB *RecordCacheHash = NULL;
 
 static TupleDesc *RecordCacheArray = NULL;
 static int32 RecordCacheArrayLen = 0;	/* allocated length of array */
 static int32 NextRecordTypmod = 0;		/* number of entries used */
 
+/* Current SharedRecordTypmodRegistry, if attached. */
+static struct
+{
+	SharedRecordTypmodRegistry *shared;
+	dht_hash_table *atts_index;
+	dht_hash_table *typmod_index;
+	dsa_area *area;
+} CurrentSharedRecordTypmodRegistry;
+
 static void load_typcache_tupdesc(TypeCacheEntry *typentry);
 static void load_rangetype_info(TypeCacheEntry *typentry);
 static void load_domaintype_info(TypeCacheEntry *typentry);
@@ -174,6 +258,13 @@ static void TypeCacheConstrCallback(Datum arg, int cacheid, uint32 hashvalue);
 static void load_enum_cache_data(TypeCacheEntry *tcache);
 static EnumItem *find_enumitem(TypeCacheEnumData *enumdata, Oid arg);
 static int	enum_oid_cmp(const void *left, const void *right);
+static void shared_record_typmod_registry_detach(dsm_segment *segment,
+												 Datum datum);
+static int32 find_or_allocate_shared_record_typmod(TupleDesc tupdesc);
+static TupleDesc deserialize_tupledesc(const SerializedTupleDesc *serialized);
+static dsa_pointer serialize_tupledesc(dsa_area *area,
+									   const TupleDesc tupdesc);
+
 
 
 /*
@@ -1199,6 +1290,33 @@ cache_record_field_properties(TypeCacheEntry *typentry)
 	typentry->flags |= TCFLAGS_CHECKED_FIELD_PROPERTIES;
 }
 
+/*
+ * Make sure that RecordCacheArray is large enough to store 'typmod'.
+ */
+static void
+ensure_record_cache_typmod_slot_exists(int32 typmod)
+{
+	if (RecordCacheArray == NULL)
+	{
+		RecordCacheArray = (TupleDesc *)
+			MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc));
+		RecordCacheArrayLen = 64;
+	}
+
+	if (typmod >= RecordCacheArrayLen)
+	{
+		int32		newlen = RecordCacheArrayLen * 2;
+
+		while (typmod >= newlen)
+			newlen *= 2;
+
+		RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
+												  newlen * sizeof(TupleDesc));
+		memset(RecordCacheArray + RecordCacheArrayLen, 0,
+			   (newlen - RecordCacheArrayLen) * sizeof(TupleDesc *));
+		RecordCacheArrayLen = newlen;
+	}
+}
 
 /*
  * lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype
@@ -1229,15 +1347,49 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError)
 		/*
 		 * It's a transient record type, so look in our record-type table.
 		 */
-		if (typmod < 0 || typmod >= NextRecordTypmod)
+		if (typmod >= 0)
 		{
-			if (!noError)
-				ereport(ERROR,
-						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-						 errmsg("record type has not been registered")));
-			return NULL;
+			/* Is it already in our local cache? */
+			if (typmod < RecordCacheArrayLen &&
+				RecordCacheArray[typmod] != NULL)
+				return RecordCacheArray[typmod];
+
+			/* Are we attached to a SharedRecordTypmodRegistry? */
+			if (CurrentSharedRecordTypmodRegistry.shared != NULL)
+			{
+				SRTRTypmodIndexEntry *entry;
+
+				/* Try to find it in the shared typmod index. */
+				entry = dht_find(CurrentSharedRecordTypmodRegistry.typmod_index,
+								 &typmod, false);
+				if (entry != NULL)
+				{
+					SerializedTupleDesc *serialized;
+
+					serialized = (SerializedTupleDesc *)
+						dsa_get_address(CurrentSharedRecordTypmodRegistry.area,
+										entry->serialized_tupdesc);
+					Assert(typmod == serialized->typmod);
+
+					/* We may need to extend the local RecordCacheArray. */
+					ensure_record_cache_typmod_slot_exists(typmod);
+
+					/* Produce and cache a TupleDesc. */
+					RecordCacheArray[typmod] =
+						deserialize_tupledesc(serialized);
+					dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
+								entry);
+
+					return RecordCacheArray[typmod];
+				}
+			}
 		}
-		return RecordCacheArray[typmod];
+
+		if (!noError)
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					 errmsg("record type has not been registered")));
+		return NULL;
 	}
 }
 
@@ -1362,30 +1514,27 @@ assign_record_type_typmod(TupleDesc tupDesc)
 		}
 	}
 
+	/* Look in the SharedRecordTypmodRegistry, if attached */
+	newtypmod = find_or_allocate_shared_record_typmod(tupDesc);
+
 	/* Not present, so need to manufacture an entry */
 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
-	if (RecordCacheArray == NULL)
-	{
-		RecordCacheArray = (TupleDesc *) palloc(64 * sizeof(TupleDesc));
-		RecordCacheArrayLen = 64;
-	}
-	else if (NextRecordTypmod >= RecordCacheArrayLen)
-	{
-		int32		newlen = RecordCacheArrayLen * 2;
-
-		RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
-												  newlen * sizeof(TupleDesc));
-		RecordCacheArrayLen = newlen;
-	}
+	/*
+	 * Whether we just got a new typmod from a SharedRecordTypmodRegistry or
+	 * we're allocating one locally, make sure the RecordCacheArray is big
+	 * enough.
+	 */
+	ensure_record_cache_typmod_slot_exists(Max(NextRecordTypmod, newtypmod));
 
 	/* if fail in subrs, no damage except possibly some wasted memory... */
 	entDesc = CreateTupleDescCopy(tupDesc);
 	recentry->tupdescs = lcons(entDesc, recentry->tupdescs);
 	/* mark it as a reference-counted tupdesc */
 	entDesc->tdrefcount = 1;
-	/* now it's safe to advance NextRecordTypmod */
-	newtypmod = NextRecordTypmod++;
+	/* now it's safe to advance NextRecordTypmod, if allocating locally */
+	if (newtypmod == -1)
+		newtypmod = NextRecordTypmod++;
 	entDesc->tdtypmod = newtypmod;
 	RecordCacheArray[newtypmod] = entDesc;
 
@@ -1396,6 +1545,176 @@ assign_record_type_typmod(TupleDesc tupDesc)
 }
 
 /*
+ * Return the amount of shmem required to hold a SharedRecordTypmodRegistry.
+ * This exists only to avoid exposing private innards of
+ * SharedRecordTypmodRegistry in a header.
+ */
+size_t
+SharedRecordTypmodRegistryEstimate(void)
+{
+	return sizeof(SharedRecordTypmodRegistry);
+}
+
+/*
+ * Initialize 'registry' in a pre-existing shared memory region, which must be
+ * maximally aligned and have space for SharedRecordTypmodRegistryEstimate()
+ * bytes.
+ *
+ * 'area' will be used to allocate shared memory space as required for the
+ * typmod registry.  The current process, expected to be a leader process
+ * in a parallel query, will be attached automatically and its current record
+ * types will be loaded into the registry.  While attached, all calls to
+ * assign_record_type_typmod will use the shared registry.  Other backends
+ * will need to attach explicitly.
+ *
+ * An on-detach callback will be installed for 'segment', so that normal
+ * private record type cache behavior can be restored when the DSM segment
+ * goes away.
+ */
+void
+SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *registry,
+							 dsa_area *area,
+							 dsm_segment *segment)
+{
+	dht_hash_table *atts_index;
+	dht_hash_table *typmod_index;
+	int32 typmod;
+
+	/* We can't already be attached to a shared registry. */
+	Assert(CurrentSharedRecordTypmodRegistry.shared == NULL);
+	Assert(CurrentSharedRecordTypmodRegistry.atts_index == NULL);
+	Assert(CurrentSharedRecordTypmodRegistry.typmod_index == NULL);
+	Assert(CurrentSharedRecordTypmodRegistry.area == NULL);
+
+	/* Create the hash table indexed by attribute OIDs. */
+	atts_index = dht_create(area, &srtr_atts_index_params);
+	registry->atts_index_handle = dht_get_hash_table_handle(atts_index);
+
+	/* Create the hash table indexed by typmod. */
+	typmod_index = dht_create(area, &srtr_typmod_index_params);
+	registry->typmod_index_handle = dht_get_hash_table_handle(typmod_index);
+
+	/* Initialize the 'next typmod' counter to this backend's next value. */
+	pg_atomic_init_u32(&registry->next_typmod, NextRecordTypmod);
+
+	/*
+	 * Copy all entries from this backend's private record cache into the
+	 * shared registry.
+	 */
+	for (typmod = 0; typmod < NextRecordTypmod; ++typmod)
+	{
+		SRTRTypmodIndexEntry *typmod_index_entry;
+		SRTRAttsIndexEntry *atts_index_entry;
+		SerializedTupleDesc *serialized;
+		dsa_pointer serialized_dp;
+		TupleDesc tupdesc;
+		Oid atts_key[REC_HASH_KEYS];
+		bool found;
+		int i;
+
+		tupdesc = RecordCacheArray[typmod];
+		if (tupdesc == NULL)
+			continue;
+
+		/* Serialize the TupleDesc into shared memory. */
+		serialized_dp = serialize_tupledesc(area, tupdesc);
+
+		/* Insert into the typmod index. */
+		typmod_index_entry = dht_find_or_insert(typmod_index,
+												&tupdesc->tdtypmod,
+												&found);
+		if (found)
+			elog(ERROR, "cannot create duplicate shared record typmod");
+		typmod_index_entry->typmod = tupdesc->tdtypmod;
+		typmod_index_entry->serialized_tupdesc = serialized_dp;
+		dht_release(typmod_index, typmod_index_entry);
+
+		/* Insert into the attributes index. */
+		memset(atts_key, 0, sizeof(atts_key));
+		for (i = 0; i < Min(tupdesc->natts, REC_HASH_KEYS); ++i)
+			atts_key[i] = tupdesc->attrs[i]->atttypid;
+		atts_index_entry = dht_find_or_insert(atts_index, &atts_key, &found);
+		if (!found)
+		{
+			memcpy(atts_index_entry->leading_attr_oids,
+				   atts_key,
+				   sizeof(atts_key));
+			atts_index_entry->serialized_tupdesc = InvalidDsaPointer;
+		}
+
+		/* Push onto list. */
+		serialized = (SerializedTupleDesc *)
+			dsa_get_address(area, serialized_dp);
+		serialized->next = atts_index_entry->serialized_tupdesc;
+		atts_index_entry->serialized_tupdesc = serialized_dp;
+		dht_release(atts_index, atts_index_entry);
+	}
+
+	/* Set up our detach hook so that we can return to private cache mode. */
+	on_dsm_detach(segment, shared_record_typmod_registry_detach,
+				  PointerGetDatum(registry));
+
+	/*
+	 * Set up the global state that will tell assign_record_type_typmod and
+	 * lookup_rowtype_tupdesc_internal about the shared registry.
+	 */
+	CurrentSharedRecordTypmodRegistry.shared = registry;
+	CurrentSharedRecordTypmodRegistry.atts_index = atts_index;
+	CurrentSharedRecordTypmodRegistry.typmod_index = typmod_index;
+	CurrentSharedRecordTypmodRegistry.area = area;
+}
+
+/*
+ * Attach to 'registry', which must have been initialized already by another
+ * backend.  Future calls to assign_record_type_typmod and
+ * lookup_rowtype_tupdesc_internal will use the shared registry, until
+ * 'segment' is detached.
+ */
+void
+SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *registry,
+							   dsa_area *area,
+							   dsm_segment *segment)
+{
+	dht_hash_table *atts_index;
+	dht_hash_table *typmod_index;
+
+	/* We can't already be attached to a shared registry. */
+	Assert(CurrentSharedRecordTypmodRegistry.shared == NULL);
+	Assert(CurrentSharedRecordTypmodRegistry.atts_index == NULL);
+	Assert(CurrentSharedRecordTypmodRegistry.typmod_index == NULL);
+	Assert(CurrentSharedRecordTypmodRegistry.area == NULL);
+
+	/*
+	 * We can't already have typmods in our local cache, because they'd clash
+	 * with those imported by SharedRecordTypmodRegistryInit.  This should be a
+	 * freshly started parallel worker.  If we ever support worker recycling,
+	 * a worker would need to zap its local cache in between servicing
+	 * different queries, in order to be able to call this and synchronize
+	 * typmods with a new leader.
+	 */
+	Assert(NextRecordTypmod == 0);
+
+	/* Attach to the two hash tables. */
+	atts_index = dht_attach(area, &srtr_atts_index_params,
+							registry->atts_index_handle);
+	typmod_index = dht_attach(area, &srtr_typmod_index_params,
+							  registry->typmod_index_handle);
+
+	/* Set up our detach hook so that we can return to private cache mode. */
+	on_dsm_detach(segment, shared_record_typmod_registry_detach,
+				  PointerGetDatum(registry));
+
+	/*
+	 * Set up the global state that will tell assign_record_type_typmod and
+	 * lookup_rowtype_tupdesc_internal about the shared registry.
+	 */
+	CurrentSharedRecordTypmodRegistry.shared = registry;
+	CurrentSharedRecordTypmodRegistry.atts_index = atts_index;
+	CurrentSharedRecordTypmodRegistry.typmod_index = typmod_index;
+	CurrentSharedRecordTypmodRegistry.area = area;
+}
+
+/*
  * TypeCacheRelCallback
  *		Relcache inval callback function
  *
@@ -1809,3 +2128,225 @@ enum_oid_cmp(const void *left, const void *right)
 	else
 		return 0;
 }
+
+/*
+ * Serialize a TupleDesc into a SerializedTupleDesc in DSA area 'area', and
+ * return a dsa_pointer.
+ */
+static dsa_pointer
+serialize_tupledesc(dsa_area *area, const TupleDesc tupdesc)
+{
+	SerializedTupleDesc *serialized;
+	dsa_pointer serialized_dp;
+	size_t size;
+	int i;
+
+	size = offsetof(SerializedTupleDesc, attributes) +
+		sizeof(FormData_pg_attribute) * tupdesc->natts;
+	serialized_dp = dsa_allocate(area, size);
+	serialized = (SerializedTupleDesc *) dsa_get_address(area, serialized_dp);
+
+	serialized->natts = tupdesc->natts;
+	serialized->typmod = tupdesc->tdtypmod;
+	serialized->hasoid = tupdesc->tdhasoid;
+	for (i = 0; i < tupdesc->natts; ++i)
+		memcpy(&serialized->attributes[i], tupdesc->attrs[i],
+			   ATTRIBUTE_FIXED_PART_SIZE);
+
+	return serialized_dp;
+}
+
+/*
+ * Deserialize a SerializedTupleDesc to produce a TupleDesc.  The result is
+ * allocated in CacheMemoryContext and has a refcount of 1.
+ */
+static TupleDesc
+deserialize_tupledesc(const SerializedTupleDesc *serialized)
+{
+	Form_pg_attribute *attributes;
+	MemoryContext oldctxt;
+	TupleDesc tupdesc;
+	int i;
+
+	/*
+	 * We have an array of FormData_pg_attribute but we need an array of
+	 * pointers to FormData_pg_attribute.
+	 */
+	oldctxt = MemoryContextSwitchTo(CacheMemoryContext);
+	attributes = palloc(sizeof(Form_pg_attribute) * serialized->natts);
+	for (i = 0; i < serialized->natts; ++i)
+	{
+		attributes[i] = palloc(ATTRIBUTE_FIXED_PART_SIZE);
+		memcpy(attributes[i], &serialized->attributes[i],
+			   ATTRIBUTE_FIXED_PART_SIZE);
+	}
+	tupdesc =
+		CreateTupleDesc(serialized->natts, serialized->hasoid, attributes);
+	tupdesc->tdtypmod = serialized->typmod;
+	tupdesc->tdrefcount = 1;
+	MemoryContextSwitchTo(oldctxt);
+
+	return tupdesc;
+}
+
+/*
+ * We can't use equalTupleDescs to compare a SerializedTupleDesc with a
+ * TupleDesc, but we don't want to allocate memory just to compare.  This
+ * function produces the same result without deserializing first.
+ */
+static bool
+serialized_tupledesc_matches(SerializedTupleDesc *serialized,
+							 TupleDesc tupdesc)
+{
+	int i;
+
+	if (serialized->natts != tupdesc->natts ||
+		serialized->hasoid != tupdesc->tdhasoid ||
+		tupdesc->constr != NULL)
+		return false;
+
+	for (i = 0; i < serialized->natts; ++i)
+	{
+		if (!equalTupleDescAttrs(&serialized->attributes[i],
+								 tupdesc->attrs[i]))
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * If we are attached to a SharedRecordTypmodRegistry, find or create a
+ * SerializedTupleDesc that matches 'tupdesc', and return its typmod.
+ * Otherwise return -1.
+ */
+static int32
+find_or_allocate_shared_record_typmod(TupleDesc tupdesc)
+{
+	SRTRAttsIndexEntry *atts_index_entry;
+	SRTRTypmodIndexEntry *typmod_index_entry;
+	SerializedTupleDesc *serialized;
+	dsa_pointer serialized_dp;
+	Oid			hashkey[REC_HASH_KEYS];
+	bool		found;
+	int32		typmod;
+	int			i;
+
+	/* If not even attached, nothing to do. */
+	if (CurrentSharedRecordTypmodRegistry.shared == NULL)
+		return -1;
+
+	/* Try to find a match. */
+	memset(hashkey, 0, sizeof(hashkey));
+	for (i = 0; i < Min(tupdesc->natts, REC_HASH_KEYS); ++i)
+		hashkey[i] = tupdesc->attrs[i]->atttypid;
+	atts_index_entry = (SRTRAttsIndexEntry *)
+		dht_find_or_insert(CurrentSharedRecordTypmodRegistry.atts_index,
+						   hashkey,
+						   &found);
+	if (!found)
+	{
+		/* Making a new entry. */
+		memcpy(atts_index_entry->leading_attr_oids,
+			   hashkey,
+			   sizeof(hashkey));
+		atts_index_entry->serialized_tupdesc = InvalidDsaPointer;
+	}
+
+	/* Scan the list we found for a matching serialized one. */
+	serialized_dp = atts_index_entry->serialized_tupdesc;
+	while (DsaPointerIsValid(serialized_dp))
+	{
+		serialized =
+			dsa_get_address(CurrentSharedRecordTypmodRegistry.area,
+							serialized_dp);
+		if (serialized_tupledesc_matches(serialized, tupdesc))
+		{
+			/* Found a match, we are finished. */
+			typmod = serialized->typmod;
+			dht_release(CurrentSharedRecordTypmodRegistry.atts_index,
+						atts_index_entry);
+			return typmod;
+		}
+		serialized_dp = serialized->next;
+	}
+
+	/* We didn't find a matching entry, so let's allocate a new one. */
+	typmod = (int)
+		pg_atomic_fetch_add_u32(&CurrentSharedRecordTypmodRegistry.shared->next_typmod,
+								1);
+
+	/* Allocate shared memory and serialize the TupleDesc. */
+	serialized_dp = serialize_tupledesc(CurrentSharedRecordTypmodRegistry.area,
+										tupdesc);
+	serialized = (SerializedTupleDesc *)
+		dsa_get_address(CurrentSharedRecordTypmodRegistry.area, serialized_dp);
+	serialized->typmod = typmod;
+
+	/*
+	 * While we still hold the atts_index entry locked, add this to
+	 * typmod_index.  That's important because we don't want anyone to be able
+	 * to find a typmod via the former that can't yet be looked up in the
+	 * latter.
+	 */
+	typmod_index_entry =
+		dht_find_or_insert(CurrentSharedRecordTypmodRegistry.typmod_index,
+						   &typmod, &found);
+	if (found)
+		elog(ERROR, "cannot create duplicate shared record typmod");
+	typmod_index_entry->typmod = typmod;
+	typmod_index_entry->serialized_tupdesc = serialized_dp;
+	dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
+				typmod_index_entry);
+
+	/* Push onto the front of the list in atts_index_entry. */
+	serialized->next = atts_index_entry->serialized_tupdesc;
+	atts_index_entry->serialized_tupdesc = serialized_dp;
+
+	dht_release(CurrentSharedRecordTypmodRegistry.atts_index,
+				atts_index_entry);
+
+	return typmod;
+}
+
+/*
+ * DSM segment detach hook used to disconnect this backend's record typmod
+ * cache from the shared registry.  Detaching from the
+ * SharedRecordTypmodRegistry returns this backend to local typmod cache mode
+ * until another parallel query runs.
+ */
+static void
+shared_record_typmod_registry_detach(dsm_segment *segment, Datum datum)
+{
+	SharedRecordTypmodRegistry *shared;
+
+	shared = (SharedRecordTypmodRegistry *) DatumGetPointer(datum);
+
+	/*
+	 * XXX Should we now copy all entries from shared memory into the
+	 * backend's local cache?  That depends on whether you think that there is
+	 * any chance this backend could see any shared tuples created by other
+	 * backends after this detach operation.  If tuples somehow survived from
+	 * query to query, that would be true.  But presently I don't think they
+	 * do, if we assume that all mechanisms that allow us to receive tuples
+	 * from other backends are linked to DSM segment mapping lifetime (tuple
+	 * queues, shared hash tables, shared temporary files).
+	 *
+	 * The only thing we need to synchronize to return to local-typmod-cache
+	 * mode is NextRecordTypmod.  That means that we can resume generating new
+	 * backend-local entries that don't clash.
+	 */
+	NextRecordTypmod = pg_atomic_read_u32(&shared->next_typmod);
+
+	/*
+	 * We don't free the SharedRecordTypmodRegistry's DSM memory, though we
+	 * could do so with a reference counting scheme if we wanted to.  There
+	 * doesn't seem to be any point, because the whole DSA area goes away
+	 * automatically when the DSM segment containing it is destroyed,
+	 * conceptually like a MemoryContext.
+	 */
+	CurrentSharedRecordTypmodRegistry.shared = NULL;
+	CurrentSharedRecordTypmodRegistry.atts_index = NULL;
+	CurrentSharedRecordTypmodRegistry.typmod_index = NULL;
+	CurrentSharedRecordTypmodRegistry.area = NULL;
+}
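
Condensed round trip of the machinery above (illustrative; tupdesc, typmod and copy are stand-in variables, error handling is omitted).  In a leader that is attached to a SharedRecordTypmodRegistry, blessing a transient record type publishes it; a worker attached to the same registry can then resolve the typmod without ever having seen it before:

    /* Leader: assign_record_type_typmod() consults and, if necessary, extends
     * the shared registry via find_or_allocate_shared_record_typmod(). */
    assign_record_type_typmod(tupdesc);
    typmod = tupdesc->tdtypmod;         /* travels to workers inside tuples */

    /* Worker: lookup_rowtype_tupdesc_internal() finds the typmod in the
     * shared typmod index, deserializes it and caches it in RecordCacheArray. */
    copy = lookup_rowtype_tupdesc(RECORDOID, typmod);
    /* ... use the descriptor ... */
    ReleaseTupleDesc(copy);
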
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 590e27a4845..b5781c28693 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -19,6 +19,7 @@
 #include "postmaster/bgworker.h"
 #include "storage/shm_mq.h"
 #include "storage/shm_toc.h"
+#include "utils/dsa.h"
 
 typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
 
@@ -40,6 +41,7 @@ typedef struct ParallelContext
 	ErrorContextCallback *error_context_stack;
 	shm_toc_estimator estimator;
 	dsm_segment *seg;
+	dsa_area   *context_dsa;
 	void	   *private_memory;
 	shm_toc    *toc;
 	ParallelWorkerInfo *worker;
diff --git a/src/include/access/tupdesc.h b/src/include/access/tupdesc.h
index b48f839028b..97a73b1483e 100644
--- a/src/include/access/tupdesc.h
+++ b/src/include/access/tupdesc.h
@@ -110,6 +110,9 @@ extern void DecrTupleDescRefCount(TupleDesc tupdesc);
 			DecrTupleDescRefCount(tupdesc); \
 	} while (0)
 
+extern bool equalTupleDescAttrs(Form_pg_attribute attr1,
+								Form_pg_attribute attr2);
+
 extern bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
 
 extern void TupleDescInitEntry(TupleDesc desc,
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 0cd45bb6d8e..c0117f7bafd 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -212,7 +212,10 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_LOCK_MANAGER,
 	LWTRANCHE_PREDICATE_LOCK_MANAGER,
 	LWTRANCHE_PARALLEL_QUERY_DSA,
+	LWTRANCHE_PARALLEL_CONTEXT_DSA,
 	LWTRANCHE_TBM,
+	LWTRANCHE_SHARED_RECORD_ATTS_INDEX,
+	LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX,
 	LWTRANCHE_FIRST_USER_DEFINED
 }	BuiltinTrancheIds;
 
diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h
index 1bf94e2548d..861610a2835 100644
--- a/src/include/utils/typcache.h
+++ b/src/include/utils/typcache.h
@@ -18,6 +18,8 @@
 
 #include "access/tupdesc.h"
 #include "fmgr.h"
+#include "storage/dsm.h"
+#include "utils/dsa.h"
 
 
 /* DomainConstraintCache is an opaque struct known only within typcache.c */
@@ -139,6 +141,7 @@ typedef struct DomainConstraintRef
 	MemoryContextCallback callback;		/* used to release refcount when done */
 } DomainConstraintRef;
 
+typedef struct SharedRecordTypmodRegistry SharedRecordTypmodRegistry;
 
 extern TypeCacheEntry *lookup_type_cache(Oid type_id, int flags);
 
@@ -160,4 +163,12 @@ extern void assign_record_type_typmod(TupleDesc tupDesc);
 
 extern int	compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
 
+extern size_t SharedRecordTypmodRegistryEstimate(void);
+extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *,
+										   dsa_area *area,
+										   dsm_segment *seg);
+extern void SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *,
+											 dsa_area *area,
+											 dsm_segment *seg);
+
 #endif   /* TYPCACHE_H */
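
For reference, the expected call sequence for the three functions declared above, condensed from the parallel.c changes earlier in this patch.  Here 'pcxt' is the leader's ParallelContext, while 'toc', 'context_dsa' and 'seg' are the variables set up in ParallelWorkerMain; the unrelated toc chunks and error handling are omitted:

    /* Leader, while building the parallel DSM (see InitializeParallelDSM): */
    Size        size = SharedRecordTypmodRegistryEstimate();
    SharedRecordTypmodRegistry *registry;

    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    /* ... create the segment, the toc and pcxt->context_dsa ... */
    registry = (SharedRecordTypmodRegistry *) shm_toc_allocate(pcxt->toc, size);
    SharedRecordTypmodRegistryInit(registry, pcxt->context_dsa, pcxt->seg);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY, registry);

    /* Worker (see ParallelWorkerMain), after attaching to the context DSA: */
    registry = (SharedRecordTypmodRegistry *)
        shm_toc_lookup(toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY);
    SharedRecordTypmodRegistryAttach(registry, context_dsa, seg);
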
