>From a6eb4a795c6e1e9347e0f33ea426635bd9a8afac Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Tue, 18 Apr 2017 10:18:04 -0400
Subject: [PATCH] Wait between tablesync worker restarts

Before restarting a tablesync worker for the same relation, wait
wal_retrieve_retry_interval.  This avoids restarting failing workers in
a tight loop.
---
 src/backend/replication/logical/tablesync.c | 44 +++++++++++++++++++++++++----
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 108326bef1..88d01adbd2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -245,7 +245,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If there are tables that need synchronizing and are not being synchronized
  * yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers).  To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info.  We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
  *
  * For tables that are being synchronized already, check if sync workers
  * either need action from the apply worker or have finished.
@@ -263,11 +266,28 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
+	struct tablesync_start_time_mapping
+	{
+		Oid			relid;
+		TimestampTz	last_start_time;
+	};
 	static List *table_states = NIL;
+	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 
 	Assert(!IsTransactionState());
 
+	if (!last_start_times)
+	{
+		HASHCTL		ctl;
+
+		memset(&ctl, 0, sizeof(ctl));
+		ctl.keysize = sizeof(Oid);
+		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
+		last_start_times = hash_create("Logical replication table sync worker start times",
+									   256, &ctl, HASH_ELEM | HASH_BLOBS);
+	}
+
 	/* We need up to date sync state info for subscription tables here. */
 	if (!table_states_valid)
 	{
@@ -403,11 +423,23 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			 */
 			else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
 			{
-				logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-										 MySubscription->oid,
-										 MySubscription->name,
-										 MyLogicalRepWorker->userid,
-										 rstate->relid);
+				TimestampTz	now = GetCurrentTimestamp();
+				struct tablesync_start_time_mapping *hentry;
+				bool		found;
+
+				hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found);
+
+				if (!found ||
+					TimestampDifferenceExceeds(hentry->last_start_time, now,
+											   wal_retrieve_retry_interval))
+				{
+					logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+											 MySubscription->oid,
+											 MySubscription->name,
+											 MyLogicalRepWorker->userid,
+											 rstate->relid);
+					hentry->last_start_time = now;
+				}
 			}
 		}
 	}
-- 
2.12.2

