>From 1ba4534a0efef3f0ff8e354a1b1192e3c50b5dff Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Mon, 6 Mar 2017 13:07:45 +0100
Subject: [PATCH] Add option to modify sync commit per subscription

This also changes default behaviour of subscription workers to
synchronous_commit = off
---
 doc/src/sgml/catalogs.sgml                 | 11 ++++++
 doc/src/sgml/ref/alter_subscription.sgml   |  1 +
 doc/src/sgml/ref/create_subscription.sgml  | 15 ++++++++
 src/backend/catalog/pg_subscription.c      |  1 +
 src/backend/commands/subscriptioncmds.c    | 61 ++++++++++++++++++++++++------
 src/backend/replication/logical/launcher.c |  2 +-
 src/backend/replication/logical/worker.c   | 28 +++++++++++++-
 src/bin/pg_dump/pg_dump.c                  | 12 +++++-
 src/bin/pg_dump/pg_dump.h                  |  1 +
 src/bin/psql/describe.c                    |  5 ++-
 src/include/catalog/pg_subscription.h      | 11 ++++--
 src/test/regress/expected/subscription.out | 27 ++++++-------
 src/test/regress/sql/subscription.sql      |  3 +-
 13 files changed, 144 insertions(+), 34 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index daa85f2..71667bd 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -6383,6 +6383,17 @@
      </row>
 
      <row>
+      <entry><structfield>subsynccommit</structfield></entry>
+      <entry><type>bool</type></entry>
+      <entry></entry>
+      <entry>
+       If true, the apply for the subscription will run with
+       <literal>synchronous_commit</literal> set to <literal>local</literal>.
+       Otherwise it will have it set to <literal>false</literal>.
+      </entry>
+     </row>
+
+     <row>
       <entry><structfield>subconninfo</structfield></entry>
       <entry><type>text</type></entry>
       <entry></entry>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index b34386d..2bb74f3 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <rep
 <phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase>
 
     SLOT NAME = slot_name
+    | SYNCHRONOUS_COMMIT = boolean
 
 ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION publication_name [, ...] WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] )
 ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] )
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 91127ea..97432e0 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -29,6 +29,7 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
     | CREATE SLOT | NOCREATE SLOT
     | SLOT NAME = slot_name
     | COPY DATA | NOCOPY DATA
+    | SYNCHRONOUS_COMMIT = boolean
     | NOCONNECT
 </synopsis>
  </refsynopsisdiv>
@@ -145,6 +146,20 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
    </varlistentry>
 
    <varlistentry>
+    <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="parameter">boolean</replaceable></literal></term>
+    <listitem>
+     <para>
+      Modifies the <literal>synchronous_commit</literal> setting of the
+      subscription workers. When set to <literal>true</literal>, the
+      <literal>synchronous_commit</literal> for the worker will be set to
+      <literal>local</local> otherwise to <literal>false</literal>. The
+      default value is <literal>false</literal> independently of the default
+      <literal>synchronous_commit</literal> setting for the instance.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term>NOCONNECT</term>
     <listitem>
      <para>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8850b7e..4dd527d 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
+	sub->synccommit = subform->subsynccommit;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index ef33ece..39c0f11 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -56,12 +56,13 @@
 static void
 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 						   bool *enabled, bool *create_slot, char **slot_name,
-						   bool *copy_data)
+						   bool *copy_data, bool *synchronous_commit)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
 	bool		create_slot_given = false;
 	bool		copy_data_given = false;
+	bool		synchronous_commit_given = false;
 
 	if (connect)
 		*connect = true;
@@ -76,6 +77,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		*slot_name = NULL;
 	if (copy_data)
 		*copy_data = true;
+	if (synchronous_commit)
+		*synchronous_commit = false;
 
 	/* Parse options */
 	foreach (lc, options)
@@ -161,6 +164,26 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			copy_data_given = true;
 			*copy_data = !defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "synchronous_commit") == 0 && synchronous_commit)
