From 47487786cb969e49ad3cd18d7258d72d03885b57 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 17 Aug 2017 14:03:24 -0700
Subject: [PATCH 2/4] Split the SetSubscriptionRelState function into two

---
 src/backend/catalog/pg_subscription.c       |  133 +++++++++++++++------------
 src/backend/commands/subscriptioncmds.c     |    8 +-
 src/backend/replication/logical/tablesync.c |   34 +++----
 src/include/catalog/pg_subscription_rel.h   |    6 +-
 4 files changed, 98 insertions(+), 83 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index fb53d71..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing.  This can be used to avoid inserting a new record that was deleted
- * by someone else.  Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances.  But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn)
 {
 	Relation	rel;
 	HeapTuple	tup;
-	Oid			subrelid = InvalidOid;
+	Oid			subrelid;
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
 							  ObjectIdGetDatum(relid),
 							  ObjectIdGetDatum(subid));
+	if (HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u already exists",
+			 relid, subid);
 
-	/*
-	 * If the record for given table does not exist yet create new record,
-	 * otherwise update the existing one.
-	 */
-	if (!HeapTupleIsValid(tup) && !update_only)
-	{
-		/* Form the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-		tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-		/* Insert tuple into catalog. */
-		subrelid = CatalogTupleInsert(rel, tup);
-
-		heap_freetuple(tup);
-	}
-	else if (HeapTupleIsValid(tup))
-	{
-		bool		replaces[Natts_pg_subscription_rel];
+	/* Form the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-		/* Update the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		memset(replaces, false, sizeof(replaces));
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	/* Insert tuple into catalog. */
+	subrelid = CatalogTupleInsert(rel, tup);
 
-		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	heap_freetuple(tup);
 
-		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-								replaces);
+	/* Cleanup. */
+	heap_close(rel, NoLock);
 
-		/* Update the catalog. */
-		CatalogTupleUpdate(rel, &tup->t_self, tup);
+	return subrelid;
+}
 
-		subrelid = HeapTupleGetOid(tup);
-	}
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subrelid;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	subrelid = HeapTupleGetOid(tup);
 
 	/* Cleanup. */
 	heap_close(rel, NoLock);
@@ -381,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	HeapTuple	tup;
 	int			nkeys = 0;
 
+	Assert(OidIsValid(subid) || OidIsValid(relid));
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	if (OidIsValid(subid))
@@ -404,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	/* Do the search and delete what we found. */
 	scan = heap_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
-	{
 		CatalogTupleDelete(rel, &tup->t_self);
-	}
 	heap_endscan(scan);
 
 	heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9bc1d17..354d037 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 				CheckSubscriptionRelkind(get_rel_relkind(relid),
 										 rv->schemaname, rv->relname);
 
-				SetSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr, false);
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
 			}
 
 			/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		if (!bsearch(&relid, subrel_local_oids,
 					 list_length(subrel_states), sizeof(Oid), oid_cmp))
 		{
-			SetSubscriptionRelState(sub->oid, relid,
+			AddSubscriptionRelState(sub->oid, relid,
 									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr, false);
+									InvalidXLogRecPtr);
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" added to subscription \"%s\"",
 							rv->schemaname, rv->relname, sub->name)));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 4cca0f1..42460b3 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		SetSubscriptionRelState(MyLogicalRepWorker->subid,
-								MyLogicalRepWorker->relid,
-								MyLogicalRepWorker->relstate,
-								MyLogicalRepWorker->relstate_lsn,
-								true);
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
 
 		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					StartTransactionCommand();
 					started_tx = true;
 				}
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										rstate->relid, rstate->state,
-										rstate->lsn, true);
+
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   rstate->relid, rstate->state,
+										   rstate->lsn);
 			}
 		}
 		else
@@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 				/* Update the state and make it visible to others. */
 				StartTransactionCommand();
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
-										MyLogicalRepWorker->relstate,
-										MyLogicalRepWorker->relstate_lsn,
-										true);
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   MyLogicalRepWorker->relid,
+										   MyLogicalRepWorker->relstate,
+										   MyLogicalRepWorker->relstate_lsn);
 				CommitTransactionCommand();
 				pgstat_report_stat(false);
 
@@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					 * Update the new state in catalog.  No need to bother
 					 * with the shmem state as we are exiting for good.
 					 */
-					SetSubscriptionRelState(MyLogicalRepWorker->subid,
-											MyLogicalRepWorker->relid,
-											SUBREL_STATE_SYNCDONE,
-											*origin_startpos,
-											true);
+					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+											   MyLogicalRepWorker->relid,
+											   SUBREL_STATE_SYNCDONE,
+											   *origin_startpos);
 					finish_sync_worker();
 				}
 				break;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d..c5b0b9c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
 						XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
-- 
1.7.1

