On Thu, Nov 9, 2023, at 8:12 PM, Michael Paquier wrote:
> On Thu, Nov 09, 2023 at 03:41:53PM +0100, Peter Eisentraut wrote:
> > On 08.11.23 00:12, Michael Paquier wrote:
> >> - Should the subdirectory pg_basebackup be renamed into something more
> >> generic at this point?  All these things are frontend tools that deal
> >> in some way with the replication protocol to do their work.  Say
> >> a replication_tools?
> > 
> > Seems like unnecessary churn.  Nobody has complained about any of the other
> > tools in there.
> 
> Not sure.  We rename things across releases in the tree from time to
> time, and here that's straight-forward.

Based on this discussion it seems we have a consensus that this tool should be
in the pg_basebackup directory. (If/when we agree with the directory renaming,
it could be done in a separate patch.) Besides this move, the v3 provides a dry
run mode. It basically executes every routine but skip when should do
modifications. It is an useful option to check if you will be able to run it
without having issues with connectivity, permission, and existing objects
(replication slots, publications, subscriptions). Tests were slightly improved.
Messages were changed to *not* provide INFO messages by default and --verbose
provides INFO messages and --verbose --verbose also provides DEBUG messages. I
also refactored the connect_database() function into which the connection will
always use the logical replication mode. A bug was fixed in the transient
replication slot name. Ashutosh review [1] was included. The code was also 
indented.

There are a few suggestions from Ashutosh [2] that I will reply in another
email.

I'm still planning to work on the following points:

1. improve the cleanup routine to point out leftover objects if there is any
   connection issue.
2. remove the physical replication slot if the standby is using one
   (primary_slot_name).
3. provide instructions to promote the logical replica into primary, I mean,
   stop the replication between the nodes and remove the replication setup
   (publications, subscriptions, replication slots). Or even include another
   action to do it. We could add both too.

Point 1 should be done. Points 2 and 3 aren't essential but will provide a nice
UI for users that would like to use it.


[1] 
https://postgr.es/m/CAExHW5sCAU3NvPKd7msScQKvrBN-x_AdDQD-ZYAwOxuWG%3Doz1w%40mail.gmail.com
[2] 
https://postgr.es/m/caexhw5vhfemfvtuhe+7xwphvzjxrexz5h3dd4uqi7cwmdmj...@mail.gmail.com


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 003255b64910ce73f15931a43def25c37be96b81 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 5 Jun 2023 14:39:40 -0400
Subject: [PATCH v3] Creates a new logical replica from a standby server

A new tool called pg_subscriber can convert a physical replica into a
logical replica. It runs on the target server and should be able to
connect to the source server (publisher) and the target server
(subscriber).

The conversion requires a few steps. Check if the target data directory
has the same system identifier than the source data directory. Stop the
target server if it is running as a standby server. Create one
replication slot per specified database on the source server. One
additional replication slot is created at the end to get the consistent
LSN (This consistent LSN will be used as (a) a stopping point for the
recovery process and (b) a starting point for the subscriptions). Write
recovery parameters into the target data directory and start the target
server (Wait until the target server is promoted). Create one
publication (FOR ALL TABLES) per specified database on the source
server. Create one subscription per specified database on the target
server (Use replication slot and publication created in a previous step.
Don't enable the subscriptions yet). Sets the replication progress to
the consistent LSN that was got in a previous step. Enable the
subscription for each specified database on the target server. Remove
the additional replication slot that was used to get the consistent LSN.
Stop the target server. Change the system identifier from the target
server.

Depending on your workload and database size, creating a logical replica
couldn't be an option due to resource constraints (WAL backlog should be
available until all table data is synchronized). The initial data copy
and the replication progress tends to be faster on a physical replica.
The purpose of this tool is to speed up a logical replica setup.
---
 doc/src/sgml/ref/allfiles.sgml                |    1 +
 doc/src/sgml/ref/pg_subscriber.sgml           |  284 +++
 doc/src/sgml/reference.sgml                   |    1 +
 src/bin/pg_basebackup/Makefile                |    8 +-
 src/bin/pg_basebackup/meson.build             |   19 +
 src/bin/pg_basebackup/pg_subscriber.c         | 1517 +++++++++++++++++
 src/bin/pg_basebackup/t/040_pg_subscriber.pl  |   44 +
 .../t/041_pg_subscriber_standby.pl            |  139 ++
 src/tools/msvc/Mkvcbuild.pm                   |    2 +-
 9 files changed, 2013 insertions(+), 2 deletions(-)
 create mode 100644 doc/src/sgml/ref/pg_subscriber.sgml
 create mode 100644 src/bin/pg_basebackup/pg_subscriber.c
 create mode 100644 src/bin/pg_basebackup/t/040_pg_subscriber.pl
 create mode 100644 src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl

diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 54b5f22d6e..e2ecb4f944 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -213,6 +213,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgResetwal         SYSTEM "pg_resetwal.sgml">
 <!ENTITY pgRestore          SYSTEM "pg_restore.sgml">
 <!ENTITY pgRewind           SYSTEM "pg_rewind.sgml">
+<!ENTITY pgSubscriber       SYSTEM "pg_subscriber.sgml">
 <!ENTITY pgVerifyBackup     SYSTEM "pg_verifybackup.sgml">
 <!ENTITY pgtestfsync        SYSTEM "pgtestfsync.sgml">
 <!ENTITY pgtesttiming       SYSTEM "pgtesttiming.sgml">
