From 54476f422646209ddb21c1e073c1a41852dfb31d Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 1 Jun 2017 12:26:41 +0900
Subject: [PATCH 4/4] Wait for table sync worker to finish when apply worker
 exits.

---
 src/backend/replication/logical/launcher.c | 51 +++++++++++++++++++++++++++++-
 src/include/replication/worker_internal.h  |  1 +
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 345a415..0ae3751 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -404,8 +404,53 @@ retry:
 }
 
 /*
+ * Stop all table sync workers associated with given subid.
+ *
+ * This function is called by apply worker. Since table sync
+ * worker associated with same subscription is launched by
+ * only the apply worker. We don't need to acquire
+ * LogicalRepLauncherLock here.
+ */
+void
+logicalrep_sync_workers_stop(Oid subid)
+{
+	List *relid_list = NIL;
+	ListCell *cell;
+	int	i;
+
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+	/*
+	 * Walks the workers array and get relid list that matches
+	 * given subscription id.
+	 */
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (w->in_use && w->subid == subid &&
+			OidIsValid(w->relid))
+			relid_list = lappend_oid(relid_list, w->relid);
+	}
+
+	LWLockRelease(LogicalRepWorkerLock);
+
+	/* Return if there is no table sync worker associated with myself */
+	if (relid_list == NIL)
+		return;
+
+	foreach (cell, relid_list)
+	{
+		Oid	relid = lfirst_oid(cell);
+
+		logicalrep_worker_stop(subid, relid);
+	}
+}
+
+/*
  * Stop the logical replication worker and wait until it detaches from the
- * slot.
+ * slot. This function can be called by both logical replication launcher
+ * and apply worker to stop apply worker and table sync worker.
  */
 void
 logicalrep_worker_stop(Oid subid, Oid relid)
@@ -571,6 +616,10 @@ logicalrep_worker_attach(int slot)
 static void
 logicalrep_worker_detach(void)
 {
+	/* Stop all sync workers associated if apply worker */
+	if (!IsTablesyncWorker())
+		logicalrep_sync_workers_stop(MyLogicalRepWorker->subid);
+
 	/* Block concurrent access. */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index c310ca5..e20933a 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -76,6 +76,7 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_sync_workers_stop(Oid subid);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
 
-- 
2.8.1