+		{
+			if (synchronous_commit_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			synchronous_commit_given = true;
+			*synchronous_commit = defGetBoolean(defel);
+		}
+		else if (strcmp(defel->defname, "nosynchronous_commit") == 0 && synchronous_commit)
+		{
+			if (synchronous_commit_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			synchronous_commit_given = true;
+			*synchronous_commit = !defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized option: %s", defel->defname);
 	}
@@ -265,6 +288,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		enabled_given;
 	bool		enabled;
 	bool		copy_data;
+	bool		synchronous_commit;
 	char	   *conninfo;
 	char	   *slotname;
 	char		originname[NAMEDATALEN];
@@ -276,7 +300,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 * Connection and publication should not be specified here.
 	 */
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
-							   &enabled, &create_slot, &slotname, &copy_data);
+							   &enabled, &create_slot, &slotname, &copy_data,
+							   &synchronous_commit);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -326,6 +351,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+	values[Anum_pg_subscription_subsynccommit - 1] =
+		BoolGetDatum(synchronous_commit);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	values[Anum_pg_subscription_subslotname - 1] =
@@ -573,14 +600,26 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
 			{
-				char *slot_name;
+				char   *slot_name;
+				bool	synchronous_commit;
+				Form_pg_subscription form;
 
-				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, &slot_name, NULL);
+				form = (Form_pg_subscription) GETSTRUCT(tup);
+				synchronous_commit = form->subsynccommit;
 
-				values[Anum_pg_subscription_subslotname - 1] =
-					DirectFunctionCall1(namein, CStringGetDatum(slot_name));
-				replaces[Anum_pg_subscription_subslotname - 1] = true;
+				parse_subscription_options(stmt->options, NULL, NULL, NULL,
+										   NULL, &slot_name, NULL,
+										   &synchronous_commit);
+
+				if (slot_name)
+				{
+					values[Anum_pg_subscription_subslotname - 1] =
+						DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+					replaces[Anum_pg_subscription_subslotname - 1] = true;
+				}
+				values[Anum_pg_subscription_subsynccommit - 1] =
+					BoolGetDatum(synchronous_commit);
+				replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 
 				update_tuple = true;
 				break;
@@ -593,7 +632,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL);
+										   NULL, NULL, NULL);
 				Assert(enabled_given);
 
 				values[Anum_pg_subscription_subenabled - 1] =
@@ -617,7 +656,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				Subscription   *sub = GetSubscription(subid, false);
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, NULL, &copy_data);
+										   NULL, NULL, &copy_data, NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					 publicationListToArray(stmt->publication);
@@ -638,7 +677,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				Subscription   *sub = GetSubscription(subid, false);
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, NULL, &copy_data);
+										   NULL, NULL, &copy_data, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3e724de..b536a35 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -126,7 +126,7 @@ get_subscription_list(void)
 		 */
 		oldcxt = MemoryContextSwitchTo(resultcxt);
 
-		sub = (Subscription *) palloc(sizeof(Subscription));
+		sub = (Subscription *) palloc0(sizeof(Subscription));
 		sub->oid = HeapTupleGetOid(tup);
 		sub->dbid = subform->subdbid;
 		sub->owner = subform->subowner;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5383364..6d3a269 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1340,6 +1340,21 @@ reread_subscription(void)
 	}
 
 	/*
+	 * We need to make new connection to new slot if slot name has changed
+	 * so exit here as well if that's the case.
+	 */
+	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
+	{
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will "
+						"restart because the replication slot name was changed",
+						MySubscription->name)));
+
+		walrcv_disconnect(wrconn);
+		proc_exit(0);
+	}
+
+	/*
 	 * Exit if publication list was changed. The launcher will start
 	 * new worker.
 	 */
@@ -1371,8 +1386,7 @@ reread_subscription(void)
 	}
 
 	/* Check for other changes that should never happen too. */