diff --git a/doc/src/sgml/ref/pg_subscriber.sgml b/doc/src/sgml/ref/pg_subscriber.sgml
new file mode 100644
index 0000000000..553185c35f
--- /dev/null
+++ b/doc/src/sgml/ref/pg_subscriber.sgml
@@ -0,0 +1,284 @@
+<!--
+doc/src/sgml/ref/pg_subscriber.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgsubscriber">
+ <indexterm zone="app-pgsubscriber">
+  <primary>pg_subscriber</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle><application>pg_subscriber</application></refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_subscriber</refname>
+  <refpurpose>create a new logical replica from a standby server</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_subscriber</command>
+   <arg rep="repeat"><replaceable>option</replaceable></arg>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+  <para>
+   <application>pg_subscriber</application> takes the publisher and subscriber
+   connection strings, a cluster directory from a standby server and a list of
+   database names and it sets up a new logical replica using the physical
+   recovery process.
+  </para>
+
+  <para>
+   The <application>pg_subscriber</application> should be run at the target
+   server. The source server (known as publisher server) should accept logical
+   replication connections from the target server (known as subscriber server).
+   The target server should accept local logical replication connection.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    <application>pg_subscriber</application> accepts the following
+    command-line arguments:
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+      <term><option>--pgdata=<replaceable class="parameter">directory</replaceable></option></term>
+      <listitem>
+       <para>
+        The target directory that contains a cluster directory from a standby
+        server.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-P  <replaceable class="parameter">conninfo</replaceable></option></term>
+      <term><option>--publisher-conninfo=<replaceable class="parameter">conninfo</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the publisher. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-S <replaceable class="parameter">conninfo</replaceable></option></term>
+      <term><option>--subscriber-conninfo=<replaceable class="parameter">conninfo</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the subscriber. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
+      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
+      <listitem>
+       <para>
+        The database name to create the subscription. Multiple databases can be
+        selected by writing multiple <option>-d</option> switches.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-n</option></term>
+      <term><option>--dry-run</option></term>
+      <listitem>
+       <para>
+        Do everything except actually modifying the target directory.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-v</option></term>
+      <term><option>--verbose</option></term>
+      <listitem>
+       <para>
+        Enables verbose mode. This will cause
+        <application>pg_subscriber</application> to output progress messages
+        and detailed information about each step.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+
+   <para>
+    Other options are also available:
+
+    <variablelist>
+     <varlistentry>
+       <term><option>-V</option></term>
+       <term><option>--version</option></term>
+       <listitem>
+       <para>
+       Print the <application>pg_subscriber</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-?</option></term>
+       <term><option>--help</option></term>
+       <listitem>
+       <para>
+       Show help about <application>pg_subscriber</application> command
+       line arguments, and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+   The transformation proceeds in the following steps:
+  </para>
+
+  <procedure>
+   <step>
+    <para>
+     <application>pg_subscriber</application> checks if the given target data
+     directory has the same system identifier than the source data directory.
+     Since it uses the recovery process as one of the steps, it starts the
+     target server as a replica from the source server. If the system
+     identifier is not the same, <application>pg_subscriber</application> will
+     terminate with an error.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> checks if the target data
+     directory is used by a standby server. Stop the standby server if it is
+     running. One of the next steps is to add some recovery parameters that
+     requires a server start. This step avoids an error.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> creates one replication slot for
+     each specified database on the source server. The replication slot name
+     contains a <literal>pg_subscriber</literal> prefix. These replication
+     slots will be used by the subscriptions in a future step.  Another
+     replication slot is used to get a consistent start location. This
+     consistent LSN will be used as a stopping point in the <xref
+     linkend="guc-recovery-target-lsn"/> parameter and by the
+     subscriptions as a replication starting point. It guarantees that no
+     transaction will be lost.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> writes recovery parameters into
+     the target data directory and start the target server. It specifies a LSN
+     (consistent LSN that was obtained in the previous step) of write-ahead
+     log location up to which recovery will proceed. It also specifies
+     <literal>promote</literal> as the action that the server should take once
+     the recovery target is reached. This step finishes once the server ends
+     standby mode and is accepting read-write operations.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Next, <application>pg_subscriber</application> creates one publication
+     for each specified database on the source server. Each publication
+     replicates changes for all tables in the database. The publication name
+     contains a <literal>pg_subscriber</literal> prefix. These publication
+     will be used by a corresponding subscription in a next step.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> creates one subscription for
+     each specified database on the target server. Each subscription name
+     contains a <literal>pg_subscriber</literal> prefix. The replication slot
+     name is identical to the subscription name. It does not copy existing data
+     from the source server. It does not create a replication slot. Instead, it
+     uses the replication slot that was created in a previous step. The
+     subscription is created but it is not enabled yet. The reason is the
+     replication progress must be set to the consistent LSN but replication
+     origin name contains the subscription oid in its name. Hence, the
+     subscription will be enabled in a separate step.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> sets the replication progress to
+     the consistent LSN that was obtained in a previous step. When the target
+     server started the recovery process, it caught up to the consistent LSN.
+     This is the exact LSN to be used as a initial location for each
+     subscription.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Finally, <application>pg_subscriber</application> enables the subscription
+     for each specified database on the target server. The subscription starts
+     streaming from the consistent LSN.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> removes the additional replication
+     slot that was used to get the consistent LSN on the source server.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> stops the target server to change
+     its system identifier.
+    </para>
+   </step>
+  </procedure>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   To create a logical replica for databases <literal>hr</literal> and
+   <literal>finance</literal> from a standby server at <literal>foo</literal>:
+<screen>
+<prompt>$</prompt> <userinput>pg_subscriber -D /usr/local/pgsql/data -P "host=foo" -S "host=localhost" -d hr -d finance</userinput>
+</screen>
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="app-pgbasebackup"/></member>
+  </simplelist>
+ </refsect1>
+
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index e11b4b6130..67e257436b 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -257,6 +257,7 @@
    &pgReceivewal;
    &pgRecvlogical;
    &pgRestore;
+   &pgSubscriber;
    &pgVerifyBackup;
    &psqlRef;
    &reindexdb;
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 74dc1ddd6d..bc011565bb 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -47,7 +47,7 @@ BBOBJS = \
 	bbstreamer_tar.o \
 	bbstreamer_zstd.o
 
-all: pg_basebackup pg_receivewal pg_recvlogical
+all: pg_basebackup pg_receivewal pg_recvlogical pg_subscriber
 
 pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
 	$(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@@ -58,10 +58,14 @@ pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake
 pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
 	$(CC) $(CFLAGS) pg_recvlogical.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
+pg_subscriber: $(WIN32RES) pg_subscriber.o | submake-libpq submake-libpgport submake-libpgfeutils
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
 install: all installdirs
 	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
 	$(INSTALL_PROGRAM) pg_receivewal$(X) '$(DESTDIR)$(bindir)/pg_receivewal$(X)'
 	$(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
+	$(INSTALL_PROGRAM) pg_subscriber$(X) '$(DESTDIR)$(bindir)/pg_subscriber$(X)'
 
 installdirs:
 	$(MKDIR_P) '$(DESTDIR)$(bindir)'
@@ -70,10 +74,12 @@ uninstall:
 	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
 	rm -f '$(DESTDIR)$(bindir)/pg_receivewal$(X)'
 	rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
+	rm -f '$(DESTDIR)$(bindir)/pg_subscriber$(X)'
 
 clean distclean:
 	rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \
 		$(BBOBJS) pg_receivewal.o pg_recvlogical.o \
+		pg_subscriber$(X) pg_subscriber.o \
 		$(OBJS)
 	rm -rf tmp_check
 
diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build
index c426173db3..601517c096 100644
--- a/src/bin/pg_basebackup/meson.build
+++ b/src/bin/pg_basebackup/meson.build
@@ -75,6 +75,23 @@ pg_recvlogical = executable('pg_recvlogical',
 )
 bin_targets += pg_recvlogical
 
+pg_subscriber_sources = files(
+  'pg_subscriber.c'
+)
+
+if host_system == 'windows'
+  pg_subscriber_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+	'--NAME', 'pg_subscriber',
+	'--FILEDESC', 'pg_subscriber - create a new logical replica from a standby server',])
+endif
+
+pg_subscriber = executable('pg_subscriber',
+  pg_subscriber_sources,
+  dependencies: [frontend_code, libpq],
+  kwargs: default_bin_args,
+)
+bin_targets += pg_subscriber
+
 tests += {
   'name': 'pg_basebackup',
   'sd': meson.current_source_dir(),
@@ -89,6 +106,8 @@ tests += {
       't/011_in_place_tablespace.pl',
       't/020_pg_receivewal.pl',
       't/030_pg_recvlogical.pl',
+      't/040_pg_subscriber.pl',
+      't/041_pg_subscriber_standby.pl',
     ],
   },
 }
diff --git a/src/bin/pg_basebackup/pg_subscriber.c b/src/bin/pg_basebackup/pg_subscriber.c
new file mode 100644
index 0000000000..b96ce26ed7
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_subscriber.c
@@ -0,0 +1,1517 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_subscriber.c
+ *	  Create a new logical replica from a standby server
+ *
+ * Copyright (C) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/bin/pg_subscriber/pg_subscriber.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <time.h>
+
+#include "access/xlogdefs.h"
+#include "catalog/pg_control.h"
+#include "common/connect.h"
+#include "common/controldata_utils.h"
+#include "common/file_utils.h"
+#include "common/logging.h"
+#include "fe_utils/recovery_gen.h"
+#include "fe_utils/simple_list.h"
+#include "getopt_long.h"
+#include "utils/pidfile.h"
+
+typedef struct LogicalRepInfo
+{
+	Oid			oid;			/* database OID */
+	char	   *dbname;			/* database name */
+	char	   *pubconninfo;	/* publication connection string for logical
+								 * replication */
+	char	   *subconninfo;	/* subscription connection string for logical
+								 * replication */
+	char	   *pubname;		/* publication name */
+	char	   *subname;		/* subscription name (also replication slot
+								 * name) */
+
+	bool		made_replslot;	/* replication slot was created */
+	bool		made_publication;	/* publication was created */
+	bool		made_subscription;	/* subscription was created */
+} LogicalRepInfo;
+
+static void cleanup_objects_atexit(void);
+static void usage();
+static char *get_base_conninfo(char *conninfo, char *dbname,
+							   const char *noderole);
+static bool get_exec_path(const char *path);
+static bool check_data_directory(const char *datadir);
+static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
+static LogicalRepInfo *store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo);
+static PGconn *connect_database(const char *conninfo);
+static void disconnect_database(PGconn *conn);
+static uint64 get_sysid_from_conn(const char *conninfo);
+static uint64 get_control_from_datadir(const char *datadir);
+static void modify_sysid(const char *pg_resetwal_path, const char *datadir);
+static bool create_all_logical_replication_slots(LogicalRepInfo *dbinfo);
+static char *create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo,
+											 char *slot_name);
+static void drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name);
+static void pg_ctl_status(const char *pg_ctl_cmd, int rc, int action);
+static void wait_for_end_recovery(const char *conninfo);
+static void create_publication(PGconn *conn, LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+static void drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+static void set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn);
+static void enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+
+#define	USEC_PER_SEC	1000000
+#define	WAIT_INTERVAL	1		/* 1 second */
+
+/* Options */
+static const char *progname;
+
+static char *subscriber_dir = NULL;
+static char *pub_conninfo_str = NULL;
+static char *sub_conninfo_str = NULL;
+static SimpleStringList database_names = {NULL, NULL};
+static bool dry_run = false;
+
+static bool success = false;
+
+static char *pg_ctl_path = NULL;
+static char *pg_resetwal_path = NULL;
+
+static LogicalRepInfo *dbinfo;
+static int	num_dbs = 0;
+
+static char temp_replslot[NAMEDATALEN] = {0};
+static bool made_transient_replslot = false;
+
+enum WaitPMResult
+{
+	POSTMASTER_READY,
+	POSTMASTER_STANDBY,
+	POSTMASTER_STILL_STARTING,
+	POSTMASTER_FAILED
+};
+
+
+/*
+ * Cleanup objects that were created by pg_subscriber if there is an error.
+ *
+ * Replication slots, publications and subscriptions are created. Depending on
+ * the step it failed, it should remove the already created objects if it is
+ * possible (sometimes it won't work due to a connection issue).
+ */
+static void
+cleanup_objects_atexit(void)
+{
+	PGconn	   *conn;
+	int			i;
+
+	if (success)
+		return;
+
+	for (i = 0; i < num_dbs; i++)
+	{
+		if (dbinfo[i].made_subscription)
+		{
+			conn = connect_database(dbinfo[i].subconninfo);
+			if (conn != NULL)
+			{
+				drop_subscription(conn, &dbinfo[i]);
+				disconnect_database(conn);
+			}
+		}
+
+		if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
+		{
+			conn = connect_database(dbinfo[i].pubconninfo);
+			if (conn != NULL)
+			{
+				if (dbinfo[i].made_publication)
+					drop_publication(conn, &dbinfo[i]);
+				if (dbinfo[i].made_replslot)
+					drop_replication_slot(conn, &dbinfo[i], NULL);
+				disconnect_database(conn);
+			}
+		}
+	}
+
+	if (made_transient_replslot)
+	{
+		conn = connect_database(dbinfo[0].pubconninfo);
+		drop_replication_slot(conn, &dbinfo[0], temp_replslot);
+		disconnect_database(conn);
+	}
+}
+
+static void
+usage(void)
+{
+	printf(_("%s creates a new logical replica from a standby server.\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_(" -D, --pgdata=DATADIR                location for the subscriber data directory\n"));
+	printf(_(" -P, --publisher-conninfo=CONNINFO   publisher connection string\n"));
+	printf(_(" -S, --subscriber-conninfo=CONNINFO  subscriber connection string\n"));
+	printf(_(" -d, --database=DBNAME               database to create a subscription\n"));
+	printf(_(" -n, --dry-run                       stop before modifying anything\n"));
+	printf(_(" -v, --verbose                       output verbose messages\n"));
+	printf(_(" -V, --version                       output version information, then exit\n"));
+	printf(_(" -?, --help                          show this help, then exit\n"));
+	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+/*
+ * Validate a connection string. Returns a base connection string that is a
+ * connection string without a database name plus a fallback application name.
+ * Since we might process multiple databases, each database name will be
+ * appended to this base connection string to provide a final connection string.
+ * If the second argument (dbname) is not null, returns dbname if the provided
+ * connection string contains it. If option --database is not provided, uses
+ * dbname as the only database to setup the logical replica.
+ * It is the caller's responsibility to free the returned connection string and
+ * dbname.
+ */
+static char *
+get_base_conninfo(char *conninfo, char *dbname, const char *noderole)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	PQconninfoOption *conn_opts = NULL;
+	PQconninfoOption *conn_opt;
+	char	   *errmsg = NULL;
+	char	   *ret;
+	int			i;
+
+	pg_log_info("validating connection string on %s", noderole);
+
+	conn_opts = PQconninfoParse(conninfo, &errmsg);
+	if (conn_opts == NULL)
+	{
+		pg_log_error("could not parse connection string: %s", errmsg);
+		return NULL;
+	}
+
+	i = 0;
+	for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
+	{
+		if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
+		{
+			if (dbname)
+				dbname = pg_strdup(conn_opt->val);
+			continue;
+		}
+
+		if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+		{
+			if (i > 0)
+				appendPQExpBufferChar(buf, ' ');
+			appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
+			i++;
+		}
+	}
+
+	if (i > 0)
+		appendPQExpBufferChar(buf, ' ');
+	appendPQExpBuffer(buf, "fallback_application_name=%s", progname);
+
+	ret = pg_strdup(buf->data);
+
+	destroyPQExpBuffer(buf);
+	PQconninfoFree(conn_opts);
+
+	return ret;
+}
+
+/*
+ * Get the absolute path from other PostgreSQL binaries (pg_ctl and
+ * pg_resetwal) that is used by it.
+ */
+static bool
+get_exec_path(const char *path)
+{
+	int			rc;
+
+	pg_ctl_path = pg_malloc(MAXPGPATH);
+	rc = find_other_exec(path, "pg_ctl",
+						 "pg_ctl (PostgreSQL) " PG_VERSION "\n",
+						 pg_ctl_path);
+	if (rc < 0)
+	{
+		char		full_path[MAXPGPATH];
+
+		if (find_my_exec(path, full_path) < 0)
+			strlcpy(full_path, progname, sizeof(full_path));
+		if (rc == -1)
+			pg_log_error("The program \"%s\" is needed by %s but was not found in the\n"
+						 "same directory as \"%s\".\n"
+						 "Check your installation.",
+						 "pg_ctl", progname, full_path);
+		else
+			pg_log_error("The program \"%s\" was found by \"%s\"\n"
+						 "but was not the same version as %s.\n"
+						 "Check your installation.",
+						 "pg_ctl", full_path, progname);
+		return false;
+	}
+
+	pg_log_debug("pg_ctl path is: %s", pg_ctl_path);
+
+	pg_resetwal_path = pg_malloc(MAXPGPATH);
+	rc = find_other_exec(path, "pg_resetwal",
+						 "pg_resetwal (PostgreSQL) " PG_VERSION "\n",
+						 pg_resetwal_path);
+	if (rc < 0)
+	{
+		char		full_path[MAXPGPATH];
+
+		if (find_my_exec(path, full_path) < 0)
+			strlcpy(full_path, progname, sizeof(full_path));
+		if (rc == -1)
+			pg_log_error("The program \"%s\" is needed by %s but was not found in the\n"
+						 "same directory as \"%s\".\n"
+						 "Check your installation.",
+						 "pg_resetwal", progname, full_path);
+		else
+			pg_log_error("The program \"%s\" was found by \"%s\"\n"
+						 "but was not the same version as %s.\n"
+						 "Check your installation.",
+						 "pg_resetwal", full_path, progname);
+		return false;
+	}
+
+	pg_log_debug("pg_resetwal path is: %s", pg_resetwal_path);
+
+	return true;
+}
+
+/*
+ * Is it a cluster directory? These are preliminary checks. It is far from
+ * making an accurate check. If it is not a clone from the publisher, it will
+ * eventually fail in a future step.
+ */
+static bool
+check_data_directory(const char *datadir)
+{
+	struct stat statbuf;
+	char		versionfile[MAXPGPATH];
+
+	pg_log_info("checking if directory \"%s\" is a cluster data directory",
+				datadir);
+
+	if (stat(datadir, &statbuf) != 0)
+	{
+		if (errno == ENOENT)
+			pg_log_error("data directory \"%s\" does not exist", datadir);
+		else
+			pg_log_error("could not access directory \"%s\": %s", datadir, strerror(errno));
+
+		return false;
+	}
+
+	snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
+	if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
+	{
+		pg_log_error("directory \"%s\" is not a database cluster directory", datadir);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Append database name into a base connection string.
+ *
+ * dbname is the only parameter that changes so it is not included in the base
+ * connection string. This function concatenates dbname to build a "real"
+ * connection string.
+ */
+static char *
+concat_conninfo_dbname(const char *conninfo, const char *dbname)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	char	   *ret;
+
+	Assert(conninfo != NULL);
+
+	appendPQExpBufferStr(buf, conninfo);
+	appendPQExpBuffer(buf, " dbname=%s", dbname);
+
+	ret = pg_strdup(buf->data);
+	destroyPQExpBuffer(buf);
+
+	return ret;
+}
+
+/*
+ * Store publication and subscription information.
+ */
+static LogicalRepInfo *
+store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo)
+{
+	LogicalRepInfo *dbinfo;
+	SimpleStringListCell *cell;
+	int			i = 0;
+
+	dbinfo = (LogicalRepInfo *) pg_malloc(num_dbs * sizeof(LogicalRepInfo));
+
+	for (cell = database_names.head; cell; cell = cell->next)
+	{
+		char	   *conninfo;
+
+		/* Publisher. */
+		conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
+		dbinfo[i].pubconninfo = conninfo;
+		dbinfo[i].dbname = cell->val;
+		dbinfo[i].made_replslot = false;
+		dbinfo[i].made_publication = false;
+		dbinfo[i].made_subscription = false;
+		/* other struct fields will be filled later. */
+
+		/* Subscriber. */
+		conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
+		dbinfo[i].subconninfo = conninfo;
+
+		i++;
+	}
+
+	return dbinfo;
+}
+
+static PGconn *
+connect_database(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	const char *rconninfo;
+
+	/* logical replication mode */
+	rconninfo = psprintf("%s replication=database", conninfo);
+
+	conn = PQconnectdb(rconninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		pg_log_error("connection to database failed: %s", PQerrorMessage(conn));
+		return NULL;
+	}
+
+	/* secure search_path */
+	res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not clear search_path: %s", PQresultErrorMessage(res));
+		return NULL;
+	}
+	PQclear(res);
+
+	return conn;
+}
+
+static void
+disconnect_database(PGconn *conn)
+{
+	Assert(conn != NULL);
+
+	PQfinish(conn);
+}
+
+/*
+ * Obtain the system identifier using the provided connection. It will be used
+ * to compare if a data directory is a clone of another one.
+ */
+static uint64
+get_sysid_from_conn(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	uint64		sysid;
+
+	pg_log_info("getting system identifier from publisher");
+
+	conn = connect_database(conninfo);
+	if (conn == NULL)
+		exit(1);
+
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "IDENTIFY_SYSTEM", PQresultErrorMessage(res));
+		PQclear(res);
+		disconnect_database(conn);
+		exit(1);
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
+
+		PQclear(res);
+		disconnect_database(conn);
+		exit(1);
+	}
+
+	sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
+
+	pg_log_info("system identifier is %ld on publisher", sysid);
+
+	disconnect_database(conn);
+
+	return sysid;
+}
+
+/*
+ * Obtain the system identifier from control file. It will be used to compare
+ * if a data directory is a clone of another one. This routine is used locally
+ * and avoids a replication connection.
+ */
+static uint64
+get_control_from_datadir(const char *datadir)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	uint64		sysid;
+
+	pg_log_info("getting system identifier from subscriber");
+
+	cf = get_controlfile(datadir, &crc_ok);
+	if (!crc_ok)
+	{
+		pg_log_error("control file appears to be corrupt");
+		exit(1);
+	}
+
+	sysid = cf->system_identifier;
+
+	pg_log_info("system identifier is %ld on subscriber", sysid);
+
+	pfree(cf);
+
+	return sysid;
+}
+
+/*
+ * Modify the system identifier. Since a standby server preserves the system
+ * identifier, it makes sense to change it to avoid situations in which WAL
+ * files from one of the systems might be used in the other one.
+ */
+static void
+modify_sysid(const char *pg_resetwal_path, const char *datadir)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	struct timeval tv;
+
+	char	   *cmd_str;
+	int			rc;
+
+	pg_log_info("modifying system identifier from subscriber");
+
+	cf = get_controlfile(datadir, &crc_ok);
+	if (!crc_ok)
+	{
+		pg_log_error("control file appears to be corrupt");
+		exit(1);
+	}
+
+	/*
+	 * Select a new system identifier.
+	 *
+	 * XXX this code was extracted from BootStrapXLOG().
+	 */
+	gettimeofday(&tv, NULL);
+	cf->system_identifier = ((uint64) tv.tv_sec) << 32;
+	cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
+	cf->system_identifier |= getpid() & 0xFFF;
+
+	if (!dry_run)
+		update_controlfile(datadir, cf, true);
+
+	pg_log_info("system identifier is %ld on subscriber", cf->system_identifier);
+
+	pg_log_info("running pg_resetwal on the subscriber");
+
+	cmd_str = psprintf("\"%s\" -D \"%s\"", pg_resetwal_path, datadir);
+
+	pg_log_debug("command is: %s", cmd_str);
+
+	if (!dry_run)
+	{
+		rc = system(cmd_str);
+		if (rc == 0)
+			pg_log_info("subscriber successfully changed the system identifier");
+		else
+			pg_log_error("subscriber failed to change system identifier: exit code: %d", rc);
+	}
+
+	pfree(cf);
+}
+
+static bool
+create_all_logical_replication_slots(LogicalRepInfo *dbinfo)
+{
+	int			i;
+
+	for (i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		PGresult   *res;
+		char		replslotname[NAMEDATALEN];
+
+		conn = connect_database(dbinfo[i].pubconninfo);
+		if (conn == NULL)
+			exit(1);
+
+		res = PQexec(conn,
+					 "SELECT oid FROM pg_catalog.pg_database WHERE datname = current_database()");
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain database OID: %s", PQresultErrorMessage(res));
+			return false;
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("could not obtain database OID: got %d rows, expected %d rows",
+						 PQntuples(res), 1);
+			return false;
+		}
+
+		/* Remember database OID. */
+		dbinfo[i].oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+
+		PQclear(res);
+
+		/*
+		 * Build the replication slot name. The name must not exceed
+		 * NAMEDATALEN - 1. This current schema uses a maximum of 36
+		 * characters (14 + 10 + 1 + 10 + '\0'). System identifier is included
+		 * to reduce the probability of collision. By default, subscription
+		 * name is used as replication slot name.
+		 */
+		snprintf(replslotname, sizeof(replslotname),
+				 "pg_subscriber_%u_%d",
+				 dbinfo[i].oid,
+				 (int) getpid());
+		dbinfo[i].subname = pg_strdup(replslotname);
+
+		/* Create replication slot on publisher. */
+		if (create_logical_replication_slot(conn, &dbinfo[i], replslotname) != NULL || dry_run)
+			pg_log_info("create replication slot \"%s\" on publisher", replslotname);
+		else
+			return false;
+
+		disconnect_database(conn);
+	}
+
+	return true;
+}
+
+/*
+ * Create a logical replication slot and returns a consistent LSN. The returned
+ * LSN might be used to catch up the subscriber up to the required point.
+ *
+ * CreateReplicationSlot() is not used because it does not provide the one-row
+ * result set that contains the consistent LSN.
+ */
+static char *
+create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo,
+								char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	char	   *lsn = NULL;
+	bool		transient_replslot = false;
+
+	Assert(conn != NULL);
+
+	/*
+	 * If no slot name is informed, it is a transient replication slot used
+	 * only for catch up purposes.
+	 */
+	if (slot_name[0] == '\0')
+	{
+		snprintf(slot_name, NAMEDATALEN, "pg_subscriber_%d_startpoint",
+				 (int) getpid());
+		transient_replslot = true;
+	}
+
+	pg_log_info("creating the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
+	appendPQExpBufferStr(str, " LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname,
+						 PQresultErrorMessage(res));
+			return lsn;
+		}
+	}
+
+	/* for cleanup purposes */
+	if (transient_replslot)
+		made_transient_replslot = true;
+	else
+		dbinfo->made_replslot = true;
+
+	if (!dry_run)
+	{
+		lsn = pg_strdup(PQgetvalue(res, 0, 1));
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+
+	return lsn;
+}
+
+static void
+drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP_REPLICATION_SLOT \"%s\"", slot_name);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname,
+						 PQerrorMessage(conn));
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Reports a suitable message if pg_ctl fails.
+ */
+static void
+pg_ctl_status(const char *pg_ctl_cmd, int rc, int action)
+{
+	if (rc != 0)
+	{
+		if (WIFEXITED(rc))
+		{
+			pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+		}
+		else if (WIFSIGNALED(rc))
+		{
+#if defined(WIN32)
+			pg_log_error("pg_ctl was terminated by exception 0x%X", WTERMSIG(rc));
+			pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
+#else
+			pg_log_error("pg_ctl was terminated by signal %d: %s",
+						 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+#endif
+		}
+		else
+		{
+			pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+		}
+
+		pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
+		exit(1);
+	}
+
+	if (action)
+		pg_log_info("postmaster was started");
+	else
+		pg_log_info("postmaster was stopped");
+}
+
+/*
+ * Returns after the server finishes the recovery process.
+ */
+static void
+wait_for_end_recovery(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	int			status = POSTMASTER_STILL_STARTING;
+
+	pg_log_info("waiting the postmaster to reach the consistent state");
+
+	conn = connect_database(conninfo);
+	if (conn == NULL)
+		exit(1);
+
+	for (;;)
+	{
+		bool		in_recovery;
+
+		res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain recovery progress");
+			exit(1);
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("unexpected result from pg_is_in_recovery function");
+			exit(1);
+		}
+
+		in_recovery = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
+
+		PQclear(res);
+
+		/*
+		 * Does the recovery process finish? In dry run mode, there is no
+		 * recovery mode. Bail out as the recovery process has ended.
+		 */
+		if (!in_recovery || dry_run)
+		{
+			status = POSTMASTER_READY;
+			break;
+		}
+
+		/* Keep waiting. */
+		pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
+	}
+
+	disconnect_database(conn);
+
+	if (status == POSTMASTER_STILL_STARTING)
+	{
+		pg_log_error("server did not end recovery");
+		exit(1);
+	}
+
+	pg_log_info("postmaster reached the consistent state");
+}
+
+/*
+ * Create a publication that includes all tables in the database.
+ */
+static void
+create_publication(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	/* Check if the publication needs to be created. */
+	appendPQExpBuffer(str,
+					  "SELECT puballtables FROM pg_catalog.pg_publication WHERE pubname = '%s'",
+					  dbinfo->pubname);
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (PQntuples(res) == 1)
+	{
+		/*
+		 * If publication name already exists and puballtables is true, let's
+		 * use it. A previous run of pg_subscriber must have created this
+		 * publication. Bail out.
+		 */
+		if (strcmp(PQgetvalue(res, 0, 0), "t") == 0)
+		{
+			pg_log_info("publication \"%s\" already exists", dbinfo->pubname);
+			return;
+		}
+		else
+		{
+			/*
+			 * Unfortunately, if it reaches this code path, it will always
+			 * fail (unless you decide to change the existing publication
+			 * name). That's bad but it is very unlikely that the user will
+			 * choose a name with pg_subscriber_ prefix followed by the exact
+			 * database oid in which puballtables is false.
+			 */
+			pg_log_error("publication \"%s\" does not replicate changes for all tables",
+						 dbinfo->pubname);
+			pg_log_error_hint("Consider renaming this publication.");
+			PQclear(res);
+			PQfinish(conn);
+			exit(1);
+		}
+	}
+
+	PQclear(res);
+	resetPQExpBuffer(str);
+
+	pg_log_info("creating publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", dbinfo->pubname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
+						 dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn));
+			PQfinish(conn);
+			exit(1);
+		}
+	}
+
+	/* for cleanup purposes */
+	dbinfo->made_publication = true;
+
+	if (!dry_run)
+		PQclear(res);
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove publication if it couldn't finish all steps.
+ */
+static void
+drop_publication(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP PUBLICATION %s", dbinfo->pubname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("could not drop publication \"%s\" on database \"%s\": %s", dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn));
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Create a subscription with some predefined options.
+ *
+ * A replication slot was already created in a previous step. Let's use it. By
+ * default, the subscription name is used as replication slot name. It is
+ * not required to copy data. The subscription will be created but it will not
+ * be enabled now. That's because the replication progress must be set and the
+ * replication origin name (one of the function arguments) contains the
+ * subscription OID in its name. Once the subscription is created,
+ * set_replication_progress() can obtain the chosen origin name and set up its
+ * initial location.
+ */
+static void
+create_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("creating subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str,
+					  "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s "
+					  "WITH (create_slot = false, copy_data = false, enabled = false)",
+					  dbinfo->subname, dbinfo->pubconninfo, dbinfo->pubname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
+						 dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn));
+			PQfinish(conn);
+			exit(1);
+		}
+	}
+
+	/* for cleanup purposes */
+	dbinfo->made_subscription = true;
+
+	if (!dry_run)
+		PQclear(res);
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove subscription if it couldn't finish all steps.
+ */
+static void
+drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP SUBSCRIPTION %s", dbinfo->subname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("could not drop subscription \"%s\" on database \"%s\": %s", dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn));
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Sets the replication progress to the consistent LSN.
+ *
+ * The subscriber caught up to the consistent LSN provided by the temporary
+ * replication slot. The goal is to set up the initial location for the logical
+ * replication that is the exact LSN that the subscriber was promoted. Once the
+ * subscription is enabled it will start streaming from that location onwards.
+ * In dry run mode, the subscription OID and LSN are set to invalid values for
+ * printing purposes.
+ */
+static void
+set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	Oid			suboid;
+	char		originname[NAMEDATALEN];
+	char		lsnstr[17 + 1]; /* MAXPG_LSNLEN = 17 */
+
+	Assert(conn != NULL);
+
+	appendPQExpBuffer(str,
+					  "SELECT oid FROM pg_catalog.pg_subscription WHERE subname = '%s'", dbinfo->subname);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain subscription OID: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (PQntuples(res) != 1 && !dry_run)
+	{
+		pg_log_error("could not obtain subscription OID: got %d rows, expected %d rows",
+					 PQntuples(res), 1);
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (dry_run)
+	{
+		suboid = InvalidOid;
+		snprintf(lsnstr, sizeof(lsnstr), "%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+	}
+	else
+	{
+		suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+		snprintf(lsnstr, sizeof(lsnstr), "%s", lsn);
+	}
+
+	/*
+	 * The origin name is defined as pg_%u. %u is the subscription OID. See
+	 * ApplyWorkerMain().
+	 */
+	snprintf(originname, sizeof(originname), "pg_%u", suboid);
+
+	PQclear(res);
+
+	pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
+				originname, lsnstr, dbinfo->dbname);
+
+	resetPQExpBuffer(str);
+	appendPQExpBuffer(str,
+					  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')", originname, lsnstr);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not set replication progress for the subscription \"%s\": %s",
+						 dbinfo->subname, PQresultErrorMessage(res));
+			PQfinish(conn);
+			exit(1);
+		}
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Enables the subscription.
+ *
+ * The subscription was created in a previous step but it was disabled. After
+ * adjusting the initial location, enabling the subscription is the last step
+ * of this setup.
+ */
+static void
+enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("enabling subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", dbinfo->subname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not enable subscription \"%s\": %s", dbinfo->subname,
+						 PQerrorMessage(conn));
+			PQfinish(conn);
+			exit(1);
+		}
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] =
+	{
+		{"help", no_argument, NULL, '?'},
+		{"version", no_argument, NULL, 'V'},
+		{"pgdata", required_argument, NULL, 'D'},
+		{"publisher-conninfo", required_argument, NULL, 'P'},
+		{"subscriber-conninfo", required_argument, NULL, 'S'},
+		{"database", required_argument, NULL, 'd'},
+		{"dry-run", no_argument, NULL, 'n'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+
+	int			c;
+	int			option_index;
+	int			rc;
+
+	char	   *pg_ctl_cmd;
+
+	char	   *pub_base_conninfo = NULL;
+	char	   *sub_base_conninfo = NULL;
+	char	   *dbname_conninfo = NULL;
+
+	uint64		pub_sysid;
+	uint64		sub_sysid;
+	struct stat statbuf;
+
+	PGconn	   *conn;
+	char	   *consistent_lsn;
+
+	PQExpBuffer recoveryconfcontents = NULL;
+
+	char		pidfile[MAXPGPATH];
+
+	int			i;
+
+	pg_logging_init(argv[0]);
+	pg_logging_set_level(PG_LOG_WARNING);
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_subscriber"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_subscriber (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	atexit(cleanup_objects_atexit);
+
+	/*
+	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
+	 * it either.
+	 */
+#ifndef WIN32
+	if (geteuid() == 0)
+	{
+		pg_log_error("cannot be executed by \"root\"");
+		pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
+						  progname);
+		exit(1);
+	}
+#endif
+
+	while ((c = getopt_long(argc, argv, "D:P:S:d:t:v",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				subscriber_dir = pg_strdup(optarg);
+				break;
+			case 'P':
+				pub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'S':
+				sub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'd':
+				simple_string_list_append(&database_names, optarg);
+				num_dbs++;
+				break;
+			case 'n':
+				dry_run = true;
+				break;
+			case 'v':
+				pg_logging_increase_verbosity();
+				break;
+			default:
+				/* getopt_long already emitted a complaint */
+				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments?
+	 */
+	if (optind < argc)
+	{
+		pg_log_error("too many command-line arguments (first is \"%s\")",
+					 argv[optind]);
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments
+	 */
+	if (subscriber_dir == NULL)
+	{
+		pg_log_error("no subscriber data directory specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+
+	/*
+	 * Parse connection string. Build a base connection string that might be
+	 * reused by multiple databases.
+	 */
+	if (pub_conninfo_str == NULL)
+	{
+		/*
+		 * TODO use primary_conninfo (if available) from subscriber and
+		 * extract publisher connection string. Assume that there are
+		 * identical entries for physical and logical replication. If there is
+		 * not, we would fail anyway.
+		 */
+		pg_log_error("no publisher connection string specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+	pub_base_conninfo = get_base_conninfo(pub_conninfo_str, dbname_conninfo,
+										  "publisher");
+	if (pub_base_conninfo == NULL)
+		exit(1);
+
+	if (sub_conninfo_str == NULL)
+	{
+		pg_log_error("no subscriber connection string specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+	sub_base_conninfo = get_base_conninfo(sub_conninfo_str, NULL, "subscriber");
+	if (sub_base_conninfo == NULL)
+		exit(1);
+
+	if (database_names.head == NULL)
+	{
+		pg_log_info("no database was specified");
+
+		/*
+		 * If --database option is not provided, try to obtain the dbname from
+		 * the publisher conninfo. If dbname parameter is not available, error
+		 * out.
+		 */
+		if (dbname_conninfo)
+		{
+			simple_string_list_append(&database_names, dbname_conninfo);
+			num_dbs++;
+
+			pg_log_info("database \"%s\" was extracted from the publisher connection string",
+						dbname_conninfo);
+		}
+		else
+		{
+			pg_log_error("no database name specified");
+			pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+			exit(1);
+		}
+	}
+
+	/*
+	 * Get the absolute path of pg_ctl and pg_resetwal on the subscriber.
+	 */
+	if (!get_exec_path(argv[0]))
+		exit(1);
+
+	/* rudimentary check for a data directory. */
+	if (!check_data_directory(subscriber_dir))
+		exit(1);
+
+	/* Store database information for publisher and subscriber. */
+	dbinfo = store_pub_sub_info(pub_base_conninfo, sub_base_conninfo);
+
+	/*
+	 * Check if the subscriber data directory has the same system identifier
+	 * than the publisher data directory.
+	 */
+	pub_sysid = get_sysid_from_conn(dbinfo[0].pubconninfo);
+	sub_sysid = get_control_from_datadir(subscriber_dir);
+	if (pub_sysid != sub_sysid)
+	{
+		pg_log_error("subscriber data directory is not a copy of the source database cluster");
+		exit(1);
+	}
+
+	/* subscriber PID file. */
+	snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
+
+	/*
+	 * Stop the subscriber if it is a standby server. Before executing the
+	 * transformation steps, make sure the subscriber is not running because
+	 * one of the steps is to modify some recovery parameters that require a
+	 * restart.
+	 */
+	if (stat(pidfile, &statbuf) == 0)
+	{
+		pg_log_info("subscriber is up and running");
+		pg_log_info("stopping the server to start the transformation steps");
+
+		pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+		rc = system(pg_ctl_cmd);
+		pg_ctl_status(pg_ctl_cmd, rc, 0);
+	}
+
+	/*
+	 * Create a replication slot for each database on the publisher.
+	 */
+	if (!create_all_logical_replication_slots(dbinfo))
+		exit(1);
+
+	/*
+	 * Create a logical replication slot to get a consistent LSN.
+	 *
+	 * This consistent LSN will be used later to advanced the recently created
+	 * replication slots. We cannot use the last created replication slot
+	 * because the consistent LSN should be obtained *after* the base backup
+	 * finishes (and the base backup should include the logical replication
+	 * slots).
+	 *
+	 * XXX we should probably use the last created replication slot to get a
+	 * consistent LSN but it should be changed after adding pg_basebackup
+	 * support.
+	 *
+	 * A temporary replication slot is not used here to avoid keeping a
+	 * replication connection open (depending when base backup was taken, the
+	 * connection should be open for a few hours).
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo);
+	if (conn == NULL)
+		exit(1);
+	consistent_lsn = create_logical_replication_slot(conn, &dbinfo[0],
+													 temp_replslot);
+
+	/*
+	 * Write recovery parameters.
+	 *
+	 * Despite of the recovery parameters will be written to the subscriber,
+	 * use a publisher connection for the follwing recovery functions. The
+	 * connection is only used to check the current server version (physical
+	 * replica, same server version). The subscriber is not running yet. In
+	 * dry run mode, the recovery parameters *won't* be written. An invalid
+	 * LSN is used for printing purposes.
+	 */
+	recoveryconfcontents = GenerateRecoveryConfig(conn, NULL);
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_inclusive = true\n");
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_action = promote\n");
+
+	if (dry_run)
+	{
+		appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
+		appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%X/%X'\n",
+						  LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+	}
+	else
+	{
+		appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
+						  consistent_lsn);
+		WriteRecoveryConfig(conn, subscriber_dir, recoveryconfcontents);
+	}
+	disconnect_database(conn);
+
+	pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
+
+	/*
+	 * Start subscriber and wait until accepting connections.
+	 */
+	pg_log_info("starting the subscriber");
+
+	pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc, 1);
+
+	/*
+	 * Waiting the subscriber to be promoted.
+	 */
+	wait_for_end_recovery(dbinfo[0].subconninfo);
+
+	/*
+	 * Create a publication for each database. This step should be executed
+	 * after promoting the subscriber to avoid replicating unnecessary
+	 * objects.
+	 */
+	for (i = 0; i < num_dbs; i++)
+	{
+		char		pubname[NAMEDATALEN];
+
+		/* Connect to publisher. */
+		conn = connect_database(dbinfo[i].pubconninfo);
+		if (conn == NULL)
+			exit(1);
+
+		/*
+		 * Build the publication name. The name must not exceed NAMEDATALEN -
+		 * 1. This current schema uses a maximum of 35 characters (14 + 10 +
+		 * '\0').
+		 */
+		snprintf(pubname, sizeof(pubname), "pg_subscriber_%u", dbinfo[i].oid);
+		dbinfo[i].pubname = pg_strdup(pubname);
+
+		create_publication(conn, &dbinfo[i]);
+
+		disconnect_database(conn);
+	}
+
+	/*
+	 * Create a subscription for each database.
+	 */
+	for (i = 0; i < num_dbs; i++)
+	{
+		/* Connect to subscriber. */
+		conn = connect_database(dbinfo[i].subconninfo);
+		if (conn == NULL)
+			exit(1);
+
+		create_subscription(conn, &dbinfo[i]);
+
+		/* Set the replication progress to the correct LSN. */
+		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
+
+		/* Enable subscription. */
+		enable_subscription(conn, &dbinfo[i]);
+
+		disconnect_database(conn);
+	}
+
+	/*
+	 * The transient replication slot is no longer required. Drop it.
+	 *
+	 * XXX we might not fail here. Instead, we provide a warning so the user
+	 * eventually drops the replication slot later.
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo);
+	if (conn == NULL)
+	{
+		pg_log_warning("could not drop transient replication slot \"%s\" on publisher", temp_replslot);
+		pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+	}
+	else
+	{
+		drop_replication_slot(conn, &dbinfo[0], temp_replslot);
+		disconnect_database(conn);
+	}
+
+	/*
+	 * Stop the subscriber.
+	 */
+	pg_log_info("stopping the subscriber");
+
+	pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc, 0);
+
+	/*
+	 * Change system identifier.
+	 */
+	modify_sysid(pg_resetwal_path, subscriber_dir);
+
+	success = true;
+
+	pg_log_info("Done!");
+
+	return 0;
+}
diff --git a/src/bin/pg_basebackup/t/040_pg_subscriber.pl b/src/bin/pg_basebackup/t/040_pg_subscriber.pl
new file mode 100644
index 0000000000..9d20847dc2
--- /dev/null
+++ b/src/bin/pg_basebackup/t/040_pg_subscriber.pl
@@ -0,0 +1,44 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+#
+# Test checking options of pg_subscriber.
+#
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+program_help_ok('pg_subscriber');
+program_version_ok('pg_subscriber');
+program_options_handling_ok('pg_subscriber');
+
+my $datadir = PostgreSQL::Test::Utils::tempdir;
+
+command_fails(['pg_subscriber'],
+	'no subscriber data directory specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--pgdata', $datadir
+	],
+	'no publisher connection string specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--dry-run',
+		'--pgdata', $datadir,
+		'--publisher-conninfo', 'dbname=postgres'
+	],
+	'no subscriber connection string specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--verbose',
+		'--pgdata', $datadir,
+		'--publisher-conninfo', 'dbname=postgres',
+		'--subscriber-conninfo', 'dbname=postgres'
+	],
+	'no database name specified');
+
+done_testing();
diff --git a/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl b/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl
new file mode 100644
index 0000000000..ce25608c68
--- /dev/null
+++ b/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+#
+# Test using a standby server as the subscriber.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_p;
+my $node_f;
+my $node_s;
+my $result;
+
+# Set up node P as primary
+$node_p = PostgreSQL::Test::Cluster->new('node_p');
+$node_p->init(allows_streaming => 'logical');
+$node_p->start;
+
+# Set up node F as about-to-fail node
+# The extra option forces it to initialize a new cluster instead of copying a
+# previously initdb's cluster.
+$node_f = PostgreSQL::Test::Cluster->new('node_f');
+$node_f->init(allows_streaming => 'logical', extra => [ '--no-instructions' ]);
+$node_f->start;
+
+# On node P
+# - create databases
+# - create test tables
+# - insert a row
+$node_p->safe_psql(
+	'postgres', q(
+	CREATE DATABASE pg1;
+	CREATE DATABASE pg2;
+));
+$node_p->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_p->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+
+# Set up node S as standby linking to node P
+$node_p->backup('backup_1');
+$node_s = PostgreSQL::Test::Cluster->new('node_s');
+$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
+$node_s->append_conf('postgresql.conf', 'log_min_messages = debug2');
+$node_s->set_standby_mode();
+$node_s->start;
+
+# Insert another row on node P and wait node S to catch up
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('second row')");
+$node_p->wait_for_replay_catchup($node_s);
+
+# Run pg_subscriber on about-to-fail node F
+command_fails(
+	[
+		'pg_subscriber', '--verbose',
+		'--pgdata', $node_f->data_dir,
+		'--publisher-conninfo', $node_p->connstr('pg1'),
+		'--subscriber-conninfo', $node_f->connstr('pg1'),
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'subscriber data directory is not a copy of the source database cluster');
+
+# dry run mode on node S
+command_ok(
+	[
+		'pg_subscriber', '--verbose', '--dry-run',
+		'--pgdata', $node_s->data_dir,
+		'--publisher-conninfo', $node_p->connstr('pg1'),
+		'--subscriber-conninfo', $node_s->connstr('pg1'),
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'run pg_subscriber --dry-run on node S');
+
+# PID sets to undefined because subscriber was stopped behind the scenes.
+# Start subscriber
+$node_s->{_pid} = undef;
+$node_s->start;
+# Check if node S is still a standby
+is($node_s->safe_psql('postgres', 'SELECT pg_is_in_recovery()'),
+	't', 'standby is in recovery');
+
+# Run pg_subscriber on node S
+command_ok(
+	[
+		'pg_subscriber', '--verbose',
+		'--pgdata', $node_s->data_dir,
+		'--publisher-conninfo', $node_p->connstr('pg1'),
+		'--subscriber-conninfo', $node_s->connstr('pg1'),
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'run pg_subscriber on node S');
+
+# Insert rows on P
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('third row')");
+$node_p->safe_psql('pg2', "INSERT INTO tbl2 VALUES('row 1')");
+
+# PID sets to undefined because subscriber was stopped behind the scenes.
+# Start subscriber
+$node_s->{_pid} = undef;
+$node_s->start;
+
+# Get subscription names
+$result = $node_s->safe_psql(
+	'postgres', qq(
+	SELECT subname FROM pg_subscription WHERE subname ~ '^pg_subscriber_'
+));
+my @subnames = split("\n", $result);
+
+# Wait subscriber to catch up
+$node_s->wait_for_subscription_sync($node_p, $subnames[0]);
+$node_s->wait_for_subscription_sync($node_p, $subnames[1]);
+
+# Check result on database pg1
+$result = $node_s->safe_psql('pg1', 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row),
+	'logical replication works on database pg1');
+
+# Check result on database pg2
+$result = $node_s->safe_psql('pg2', 'SELECT * FROM tbl2');
+is( $result, qq(row 1),
+	'logical replication works on database pg2');
+
+# Different system identifier?
+my $sysid_p = $node_p->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()');
+my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()');
+ok($sysid_p != $sysid_s, 'system identifier was changed');
+
+# clean up
+$node_p->teardown_node;
+$node_s->teardown_node;
+
+done_testing();
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 46df01cc8d..68af923a29 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -55,7 +55,7 @@ my @contrib_excludes = (
 # Set of variables for frontend modules
 my $frontend_defines = { 'pgbench' => 'FD_SETSIZE=1024' };
 my @frontend_uselibpq =
-  ('pg_amcheck', 'pg_ctl', 'pg_upgrade', 'pgbench', 'psql', 'initdb');
+  ('pg_amcheck', 'pg_ctl', 'pg_upgrade', 'pgbench', 'psql', 'initdb', 'pg_subscriber');
 my @frontend_uselibpgport = (
 	'pg_amcheck', 'pg_archivecleanup',
 	'pg_test_fsync', 'pg_test_timing',
-- 
2.30.2

Reply via email to