-	if (newsub->dbid != MySubscription->dbid ||
-		strcmp(newsub->slotname, MySubscription->slotname) != 0)
+	if (newsub->dbid != MySubscription->dbid)
 	{
 		elog(ERROR, "subscription %u changed unexpectedly",
 			 MyLogicalRepWorker->subid);
@@ -1384,6 +1398,11 @@ reread_subscription(void)
 
 	MemoryContextSwitchTo(oldctx);
 
+	/* Change synchronous commit according to the user's wishes */
+	SetConfigOption("synchronous_commit",
+					MySubscription->synccommit ? "local" : "off",
+					PGC_BACKEND, PGC_S_OVERRIDE);
+
 	if (started_tx)
 		CommitTransactionCommand();
 
@@ -1451,6 +1470,11 @@ ApplyWorkerMain(Datum main_arg)
 	MySubscriptionValid = true;
 	MemoryContextSwitchTo(oldctx);
 
+	/* Setup synchronous commit according to the user's wishes */
+	SetConfigOption("synchronous_commit",
+					MySubscription->synccommit ? "local" : "off",
+					PGC_BACKEND, PGC_S_OVERRIDE);
+
 	if (!MySubscription->enabled)
 	{
 		ereport(LOG,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c7876fe..7f22ac5 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3622,6 +3622,7 @@ getSubscriptions(Archive *fout)
 	int			i_subname;
 	int			i_rolname;
 	int			i_subenabled;
+	int			i_subsynccommit;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subpublications;
@@ -3639,7 +3640,8 @@ getSubscriptions(Archive *fout)
 	appendPQExpBuffer(query,
 					  "SELECT s.tableoid, s.oid, s.subname,"
 					  "(%s s.subowner) AS rolname, s.subenabled, "
-					  " s.subconninfo, s.subslotname, s.subpublications "
+					  " s.subsynccommit, s.subconninfo, s.subslotname, "
+					  " s.subpublications "
 					  "FROM pg_catalog.pg_subscription s "
 					  "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
 					  "                   WHERE datname = current_database())",
@@ -3653,6 +3655,7 @@ getSubscriptions(Archive *fout)
 	i_subname = PQfnumber(res, "subname");
 	i_rolname = PQfnumber(res, "rolname");
 	i_subenabled = PQfnumber(res, "subenabled");
+	i_subsynccommit = PQfnumber(res, "subsynccommit");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subpublications = PQfnumber(res, "subpublications");
@@ -3670,6 +3673,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
 		subinfo[i].subenabled =
 			(strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
+		subinfo[i].subsynccommit =
+			(strcmp(PQgetvalue(res, i, i_subsynccommit), "t") == 0);
 		subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
 		subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
 		subinfo[i].subpublications =
@@ -3739,6 +3744,11 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
 	else
 		appendPQExpBufferStr(query, "DISABLED");
 
+	if (subinfo->subsynccommit)
+		appendPQExpBufferStr(query, "SYNCHRONOUS_COMMIT = true");
+	else
+		appendPQExpBufferStr(query, "SYNCHRONOUS_COMMIT = false");
+
 	appendPQExpBufferStr(query, ", SLOT NAME = ");
 	appendStringLiteralAH(query, subinfo->subslotname, fout);
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index a466527..5934eb0 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -604,6 +604,7 @@ typedef struct _SubscriptionInfo
 	DumpableObject dobj;
 	char	   *rolname;
 	bool		subenabled;
+	bool		subsynccommit;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subpublications;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index e2e4cbc..3d4cc84 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5078,7 +5078,8 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PQExpBufferData buf;
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
-	static const bool translate_columns[] = {false, false, false, false, false};
+	static const bool translate_columns[] = {false, false, false, false,
+		false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -5104,7 +5105,9 @@ describeSubscriptions(const char *pattern, bool verbose)
 	if (verbose)
 	{
 		appendPQExpBuffer(&buf,
+						  ",  subsynccommit AS \"%s\"\n"
 						  ",  subconninfo AS \"%s\"\n",
+						  gettext_noop("Synchronous Commit"),
 						  gettext_noop("Conninfo"));
 	}
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0811880..62845e9 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -39,6 +39,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE
 
 	bool		subenabled;		/* True if the subscription is enabled
 								 * (the worker should be running) */
+	bool		subsynccommit;	/* Should apply use synchronous commit? */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	text		subconninfo;	/* Connection string to the publisher */
@@ -54,14 +55,15 @@ typedef FormData_pg_subscription *Form_pg_subscription;
  *		compiler constants for pg_subscription
  * ----------------
  */
-#define Natts_pg_subscription					7
+#define Natts_pg_subscription					8
 #define Anum_pg_subscription_subdbid			1
 #define Anum_pg_subscription_subname			2
 #define Anum_pg_subscription_subowner			3
 #define Anum_pg_subscription_subenabled			4
-#define Anum_pg_subscription_subconninfo		5
-#define Anum_pg_subscription_subslotname		6
-#define Anum_pg_subscription_subpublications	7
+#define Anum_pg_subscription_subsynccommit		5
+#define Anum_pg_subscription_subconninfo		6
+#define Anum_pg_subscription_subslotname		7
+#define Anum_pg_subscription_subpublications	8
 
 
 typedef struct Subscription
@@ -71,6 +73,7 @@ typedef struct Subscription
 	char   *name;			/* Name of the subscription */
 	Oid		owner;			/* Oid of the subscription owner */
 	bool	enabled;		/* Indicates if the subscription is enabled */
+	bool	synccommit;		/* Indicates if apply should use synchronous commit */
 	char   *conninfo;		/* Connection string to the publisher */
 	char   *slotname;		/* Name of the replication slot */
 	List   *publications;	/* List of publication names to subscribe to */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index d2187ee..213669b 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -26,18 +26,18 @@ reset client_min_messages;
 CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (NOCONNECT);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                               List of subscriptions
-  Name   |           Owner           | Enabled | Publication |      Conninfo       
----------+---------------------------+---------+-------------+---------------------
- testsub | regress_subscription_user | f       | {testpub}   | dbname=doesnotexist
+                                         List of subscriptions
+  Name   |           Owner           | Enabled | Publication | Synchronous Commit |      Conninfo       
+---------+---------------------------+---------+-------------+--------------------+---------------------
+ testsub | regress_subscription_user | f       | {testpub}   | f                  | dbname=doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2';
 \dRs+
-                               List of subscriptions
-  Name   |           Owner           | Enabled | Publication |       Conninfo       
----------+---------------------------+---------+-------------+----------------------
- testsub | regress_subscription_user | f       | {testpub}   | dbname=doesnotexist2
+                                          List of subscriptions
+  Name   |           Owner           | Enabled | Publication | Synchronous Commit |       Conninfo       
+---------+---------------------------+---------+-------------+--------------------+----------------------
+ testsub | regress_subscription_user | f       | {testpub}   | f                  | dbname=doesnotexist2
 (1 row)
 
 BEGIN;
@@ -59,11 +59,12 @@ ALTER SUBSCRIPTION testsub DISABLE;
 
 COMMIT;
 ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
-\dRs
-                      List of subscriptions
-    Name     |           Owner           | Enabled | Publication 
--------------+---------------------------+---------+-------------
- testsub_foo | regress_subscription_user | f       | {testpub}
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true);
+\dRs+
+                                            List of subscriptions
+    Name     |           Owner           | Enabled | Publication | Synchronous Commit |       Conninfo       
+-------------+---------------------------+---------+-------------+--------------------+----------------------
+ testsub_foo | regress_subscription_user | f       | {testpub}   | t                  | dbname=doesnotexist2
 (1 row)
 
 -- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 3bdf79d..41d3055 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -40,8 +40,9 @@ ALTER SUBSCRIPTION testsub DISABLE;
 COMMIT;
 
 ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true);
 
-\dRs
+\dRs+
 
 -- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block
 BEGIN;
-- 
2.7.4

