On Mon, Feb 21, 2022, at 9:09 AM, Euler Taveira wrote:
> A new tool called pg_subscriber does this conversion and is tightly integrated
> with Postgres.

After a long period of inactivity, I'm back to this client tool. As suggested
by Andres, I added a new helper function to change the system identifier as the
last step. I also thought about including the pg_basebackup support but decided
to keep it simple (at least for this current version). The user can always
execute pg_basebackup as a preliminary step to create a standby replica and it
will work. (I will post a separate patch that includes the pg_basebackup
support on the top of this one.)

Amit asked if an extra replication slot is required. It is not. The reason I
keep it is to remember that at least the latest replication slot needs to be
created after the pg_basebackup finishes (pg_backup_stop() call). Regarding the
atexit() routine, it tries to do the best to remove all the objects it created,
however, there is no guarantee it can remove them because it depends on
external resources such as connectivity and authorization. I added a new
warning message if it cannot drop the transient replication slot. It is
probably a good idea to add such warning message into the cleanup routine too.
More to this point, another feature that checks and remove all left objects.
The transient replication slot is ok because it should always be removed at the
end. However, the big question is how to detect that you are not removing
objects (publications, subscriptions, replication slots) from a successful
conversion.

Amit also asked about setup a logical replica with m databases where m is less
than the total number of databases. One option is to remove the "extra"
databases in the target server after promoting the physical replica or in one
of the latest steps. Maybe it is time to propose partial physical replica that
contains only a subset of databases on primary. (I'm not volunteering to it.)
Hence, pg_basebackup has an option to remove these "extra" databases so this
tool can take advantage of it.

Let's continue with the bike shedding... I agree with Peter E that this name
does not express what this tool is. At the moment, it only have one action:
create. If I have to suggest other actions I would say that it could support
switchover option too (that removes the infrastructure created by this tool).
If we decide to keep this name, it should be a good idea to add an option to
indicate what action it is executing (similar to pg_recvlogical) as suggested
by Peter.

I included the documentation cleanups that Peter E shared. I also did small
adjustments into the documentation. It probably deserves a warning section that
advertises about the cleanup.

I refactored the transient replication slot code and decided to use a permanent
(instead of temporary) slot to avoid keeping a replication connection open for
a long time until the target server catches up.

The system identifier functions (get_control_from_datadir() and
get_sysid_from_conn()) now returns uint64 as suggested by Peter.

After reflection, the --verbose option should be renamed to --progress. There
are also some messages that should be converted to debug messages.

I fixed the useless malloc. I rearrange the code a bit but the main still has ~
370 lines (without options/validation ~ 255 lines. I'm trying to rearrange the
code to make the code easier to read and at the same time reduce the main size.
I already have a few candidates in mind such as the code that stops the standby
and the part that includes the recovery parameters. I removed the refactor I
proposed in the previous patch and the current code is relying on pg_ctl --wait
behavior. Are there issues with this choice? Well, one annoying situation is
that pg_ctl does not have a "wait forever" option. If one of the pg_ctl calls
fails, you could probably have to start again (unless you understand the
pg_subscriber internals and fix the setup by yourself). You have to choose an
arbitrary timeout value and expect that pg_ctl *does* perform the action less
than the timeout.

Real tests are included. The cleanup code does not have coverage because a
simple reproducible case isn't easy. I'm also not sure if it is worth it. We
can explain it in the warning section that was proposed.

It is still a WIP but I would like to share it and get some feedback.


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 0aca46ed57c15bce1972c9db6459c04bcc5cf73d Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 5 Jun 2023 14:39:40 -0400
Subject: [PATCH v2] Creates a new logical replica from a standby server

A new tool called pg_subscriber can convert a physical replica into a
logical replica. It runs on the target server and should be able to
connect to the source server (publisher) and the target server
(subscriber).

The conversion requires a few steps. Check if the target data directory
has the same system identifier than the source data directory. Stop the
target server if it is running as a standby server. Create one
replication slot per specified database on the source server. One
additional replication slot is created at the end to get the consistent
LSN (This consistent LSN will be used as (a) a stopping point for the
recovery process and (b) a starting point for the subscriptions). Write
recovery parameters into the target data directory and start the target
server (Wait until the target server is promoted). Create one
publication (FOR ALL TABLES) per specified database on the source
server. Create one subscription per specified database on the target
server (Use replication slot and publication created in a previous step.
Don't enable the subscriptions yet). Sets the replication progress to
the consistent LSN that was got in a previous step. Enable the
subscription for each specified database on the target server. Remove
the additional replication slot that was used to get the consistent LSN.
Stop the target server. Change the system identifier from the target
server.

Depending on your workload and database size, creating a logical replica
couldn't be an option due to resource constraints (WAL backlog should be
available until all table data is synchronized). The initial data copy
and the replication progress tends to be faster on a physical replica.
The purpose of this tool is to speed up a logical replica setup.
---
 doc/src/sgml/ref/allfiles.sgml         |    1 +
 doc/src/sgml/ref/pg_subscriber.sgml    |  271 +++++
 doc/src/sgml/reference.sgml            |    1 +
 src/bin/Makefile                       |    1 +
 src/bin/meson.build                    |    1 +
 src/bin/pg_basebackup/streamutil.c     |   55 +
 src/bin/pg_basebackup/streamutil.h     |    5 +
 src/bin/pg_subscriber/Makefile         |   39 +
 src/bin/pg_subscriber/meson.build      |   31 +
 src/bin/pg_subscriber/pg_subscriber.c  | 1470 ++++++++++++++++++++++++
 src/bin/pg_subscriber/po/meson.build   |    3 +
 src/bin/pg_subscriber/t/001_basic.pl   |   42 +
 src/bin/pg_subscriber/t/002_standby.pl |  114 ++
 src/tools/msvc/Mkvcbuild.pm            |    2 +-
 src/tools/pgindent/typedefs.list       |    7 +-
 15 files changed, 2038 insertions(+), 5 deletions(-)
 create mode 100644 doc/src/sgml/ref/pg_subscriber.sgml
 create mode 100644 src/bin/pg_subscriber/Makefile
 create mode 100644 src/bin/pg_subscriber/meson.build
 create mode 100644 src/bin/pg_subscriber/pg_subscriber.c
 create mode 100644 src/bin/pg_subscriber/po/meson.build
 create mode 100644 src/bin/pg_subscriber/t/001_basic.pl
 create mode 100644 src/bin/pg_subscriber/t/002_standby.pl

diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 54b5f22d6e..e2ecb4f944 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -213,6 +213,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgResetwal         SYSTEM "pg_resetwal.sgml">
 <!ENTITY pgRestore          SYSTEM "pg_restore.sgml">
 <!ENTITY pgRewind           SYSTEM "pg_rewind.sgml">
+<!ENTITY pgSubscriber       SYSTEM "pg_subscriber.sgml">
 <!ENTITY pgVerifyBackup     SYSTEM "pg_verifybackup.sgml">
 <!ENTITY pgtestfsync        SYSTEM "pgtestfsync.sgml">
 <!ENTITY pgtesttiming       SYSTEM "pgtesttiming.sgml">
diff --git a/doc/src/sgml/ref/pg_subscriber.sgml b/doc/src/sgml/ref/pg_subscriber.sgml
new file mode 100644
index 0000000000..8480a3a281
--- /dev/null
+++ b/doc/src/sgml/ref/pg_subscriber.sgml
@@ -0,0 +1,271 @@
+<!--
+doc/src/sgml/ref/pg_subscriber.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgsubscriber">
+ <indexterm zone="app-pgsubscriber">
+  <primary>pg_subscriber</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle><application>pg_subscriber</application></refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_subscriber</refname>
+  <refpurpose>create a new logical replica from a standby server</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_subscriber</command>
+   <arg rep="repeat"><replaceable>option</replaceable></arg>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+  <para>
+   <application>pg_subscriber</application> takes the publisher and subscriber
+   connection strings, a cluster directory from a standby server and a list of
+   database names and it sets up a new logical replica using the physical
+   recovery process.
+  </para>
+
+  <para>
+   The <application>pg_subscriber</application> should be run at the target
+   server. The source server (known as publisher server) should accept logical
+   replication connections from the target server (known as subscriber server).
+   The target server should accept local logical replication connection.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    <application>pg_subscriber</application> accepts the following
+    command-line arguments:
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+      <term><option>--pgdata=<replaceable class="parameter">directory</replaceable></option></term>
+      <listitem>
+       <para>
+        The target directory that contains a cluster directory from a standby
+        server.
+       </para>
+      </listitem>
+     </varlistentry>
+     <varlistentry>
+      <term><option>-P  <replaceable class="parameter">conninfo</replaceable></option></term>
+      <term><option>--publisher-conninfo=<replaceable class="parameter">conninfo</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the publisher. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+     <varlistentry>
+      <term><option>-S <replaceable class="parameter">conninfo</replaceable></option></term>
+      <term><option>--subscriber-conninfo=<replaceable class="parameter">conninfo</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the subscriber. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+     <varlistentry>
+      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
+      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
+      <listitem>
+       <para>
+        The database name to create the subscription. Multiple databases can be
+        selected by writing multiple <option>-d</option> switches.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-v</option></term>
+      <term><option>--verbose</option></term>
+      <listitem>
+       <para>
+        Enables verbose mode. This will cause
+        <application>pg_subscriber</application> to output progress messages
+        and detailed information about each step.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+
+   <para>
+    Other options are also available:
+
+    <variablelist>
+     <varlistentry>
+       <term><option>-V</option></term>
+       <term><option>--version</option></term>
+       <listitem>
+       <para>
+       Print the <application>pg_subscriber</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-?</option></term>
+       <term><option>--help</option></term>
+       <listitem>
+       <para>
+       Show help about <application>pg_subscriber</application> command
+       line arguments, and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+   The transformation proceeds in the following steps:
+  </para>
+
+  <procedure>
+   <step>
+    <para>
+     <application>pg_subscriber</application> checks if the given target data
+     directory has the same system identifier than the source data directory.
+     Since it uses the recovery process as one of the steps, it starts the
+     target server as a replica from the source server. If the system
+     identifier is not the same, <application>pg_subscriber</application> will
+     terminate with an error.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> checks if the target data
+     directory is used by a standby server. Stop the standby server if it is
+     running. One of the next steps is to add some recovery parameters that
+     requires a server start. This step avoids an error.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> creates one replication slot for
+     each specified database on the source server. The replication slot name
+     contains a <literal>pg_subscriber</literal> prefix. These replication
+     slots will be used by the subscriptions in a future step.  Another
+     replication slot is used to get a consistent start location. This
+     consistent LSN will be used as a stopping point in the <xref
+     linkend="guc-recovery-target-lsn"/> parameter and by the
+     subscriptions as a replication starting point. It guarantees that no
+     transaction will be lost.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> writes recovery parameters into
+     the target data directory and start the target server. It specifies a LSN
+     (consistent LSN that was obtained in the previous step) of write-ahead
+     log location up to which recovery will proceed. It also specifies
+     <literal>promote</literal> as the action that the server should take once
+     the recovery target is reached. This step finishes once the server ends
+     standby mode and is accepting read-write operations.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Next, <application>pg_subscriber</application> creates one publication
+     for each specified database on the source server. Each publication
+     replicates changes for all tables in the database. The publication name
+     contains a <literal>pg_subscriber</literal> prefix. These publication
+     will be used by a corresponding subscription in a next step.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> creates one subscription for
+     each specified database on the target server. Each subscription name
+     contains a <literal>pg_subscriber</literal> prefix. The replication slot
+     name is identical to the subscription name. It does not copy existing data
+     from the source server. It does not create a replication slot. Instead, it
+     uses the replication slot that was created in a previous step. The
+     subscription is created but it is not enabled yet. The reason is the
+     replication progress must be set to the consistent LSN but replication
+     origin name contains the subscription oid in its name. Hence, the
+     subscription will be enabled in a separate step.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> sets the replication progress to
+     the consistent LSN that was obtained in a previous step. When the target
+     server started the recovery process, it caught up to the consistent LSN.
+     This is the exact LSN to be used as a initial location for each
+     subscription.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Finally, <application>pg_subscriber</application> enables the subscription
+     for each specified database on the target server. The subscription starts
+     streaming from the consistent LSN.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> removes the additional replication
+     slot that was used to get the consistent LSN on the source server.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> stops the target server to change
+     its system identifier.
+    </para>
+   </step>
+  </procedure>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   To create a logical replica for databases <literal>hr</literal> and
+   <literal>finance</literal> from a standby server at <literal>foo</literal>:
+<screen>
+<prompt>$</prompt> <userinput>pg_subscriber -D /usr/local/pgsql/data -P "host=foo" -S "host=localhost" -d hr -d finance</userinput>
+</screen>
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="app-pgbasebackup"/></member>
+  </simplelist>
+ </refsect1>
+
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index e11b4b6130..67e257436b 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -257,6 +257,7 @@
    &pgReceivewal;
    &pgRecvlogical;
    &pgRestore;
+   &pgSubscriber;
    &pgVerifyBackup;
    &psqlRef;
    &reindexdb;
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 373077bf52..9abd5b9711 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -25,6 +25,7 @@ SUBDIRS = \
 	pg_dump \
 	pg_resetwal \
 	pg_rewind \
+	pg_subscriber \
 	pg_test_fsync \
 	pg_test_timing \
 	pg_upgrade \
diff --git a/src/bin/meson.build b/src/bin/meson.build
index 67cb50630c..c7a6881e5f 100644
--- a/src/bin/meson.build
+++ b/src/bin/meson.build
@@ -11,6 +11,7 @@ subdir('pg_ctl')
 subdir('pg_dump')
 subdir('pg_resetwal')
 subdir('pg_rewind')
+subdir('pg_subscriber')
 subdir('pg_test_fsync')
 subdir('pg_test_timing')
 subdir('pg_upgrade')
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index dbd08ab172..d8e438f114 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -33,6 +33,9 @@
 
 int			WalSegSz;
 
+static bool CreateReplicationSlot_internal(PGconn *conn, const char *slot_name, const char *plugin,
+					  bool is_temporary, bool is_physical, bool reserve_wal,
+					  bool slot_exists_ok, bool two_phase, char *lsn);
 static bool RetrieveDataDirCreatePerm(PGconn *conn);
 
 /* SHOW command for replication connection was introduced in version 10 */
@@ -583,6 +586,26 @@ bool
 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 					  bool is_temporary, bool is_physical, bool reserve_wal,
 					  bool slot_exists_ok, bool two_phase)
+{
+	return CreateReplicationSlot_internal(conn, slot_name, plugin,
+					  is_temporary, is_physical, reserve_wal,
+					  slot_exists_ok, two_phase, NULL);
+}
+
+bool
+CreateReplicationSlotLSN(PGconn *conn, const char *slot_name, const char *plugin,
+					  bool is_temporary, bool is_physical, bool reserve_wal,
+					  bool slot_exists_ok, bool two_phase, char *lsn)
+{
+	return CreateReplicationSlot_internal(conn, slot_name, plugin,
+					  is_temporary, is_physical, reserve_wal,
+					  slot_exists_ok, two_phase, lsn);
+}
+
+static bool
+CreateReplicationSlot_internal(PGconn *conn, const char *slot_name, const char *plugin,
+					  bool is_temporary, bool is_physical, bool reserve_wal,
+					  bool slot_exists_ok, bool two_phase, char *lsn)
 {
 	PQExpBuffer query;
 	PGresult   *res;
@@ -654,6 +677,30 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 		{
 			destroyPQExpBuffer(query);
 			PQclear(res);
+
+			/* Duplicate replication slot. Obtain the current LSN. */
+			if (lsn)
+			{
+				query = createPQExpBuffer();
+				appendPQExpBuffer(query, "SELECT restart_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '%s'", slot_name);
+				res = PQexec(conn, query->data);
+				if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				{
+					pg_log_error("could not read replication slot \"%s\": got %d rows, expected %d rows", slot_name, PQntuples(res), 1);
+					return false;	/* FIXME can't happen */
+				}
+				else if (PQgetisnull(res, 0, 0))
+				{
+					lsn = NULL;
+				}
+				else
+				{
+					lsn = pg_strdup(PQgetvalue(res, 0, 0));
+				}
+				destroyPQExpBuffer(query);
+				PQclear(res);
+			}
+
 			return true;
 		}
 		else
@@ -678,6 +725,14 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 		return false;
 	}
 
+	if (lsn)
+	{
+		if (PQgetisnull(res, 0, 1))
+			lsn = NULL;
+		else
+			lsn = pg_strdup(PQgetvalue(res, 0, 1));
+	}
+
 	destroyPQExpBuffer(query);
 	PQclear(res);
 	return true;
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 268c163213..bbd0789d2b 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -36,6 +36,11 @@ extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
 								  const char *plugin, bool is_temporary,
 								  bool is_physical, bool reserve_wal,
 								  bool slot_exists_ok, bool two_phase);
+extern bool CreateReplicationSlotLSN(PGconn *conn, const char *slot_name,
+								  const char *plugin, bool is_temporary,
+								  bool is_physical, bool reserve_wal,
+								  bool slot_exists_ok, bool two_phase,
+								  char *lsn);
 extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
 extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
diff --git a/src/bin/pg_subscriber/Makefile b/src/bin/pg_subscriber/Makefile
new file mode 100644
index 0000000000..b580e57382
--- /dev/null
+++ b/src/bin/pg_subscriber/Makefile
@@ -0,0 +1,39 @@
+# src/bin/pg_subscriber/Makefile
+
+PGFILEDESC = "pg_subscriber - create a new logical replica from a standby server"
+PGAPPICON=win32
+
+subdir = src/bin/pg_subscriber
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
+LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
+
+OBJS = \
+	$(WIN32RES) \
+	pg_subscriber.o
+
+all: pg_subscriber
+
+pg_subscriber: $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+install: all installdirs
+	$(INSTALL_PROGRAM) pg_subscriber$(X) '$(DESTDIR)$(bindir)/pg_subscriber$(X)'
+
+installdirs:
+	$(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+	rm -f '$(DESTDIR)$(bindir)/pg_subscriber$(X)'
+
+clean distclean maintainer-clean:
+	rm -f pg_subscriber$(X) $(OBJS)
+	rm -rf tmp_check
+
+check:
+	$(prove_check)
+
+installcheck:
+	$(prove_installcheck)
diff --git a/src/bin/pg_subscriber/meson.build b/src/bin/pg_subscriber/meson.build
new file mode 100644
index 0000000000..868c81dc62
--- /dev/null
+++ b/src/bin/pg_subscriber/meson.build
@@ -0,0 +1,31 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+pg_subscriber_sources = files(
+  'pg_subscriber.c'
+)
+
+if host_system == 'windows'
+  pg_subscriber_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+	'--NAME', 'pg_subscriber',
+	'--FILEDESC', 'pg_subscriber - create a new logical replica from a standby server',])
+endif
+
+pg_subscriber = executable('pg_subscriber',
+  pg_subscriber_sources,
+  dependencies: [frontend_code, libpq],
+  kwargs: default_bin_args,
+)
+bin_targets += pg_subscriber
+
+tests += {
+  'name': 'pg_subscriber',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_basic.pl',
+    ],
+  },
+}
+
+subdir('po', if_found: libintl)
diff --git a/src/bin/pg_subscriber/pg_subscriber.c b/src/bin/pg_subscriber/pg_subscriber.c
new file mode 100644
index 0000000000..b2ff6d7c15
--- /dev/null
+++ b/src/bin/pg_subscriber/pg_subscriber.c
@@ -0,0 +1,1470 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_subscriber.c
+ *	  Create a new logical replica from a standby server
+ *
+ * Copyright (C) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/bin/pg_subscriber/pg_subscriber.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <time.h>
+
+#include "catalog/pg_control.h"
+#include "common/connect.h"
+#include "common/controldata_utils.h"
+#include "common/file_utils.h"
+#include "common/logging.h"
+#include "fe_utils/recovery_gen.h"
+#include "fe_utils/simple_list.h"
+#include "getopt_long.h"
+#include "utils/pidfile.h"
+
+typedef struct LogicalRepInfo
+{
+	Oid			oid;			/* database OID */
+	char	   *dbname;			/* database name */
+	char	   *pubconninfo;	/* publication connection string for logical
+								 * replication */
+	char	   *subconninfo;	/* subscription connection string for logical
+								 * replication */
+	char	   *pubname;		/* publication name */
+	char	   *subname;		/* subscription name (also replication slot
+								 * name) */
+
+	bool		made_replslot;	/* replication slot was created */
+	bool		made_publication;	/* publication was created */
+	bool		made_subscription;	/* subscription was created */
+} LogicalRepInfo;
+
+static void cleanup_objects_atexit(void);
+static void usage();
+static char *get_base_conninfo(char *conninfo, char *dbname,
+							   const char *noderole);
+static bool get_exec_path(const char *path);
+static bool check_data_directory(const char *datadir);
+static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
+static LogicalRepInfo *store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo);
+static PGconn *connect_database(const char *conninfo, bool secure_search_path);
+static void disconnect_database(PGconn *conn);
+static uint64 get_sysid_from_conn(const char *conninfo);
+static uint64 get_control_from_datadir(const char *datadir);
+static void modify_sysid(const char *pg_resetwal_path, const char *datadir);
+static bool create_all_logical_replication_slots(LogicalRepInfo *dbinfo);
+static char *create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo,
+											 char *slot_name);
+static void drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name);
+static void pg_ctl_status(const char *pg_ctl_cmd, int rc, int action);
+static void wait_for_end_recovery(const char *conninfo);
+static void create_publication(PGconn *conn, LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+static void drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+static void set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn);
+static void enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+
+#define	USEC_PER_SEC	1000000
+#define	WAIT_INTERVAL	1		/* 1 second */
+
+/* Options */
+static const char *progname;
+
+static char *subscriber_dir = NULL;
+static char *pub_conninfo_str = NULL;
+static char *sub_conninfo_str = NULL;
+static SimpleStringList database_names = {NULL, NULL};
+static int	verbose = 0;
+
+static bool success = false;
+
+static char *pg_ctl_path = NULL;
+static char *pg_resetwal_path = NULL;
+
+static LogicalRepInfo *dbinfo;
+static int	num_dbs = 0;
+
+static char temp_replslot[NAMEDATALEN] = {0};
+static bool made_transient_replslot = false;
+
+enum WaitPMResult
+{
+	POSTMASTER_READY,
+	POSTMASTER_STANDBY,
+	POSTMASTER_STILL_STARTING,
+	POSTMASTER_FAILED
+};
+
+
+/*
+ * Cleanup objects that were created by pg_subscriber if there is an error.
+ *
+ * Replication slots, publications and subscriptions are created. Depending on
+ * the step it failed, it should remove the already created objects if it is
+ * possible (sometimes it won't work due to a connection issue).
+ */
+static void
+cleanup_objects_atexit(void)
+{
+	PGconn	   *conn;
+	int			i;
+
+	if (success)
+		return;
+
+	for (i = 0; i < num_dbs; i++)
+	{
+		if (dbinfo[i].made_subscription)
+		{
+			conn = connect_database(dbinfo[i].subconninfo, true);
+			if (conn != NULL)
+			{
+				drop_subscription(conn, &dbinfo[i]);
+				disconnect_database(conn);
+			}
+		}
+
+		if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
+		{
+			conn = connect_database(dbinfo[i].pubconninfo, true);
+			if (conn != NULL)
+			{
+				if (dbinfo[i].made_publication)
+					drop_publication(conn, &dbinfo[i]);
+				if (dbinfo[i].made_replslot)
+					drop_replication_slot(conn, &dbinfo[i], NULL);
+				disconnect_database(conn);
+			}
+		}
+	}
+
+	if (made_transient_replslot)
+	{
+		conn = connect_database(dbinfo[0].pubconninfo, true);
+		drop_replication_slot(conn, &dbinfo[0], temp_replslot);
+		disconnect_database(conn);
+	}
+}
+
+static void
+usage(void)
+{
+	printf(_("%s creates a new logical replica from a standby server.\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_(" -D, --pgdata=DATADIR                location for the subscriber data directory\n"));
+	printf(_(" -P, --publisher-conninfo=CONNINFO   publisher connection string\n"));
+	printf(_(" -S, --subscriber-conninfo=CONNINFO  subscriber connection string\n"));
+	printf(_(" -d, --database=DBNAME               database to create a subscription\n"));
+	printf(_(" -v, --verbose                       output verbose messages\n"));
+	printf(_(" -V, --version                       output version information, then exit\n"));
+	printf(_(" -?, --help                          show this help, then exit\n"));
+	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+/*
+ * Validate a connection string. Returns a base connection string that is a
+ * connection string without a database name plus a fallback application name.
+ * Since we might process multiple databases, each database name will be
+ * appended to this base connection string to provide a final connection string.
+ * If the second argument (dbname) is not null, returns dbname if the provided
+ * connection string contains it. If option --database is not provided, uses
+ * dbname as the only database to setup the logical replica.
+ * It is the caller's responsibility to free the returned connection string and
+ * dbname.
+ */
+static char *
+get_base_conninfo(char *conninfo, char *dbname, const char *noderole)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	PQconninfoOption *conn_opts = NULL;
+	PQconninfoOption *conn_opt;
+	char	   *errmsg = NULL;
+	char	   *ret;
+	int			i;
+
+	if (verbose)
+		pg_log_info("validating connection string on %s", noderole);
+
+	conn_opts = PQconninfoParse(conninfo, &errmsg);
+	if (conn_opts == NULL)
+	{
+		pg_log_error("could not parse connection string: %s", errmsg);
+		return NULL;
+	}
+
+	i = 0;
+	for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
+	{
+		if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
+		{
+			if (dbname)
+				dbname = pg_strdup(conn_opt->val);
+			continue;
+		}
+
+		if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+		{
+			if (i > 0)
+				appendPQExpBufferChar(buf, ' ');
+			appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
+			i++;
+		}
+	}
+
+	if (i > 0)
+		appendPQExpBufferChar(buf, ' ');
+	appendPQExpBuffer(buf, "fallback_application_name=%s", progname);
+
+	ret = pg_strdup(buf->data);
+
+	destroyPQExpBuffer(buf);
+	PQconninfoFree(conn_opts);
+
+	return ret;
+}
+
+/*
+ * Get the absolute path from other PostgreSQL binaries (pg_ctl and
+ * pg_resetwal) that is used by it.
+ */
+static bool
+get_exec_path(const char *path)
+{
+	int			rc;
+
+	pg_ctl_path = pg_malloc(MAXPGPATH);
+	rc = find_other_exec(path, "pg_ctl",
+						 "pg_ctl (PostgreSQL) " PG_VERSION "\n",
+						 pg_ctl_path);
+	if (rc < 0)
+	{
+		char		full_path[MAXPGPATH];
+
+		if (find_my_exec(path, full_path) < 0)
+			strlcpy(full_path, progname, sizeof(full_path));
+		if (rc == -1)
+			pg_log_error("The program \"%s\" is needed by %s but was not found in the\n"
+						 "same directory as \"%s\".\n"
+						 "Check your installation.",
+						 "pg_ctl", progname, full_path);
+		else
+			pg_log_error("The program \"%s\" was found by \"%s\"\n"
+						 "but was not the same version as %s.\n"
+						 "Check your installation.",
+						 "pg_ctl", full_path, progname);
+		return false;
+	}
+
+	if (verbose)
+		pg_log_info("pg_ctl path is: %s", pg_ctl_path);
+
+	pg_resetwal_path = pg_malloc(MAXPGPATH);
+	rc = find_other_exec(path, "pg_resetwal",
+						 "pg_resetwal (PostgreSQL) " PG_VERSION "\n",
+						 pg_resetwal_path);
+	if (rc < 0)
+	{
+		char		full_path[MAXPGPATH];
+
+		if (find_my_exec(path, full_path) < 0)
+			strlcpy(full_path, progname, sizeof(full_path));
+		if (rc == -1)
+			pg_log_error("The program \"%s\" is needed by %s but was not found in the\n"
+						 "same directory as \"%s\".\n"
+						 "Check your installation.",
+						 "pg_resetwal", progname, full_path);
+		else
+			pg_log_error("The program \"%s\" was found by \"%s\"\n"
+						 "but was not the same version as %s.\n"
+						 "Check your installation.",
+						 "pg_resetwal", full_path, progname);
+		return false;
+	}
+
+	if (verbose)
+		pg_log_info("pg_resetwal path is: %s", pg_resetwal_path);
+
+	return true;
+}
+
+/*
+ * Is it a cluster directory? These are preliminary checks. It is far from
+ * making an accurate check. If it is not a clone from the publisher, it will
+ * eventually fail in a future step.
+ */
+static bool
+check_data_directory(const char *datadir)
+{
+	struct stat statbuf;
+	char		versionfile[MAXPGPATH];
+
+	if (verbose)
+		pg_log_info("checking if directory \"%s\" is a cluster data directory",
+					datadir);
+
+	if (stat(datadir, &statbuf) != 0)
+	{
+		if (errno == ENOENT)
+			pg_log_error("data directory \"%s\" does not exist", datadir);
+		else
+			pg_log_error("could not access directory \"%s\": %s", datadir, strerror(errno));
+
+		return false;
+	}
+
+	snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
+	if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
+	{
+		pg_log_error("directory \"%s\" is not a database cluster directory", datadir);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Append database name into a base connection string.
+ *
+ * dbname is the only parameter that changes so it is not included in the base
+ * connection string. This function concatenates dbname to build a "real"
+ * connection string.
+ */
+static char *
+concat_conninfo_dbname(const char *conninfo, const char *dbname)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	char	   *ret;
+
+	Assert(conninfo != NULL);
+
+	appendPQExpBufferStr(buf, conninfo);
+	appendPQExpBuffer(buf, " dbname=%s", dbname);
+	appendPQExpBufferStr(buf, " replication=database");
+
+	ret = pg_strdup(buf->data);
+	destroyPQExpBuffer(buf);
+
+	return ret;
+}
+
+/*
+ * Store publication and subscription information.
+ */
+static LogicalRepInfo *
+store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo)
+{
+	LogicalRepInfo *dbinfo;
+	SimpleStringListCell *cell;
+	int			i = 0;
+
+	dbinfo = (LogicalRepInfo *) pg_malloc(num_dbs * sizeof(LogicalRepInfo));
+
+	for (cell = database_names.head; cell; cell = cell->next)
+	{
+		char	   *conninfo;
+
+		/* Publisher. */
+		conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
+		dbinfo[i].pubconninfo = conninfo;
+		dbinfo[i].dbname = cell->val;
+		dbinfo[i].made_replslot = false;
+		dbinfo[i].made_publication = false;
+		dbinfo[i].made_subscription = false;
+		/* other struct fields will be filled later. */
+
+		/* Subscriber. */
+		conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
+		dbinfo[i].subconninfo = conninfo;
+
+		i++;
+	}
+
+	return dbinfo;
+}
+
+static PGconn *
+connect_database(const char *conninfo, bool secure_search_path)
+{
+	PGconn	   *conn;
+
+	conn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		pg_log_error("connection to database failed: %s", PQerrorMessage(conn));
+		return NULL;
+	}
+
+	/* secure search_path */
+	if (secure_search_path)
+	{
+		PGresult   *res;
+
+		res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not clear search_path: %s", PQresultErrorMessage(res));
+			return NULL;
+		}
+		PQclear(res);
+	}
+
+	return conn;
+}
+
+static void
+disconnect_database(PGconn *conn)
+{
+	Assert(conn != NULL);
+
+	PQfinish(conn);
+}
+
+/*
+ * Obtain the system identifier using the provided connection. It will be used
+ * to compare if a data directory is a clone of another one.
+ */
+static uint64
+get_sysid_from_conn(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	char	   *repconninfo;
+	uint64		sysid;
+
+	if (verbose)
+		pg_log_info("getting system identifier from publisher");
+
+	repconninfo = psprintf("%s replication=database", conninfo);
+	conn = connect_database(repconninfo, false);
+	if (conn == NULL)
+		exit(1);
+
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "IDENTIFY_SYSTEM", PQresultErrorMessage(res));
+		PQclear(res);
+		disconnect_database(conn);
+		exit(1);
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
+
+		PQclear(res);
+		disconnect_database(conn);
+		exit(1);
+	}
+
+	sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
+
+	disconnect_database(conn);
+
+	return sysid;
+}
+
+/*
+ * Obtain the system identifier from control file. It will be used to compare
+ * if a data directory is a clone of another one. This routine is used locally
+ * and avoids a replication connection.
+ */
+static uint64
+get_control_from_datadir(const char *datadir)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	uint64		sysid;
+
+	if (verbose)
+		pg_log_info("getting system identifier from subscriber");
+
+	cf = get_controlfile(datadir, &crc_ok);
+	if (!crc_ok)
+	{
+		pg_log_error("control file appears to be corrupt");
+		exit(1);
+	}
+
+	sysid = cf->system_identifier;
+
+	pfree(cf);
+
+	return sysid;
+}
+
+/*
+ * Modify the system identifier. Since a standby server preserves the system
+ * identifier, it makes sense to change it to avoid situations in which WAL
+ * files from one of the systems might be used in the other one.
+ */
+static void
+modify_sysid(const char *pg_resetwal_path, const char *datadir)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	struct timeval tv;
+
+	char	   *cmd_str;
+	int			rc;
+
+	if (verbose)
+		pg_log_info("modifying system identifier from subscriber");
+
+	cf = get_controlfile(datadir, &crc_ok);
+	if (!crc_ok)
+	{
+		pg_log_error("control file appears to be corrupt");
+		exit(1);
+	}
+
+	/*
+	 * Select a new system identifier.
+	 *
+	 * XXX this code was extracted from BootStrapXLOG().
+	 */
+	gettimeofday(&tv, NULL);
+	cf->system_identifier = ((uint64) tv.tv_sec) << 32;
+	cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
+	cf->system_identifier |= getpid() & 0xFFF;
+
+	update_controlfile(datadir, cf, true);
+
+	if (verbose)
+		pg_log_info("running pg_resetwal in the subscriber");
+
+	cmd_str = psprintf("\"%s\" -D \"%s\"", pg_resetwal_path, datadir);
+	rc = system(cmd_str);
+	if (rc == 0)
+		pg_log_info("subscriber successfully changed the system identifier");
+	else
+		pg_log_error("subscriber failed to change system identifier: exit code: %d", rc);
+
+	pfree(cf);
+}
+
+static bool
+create_all_logical_replication_slots(LogicalRepInfo *dbinfo)
+{
+	int			i;
+
+	for (i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		PGresult   *res;
+		char		replslotname[NAMEDATALEN];
+
+		conn = connect_database(dbinfo[i].pubconninfo, true);
+		if (conn == NULL)
+			exit(1);
+
+		res = PQexec(conn,
+					 "SELECT oid FROM pg_catalog.pg_database WHERE datname = current_database()");
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain database OID: %s", PQresultErrorMessage(res));
+			return false;
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("could not obtain database OID: got %d rows, expected %d rows",
+						 PQntuples(res), 1);
+			return false;
+		}
+
+		/* Remember database OID. */
+		dbinfo[i].oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+
+		PQclear(res);
+
+		/*
+		 * Build the replication slot name. The name must not exceed
+		 * NAMEDATALEN - 1. This current schema uses a maximum of 36
+		 * characters (14 + 10 + 1 + 10 + '\0'). System identifier is included
+		 * to reduce the probability of collision. By default, subscription
+		 * name is used as replication slot name.
+		 */
+		snprintf(replslotname, sizeof(replslotname),
+				 "pg_subscriber_%u_%d",
+				 dbinfo[i].oid,
+				 (int) getpid());
+		dbinfo[i].subname = pg_strdup(replslotname);
+
+		/* Create replication slot on publisher. */
+		if (create_logical_replication_slot(conn, &dbinfo[i], replslotname) != NULL)
+			pg_log_info("create replication slot \"%s\" on publisher", replslotname);
+		else
+			return false;
+
+		disconnect_database(conn);
+	}
+
+	return true;
+}
+
+/*
+ * Create a logical replication slot and returns a consistent LSN. The returned
+ * LSN might be used to catch up the subscriber up to the required point.
+ *
+ * CreateReplicationSlot() is not used because it does not provide the one-row
+ * result set that contains the consistent LSN.
+ */
+static char *
+create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo,
+								char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	char	   *lsn = NULL;
+	bool		transient_replslot = false;
+
+	Assert(conn != NULL);
+
+	/*
+	 * If no slot name is informed, it is a transient replication slot used
+	 * only for catch up purposes.
+	 */
+	if (slot_name[0] == '\0')
+	{
+		snprintf(slot_name, sizeof(slot_name), "pg_subscriber_%d_startpoint",
+			 (int) getpid());
+		transient_replslot = true;
+	}
+
+	if (verbose)
+		pg_log_info("creating the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
+	appendPQExpBufferStr(str, " LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname,
+					 PQresultErrorMessage(res));
+		return lsn;
+	}
+
+	/* for cleanup purposes */
+	if (transient_replslot)
+		made_transient_replslot = true;
+	else
+		dbinfo->made_replslot = true;
+
+	lsn = pg_strdup(PQgetvalue(res, 0, 1));
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+
+	return lsn;
+}
+
+static void
+drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP_REPLICATION_SLOT \"%s\"", slot_name);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname,
+					 PQerrorMessage(conn));
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Reports a suitable message if pg_ctl fails.
+ */
+static void
+pg_ctl_status(const char *pg_ctl_cmd, int rc, int action)
+{
+	if (rc != 0)
+	{
+		if (WIFEXITED(rc))
+		{
+			pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+		}
+		else if (WIFSIGNALED(rc))
+		{
+#if defined(WIN32)
+			pg_log_error("pg_ctl was terminated by exception 0x%X", WTERMSIG(rc));
+			pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
+#else
+			pg_log_error("pg_ctl was terminated by signal %d: %s",
+						 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+#endif
+		}
+		else
+		{
+			pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+		}
+
+		pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
+		exit(1);
+	}
+
+	if (verbose)
+	{
+		if (action)
+			pg_log_info("postmaster was started");
+		else
+			pg_log_info("postmaster was stopped");
+	}
+}
+
+/*
+ * Returns after the server finishes the recovery process.
+ */
+static void
+wait_for_end_recovery(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	int			status = POSTMASTER_STILL_STARTING;
+
+	if (verbose)
+		pg_log_info("waiting the postmaster to reach the consistent state");
+
+	conn = connect_database(conninfo, true);
+	if (conn == NULL)
+		exit(1);
+
+	for (;;)
+	{
+		bool		in_recovery;
+
+		res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain recovery progress");
+			exit(1);
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("unexpected result from pg_is_in_recovery function");
+			exit(1);
+		}
+
+		in_recovery = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
+
+		PQclear(res);
+
+		/* Does the recovery process finish? */
+		if (!in_recovery)
+		{
+			status = POSTMASTER_READY;
+			break;
+		}
+
+		/* Keep waiting. */
+		pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
+	}
+
+	disconnect_database(conn);
+
+	if (status == POSTMASTER_STILL_STARTING)
+	{
+		pg_log_error("server did not end recovery");
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("postmaster reached the consistent state");
+}
+
+/*
+ * Create a publication that includes all tables in the database.
+ */
+static void
+create_publication(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	/* Check if the publication needs to be created. */
+	appendPQExpBuffer(str,
+					  "SELECT puballtables FROM pg_catalog.pg_publication WHERE pubname = '%s'",
+					  dbinfo->pubname);
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (PQntuples(res) == 1)
+	{
+		/*
+		 * If publication name already exists and puballtables is true, let's
+		 * use it. A previous run of pg_subscriber must have created this
+		 * publication. Bail out.
+		 */
+		if (strcmp(PQgetvalue(res, 0, 0), "t") == 0)
+		{
+			if (verbose)
+				pg_log_info("publication \"%s\" already exists", dbinfo->pubname);
+			return;
+		}
+		else
+		{
+			/*
+			 * Unfortunately, if it reaches this code path, it will always fail
+			 * (unless you decide to change the existing publication name).
+			 * That's bad but it is very unlikely that the user will choose a
+			 * name with pg_subscriber_ prefix followed by the exact database
+			 * oid in which puballtables is false.
+			 */
+			pg_log_error("publication \"%s\" does not replicate changes for all tables",
+						 dbinfo->pubname);
+			pg_log_error_hint("Consider renaming this publication.");
+			PQclear(res);
+			PQfinish(conn);
+			exit(1);
+		}
+	}
+
+	PQclear(res);
+	resetPQExpBuffer(str);
+
+	if (verbose)
+		pg_log_info("creating publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", dbinfo->pubname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
+					 dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn));
+		PQfinish(conn);
+		exit(1);
+	}
+
+	/* for cleanup purposes */
+	dbinfo->made_publication = true;
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove publication if it couldn't finish all steps.
+ */
+static void
+drop_publication(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("dropping publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP PUBLICATION %s", dbinfo->pubname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_log_error("could not drop publication \"%s\" on database \"%s\": %s", dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn));
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Create a subscription with some predefined options.
+ *
+ * A replication slot was already created in a previous step. Let's use it. By
+ * default, the subscription name is used as replication slot name. It is
+ * not required to copy data. The subscription will be created but it will not
+ * be enabled now. That's because the replication progress must be set and the
+ * replication origin name (one of the function arguments) contains the
+ * subscription OID in its name. Once the subscription is created,
+ * set_replication_progress() can obtain the chosen origin name and set up its
+ * initial location.
+ */
+static void
+create_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("creating subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str,
+					  "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s "
+					  "WITH (create_slot = false, copy_data = false, enabled = false)",
+					  dbinfo->subname, dbinfo->pubconninfo, dbinfo->pubname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
+					 dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn));
+		PQfinish(conn);
+		exit(1);
+	}
+
+	/* for cleanup purposes */
+	dbinfo->made_subscription = true;
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove subscription if it couldn't finish all steps.
+ */
+static void
+drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("dropping subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP SUBSCRIPTION %s", dbinfo->subname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_log_error("could not drop subscription \"%s\" on database \"%s\": %s", dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn));
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Sets the replication progress to the consistent LSN.
+ *
+ * The subscriber caught up to the consistent LSN provided by the temporary
+ * replication slot. The goal is to set up the initial location for the logical
+ * replication that is the exact LSN that the subscriber was promoted. Once the
+ * subscription is enabled it will start streaming from that location onwards.
+ */
+static void
+set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	Oid			suboid;
+	char		originname[NAMEDATALEN];
+
+	Assert(conn != NULL);
+
+	appendPQExpBuffer(str,
+					  "SELECT oid FROM pg_catalog.pg_subscription WHERE subname = '%s'", dbinfo->subname);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain subscription OID: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (PQntuples(res) != 1)
+	{
+		pg_log_error("could not obtain subscription OID: got %d rows, expected %d rows",
+					 PQntuples(res), 1);
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	/*
+	 * The origin name is defined as pg_%u. %u is the subscription OID. See
+	 * ApplyWorkerMain().
+	 */
+	suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+	snprintf(originname, sizeof(originname), "pg_%u", suboid);
+
+	PQclear(res);
+
+	if (verbose)
+		pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
+					originname, lsn, dbinfo->dbname);
+
+	resetPQExpBuffer(str);
+	appendPQExpBuffer(str,
+					  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')", originname, lsn);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not set replication progress for the subscription \"%s\": %s",
+					 dbinfo->subname, PQresultErrorMessage(res));
+		PQfinish(conn);
+		exit(1);
+	}
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Enables the subscription.
+ *
+ * The subscription was created in a previous step but it was disabled. After
+ * adjusting the initial location, enabling the subscription is the last step
+ * of this setup.
+ */
+static void
+enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("enabling subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", dbinfo->subname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not enable subscription \"%s\": %s", dbinfo->subname,
+					 PQerrorMessage(conn));
+		PQfinish(conn);
+		exit(1);
+	}
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] =
+	{
+		{"help", no_argument, NULL, '?'},
+		{"version", no_argument, NULL, 'V'},
+		{"pgdata", required_argument, NULL, 'D'},
+		{"publisher-conninfo", required_argument, NULL, 'P'},
+		{"subscriber-conninfo", required_argument, NULL, 'S'},
+		{"database", required_argument, NULL, 'd'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+
+	int			c;
+	int			option_index;
+	int			rc;
+
+	char	   *pg_ctl_cmd;
+
+	char	   *pub_base_conninfo = NULL;
+	char	   *sub_base_conninfo = NULL;
+	char	   *dbname_conninfo = NULL;
+
+	uint64		pub_sysid;
+	uint64		sub_sysid;
+	struct stat statbuf;
+
+	PGconn	   *conn;
+	char	   *consistent_lsn;
+
+	PQExpBuffer recoveryconfcontents = NULL;
+
+	char		pidfile[MAXPGPATH];
+
+	int			i;
+
+	pg_logging_init(argv[0]);
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_subscriber"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_subscriber (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	atexit(cleanup_objects_atexit);
+
+	/*
+	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
+	 * it either.
+	 */
+#ifndef WIN32
+	if (geteuid() == 0)
+	{
+		pg_log_error("cannot be executed by \"root\"");
+		pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
+						  progname);
+		exit(1);
+	}
+#endif
+
+	while ((c = getopt_long(argc, argv, "D:P:S:d:t:v",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				subscriber_dir = pg_strdup(optarg);
+				break;
+			case 'P':
+				pub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'S':
+				sub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'd':
+				simple_string_list_append(&database_names, optarg);
+				num_dbs++;
+				break;
+			case 'v':
+				verbose++;
+				break;
+			default:
+				/* getopt_long already emitted a complaint */
+				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments?
+	 */
+	if (optind < argc)
+	{
+		pg_log_error("too many command-line arguments (first is \"%s\")",
+					 argv[optind]);
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments
+	 */
+	if (subscriber_dir == NULL)
+	{
+		pg_log_error("no subscriber data directory specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+
+	/*
+	 * Parse connection string. Build a base connection string that might be
+	 * reused by multiple databases.
+	 */
+	if (pub_conninfo_str == NULL)
+	{
+		/*
+		 * TODO use primary_conninfo (if available) from subscriber and
+		 * extract publisher connection string. Assume that there are
+		 * identical entries for physical and logical replication. If there is
+		 * not, we would fail anyway.
+		 */
+		pg_log_error("no publisher connection string specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+	pub_base_conninfo = get_base_conninfo(pub_conninfo_str, dbname_conninfo,
+										  "publisher");
+	if (pub_base_conninfo == NULL)
+		exit(1);
+
+	if (sub_conninfo_str == NULL)
+	{
+		pg_log_error("no subscriber connection string specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+	sub_base_conninfo = get_base_conninfo(sub_conninfo_str, NULL, "subscriber");
+	if (sub_base_conninfo == NULL)
+		exit(1);
+
+	if (database_names.head == NULL)
+	{
+		if (verbose)
+			pg_log_info("no database was specified");
+
+		/*
+		 * If --database option is not provided, try to obtain the dbname from
+		 * the publisher conninfo. If dbname parameter is not available, error
+		 * out.
+		 */
+		if (dbname_conninfo)
+		{
+			simple_string_list_append(&database_names, dbname_conninfo);
+			num_dbs++;
+
+			if (verbose)
+				pg_log_info("database \"%s\" was extracted from the publisher connection string",
+							dbname_conninfo);
+		}
+		else
+		{
+			pg_log_error("no database name specified");
+			pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+			exit(1);
+		}
+	}
+
+	/*
+	 * Get the absolute path of pg_ctl and pg_resetwal on the subscriber.
+	 */
+	if (!get_exec_path(argv[0]))
+		exit(1);
+
+	/* rudimentary check for a data directory. */
+	if (!check_data_directory(subscriber_dir))
+		exit(1);
+
+	/* Store database information for publisher and subscriber. */
+	dbinfo = store_pub_sub_info(pub_base_conninfo, sub_base_conninfo);
+
+	/*
+	 * Check if the subscriber data directory has the same system identifier
+	 * than the publisher data directory.
+	 */
+	pub_sysid = get_sysid_from_conn(dbinfo[0].pubconninfo);
+	sub_sysid = get_control_from_datadir(subscriber_dir);
+	if (pub_sysid != sub_sysid)
+	{
+		pg_log_error("subscriber data directory is not a copy of the source database cluster");
+		exit(1);
+	}
+
+	/* subscriber PID file. */
+	snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
+
+	/*
+	 * Stop the subscriber if it is a standby server. Before executing the
+	 * transformation steps, make sure the subscriber is not running because
+	 * one of the steps is to modify some recovery parameters that require a
+	 * restart.
+	 */
+	if (stat(pidfile, &statbuf) == 0)
+	{
+		if (verbose)
+		{
+			pg_log_info("subscriber is up and running");
+			pg_log_info("stopping the server to start the transformation steps");
+		}
+
+		pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+		rc = system(pg_ctl_cmd);
+		pg_ctl_status(pg_ctl_cmd, rc, 0);
+	}
+
+	/*
+	 * Create a replication slot for each database on the publisher.
+	 */
+	if (!create_all_logical_replication_slots(dbinfo))
+		exit(1);
+
+	/*
+	 * Create a logical replication slot to get a consistent LSN.
+	 *
+	 * This consistent LSN will be used later to advanced the recently created
+	 * replication slots. We cannot use the last created replication slot
+	 * because the consistent LSN should be obtained *after* the base backup
+	 * finishes (and the base backup should include the logical replication
+	 * slots).
+	 *
+	 * XXX we should probably use the last created replication slot to get a
+	 * consistent LSN but it should be changed after adding pg_basebackup
+	 * support.
+	 *
+	 * A temporary replication slot is not used here to avoid keeping a
+	 * replication connection open (depending when base backup was taken, the
+	 * connection should be open for a few hours).
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo, false);
+	if (conn == NULL)
+		exit(1);
+	consistent_lsn = create_logical_replication_slot(conn, &dbinfo[0],
+													 temp_replslot);
+
+	/*
+	 * Write recovery parameters.
+	 *
+	 * Despite of the recovery parameters will be written to the subscriber,
+	 * use a publisher connection for the follwing recovery functions. The
+	 * connection is only used to check the current server version (physical
+	 * replica, same server version). The subscriber is not running yet.
+	 */
+	recoveryconfcontents = GenerateRecoveryConfig(conn, NULL);
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
+					  consistent_lsn);
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_inclusive = true\n");
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_action = promote\n");
+
+	WriteRecoveryConfig(conn, subscriber_dir, recoveryconfcontents);
+	disconnect_database(conn);
+
+	/*
+	 * Start subscriber and wait until accepting connections.
+	 */
+	if (verbose)
+		pg_log_info("starting the subscriber");
+
+	pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc, 1);
+
+	/*
+	 * Waiting the subscriber to be promoted.
+	 */
+	wait_for_end_recovery(dbinfo[0].subconninfo);
+
+	/*
+	 * Create a publication for each database. This step should be executed
+	 * after promoting the subscriber to avoid replicating unnecessary
+	 * objects.
+	 */
+	for (i = 0; i < num_dbs; i++)
+	{
+		char		pubname[NAMEDATALEN];
+
+		/* Connect to publisher. */
+		conn = connect_database(dbinfo[i].pubconninfo, true);
+		if (conn == NULL)
+			exit(1);
+
+		/*
+		 * Build the publication name. The name must not exceed NAMEDATALEN -
+		 * 1. This current schema uses a maximum of 35 characters (14 + 10 +
+		 * '\0').
+		 */
+		snprintf(pubname, sizeof(pubname), "pg_subscriber_%u", dbinfo[i].oid);
+		dbinfo[i].pubname = pg_strdup(pubname);
+
+		create_publication(conn, &dbinfo[i]);
+
+		disconnect_database(conn);
+	}
+
+	/*
+	 * Create a subscription for each database.
+	 */
+	for (i = 0; i < num_dbs; i++)
+	{
+		/* Connect to subscriber. */
+		conn = connect_database(dbinfo[i].subconninfo, true);
+		if (conn == NULL)
+			exit(1);
+
+		create_subscription(conn, &dbinfo[i]);
+
+		/* Set the replication progress to the correct LSN. */
+		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
+
+		/* Enable subscription. */
+		enable_subscription(conn, &dbinfo[i]);
+
+		disconnect_database(conn);
+	}
+
+	/*
+	 * The transient replication slot is no longer required. Drop it.
+	 *
+	 * XXX we might not fail here. Instead, we provide a warning so the user
+	 * eventually drops the replication slot later.
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo, true);
+	if (conn == NULL)
+	{
+		pg_log_warning("could not drop transient replication slot \"%s\" on publisher", temp_replslot);
+		pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+	}
+	else
+	{
+		drop_replication_slot(conn, &dbinfo[0], temp_replslot);
+		disconnect_database(conn);
+	}
+
+	/*
+	 * Stop the subscriber.
+	 */
+	if (verbose)
+		pg_log_info("stopping the subscriber");
+
+	pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc, 0);
+
+	/*
+	 * Change system identifier.
+	 */
+	modify_sysid(pg_resetwal_path, subscriber_dir);
+
+	success = true;
+
+	if (verbose)
+		pg_log_info("Done!");
+
+	return 0;
+}
diff --git a/src/bin/pg_subscriber/po/meson.build b/src/bin/pg_subscriber/po/meson.build
new file mode 100644
index 0000000000..f287b5e974
--- /dev/null
+++ b/src/bin/pg_subscriber/po/meson.build
@@ -0,0 +1,3 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+nls_targets += [i18n.gettext('pg_subscriber-' + pg_version_major.to_string())]
diff --git a/src/bin/pg_subscriber/t/001_basic.pl b/src/bin/pg_subscriber/t/001_basic.pl
new file mode 100644
index 0000000000..505a060101
--- /dev/null
+++ b/src/bin/pg_subscriber/t/001_basic.pl
@@ -0,0 +1,42 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+#
+# Test checking options of pg_subscriber.
+#
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+program_help_ok('pg_subscriber');
+program_version_ok('pg_subscriber');
+program_options_handling_ok('pg_subscriber');
+
+my $datadir = PostgreSQL::Test::Utils::tempdir;
+
+command_fails(['pg_subscriber'],
+	'no subscriber data directory specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--pgdata', $datadir
+	],
+	'no publisher connection string specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--pgdata', $datadir,
+		'--publisher-conninfo', 'dbname=postgres'
+	],
+	'no subscriber connection string specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--pgdata', $datadir,
+		'--publisher-conninfo', 'dbname=postgres',
+		'--subscriber-conninfo', 'dbname=postgres'
+	],
+	'no database name specified');
+
+done_testing();
diff --git a/src/bin/pg_subscriber/t/002_standby.pl b/src/bin/pg_subscriber/t/002_standby.pl
new file mode 100644
index 0000000000..40bc3bf13c
--- /dev/null
+++ b/src/bin/pg_subscriber/t/002_standby.pl
@@ -0,0 +1,114 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+#
+# Test using a standby server as the subscriber.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_p;
+my $node_f;
+my $node_s;
+my $result;
+
+# Set up node P as primary
+$node_p = PostgreSQL::Test::Cluster->new('node_p');
+$node_p->init(allows_streaming => 'logical');
+$node_p->start;
+
+# Set up node F as about-to-fail node
+$node_f = PostgreSQL::Test::Cluster->new('node_f');
+$node_f->init(allows_streaming => 'logical');
+$node_f->start;
+
+# Create databases
+# Create a test table and insert a row in primary server
+$node_p->safe_psql(
+	'postgres', q(
+	CREATE DATABASE pg1;
+	CREATE DATABASE pg2;
+));
+$node_p->safe_psql('pg1', "CREATE TABLE tbl1 (a text)");
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_p->safe_psql('pg2', "CREATE TABLE tbl2 (a text)");
+
+# Set up node S as standby linking to node P
+$node_p->backup('backup_1');
+$node_s = PostgreSQL::Test::Cluster->new('node_s');
+$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
+$node_s->set_standby_mode();
+$node_s->start;
+
+# Insert another row on P and wait standby S to catch up
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('second row')");
+$node_p->wait_for_replay_catchup($node_s);
+
+# Run pg_subscriber on about-to-fail node (F)
+command_fails(
+	[
+		'pg_subscriber', "--verbose",
+		"--pgdata", $node_f->data_dir,
+		"--publisher-conninfo", $node_p->connstr('pg1'),
+		"--subscriber-conninfo", $node_f->connstr('pg1'),
+		"--database", 'pg1',
+		"--database", 'pg2'
+	],
+	'subscriber data directory is not a copy of the source database cluster');
+
+# Run pg_subscriber on node S
+command_ok(
+	[
+		'pg_subscriber', "--verbose",
+		"--pgdata", $node_s->data_dir,
+		"--publisher-conninfo", $node_p->connstr('pg1'),
+		"--subscriber-conninfo", $node_s->connstr('pg1'),
+		"--database", 'pg1',
+		"--database", 'pg2'
+	],
+	'run pg_subscriber on node S');
+
+# Insert rows on P
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('third row')");
+$node_p->safe_psql('pg2', "INSERT INTO tbl2 VALUES('row 1')");
+
+# PID sets to undefined because subscriber was stopped behind the scenes.
+# Start subscriber
+$node_s->{_pid} = undef;
+$node_s->start;
+
+# Get subscription names
+$result = $node_s->safe_psql(
+	'postgres', qq(
+	SELECT subname FROM pg_subscription WHERE subname ~ '^pg_subscriber_'
+));
+my @subnames = split("\n", $result);
+
+# Wait subscriber to catch up
+$node_s->wait_for_subscription_sync($node_p, $subnames[0]);
+$node_s->wait_for_subscription_sync($node_p, $subnames[1]);
+
+# Check result on database pg1
+$result = $node_s->safe_psql('pg1', "SELECT * FROM tbl1");
+is( $result, qq(first row
+second row
+third row),
+	'logical replication works on database pg1');
+
+# Check result on database pg2
+$result = $node_s->safe_psql('pg2', "SELECT * FROM tbl2");
+is( $result, qq(row 1),
+	'logical replication works on database pg2');
+
+# Different system identifier?
+my $sysid_p = $node_p->safe_psql('postgres', "SELECT system_identifier FROM pg_control_system()");
+my $sysid_s = $node_s->safe_psql('postgres', "SELECT system_identifier FROM pg_control_system()");
+ok($sysid_p != $sysid_s, 'system identifier was changed');
+
+# clean up
+$node_p->teardown_node;
+$node_s->teardown_node;
+
+done_testing();
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index db242c9205..b35a5ec693 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -55,7 +55,7 @@ my @contrib_excludes = (
 # Set of variables for frontend modules
 my $frontend_defines = { 'pgbench' => 'FD_SETSIZE=1024' };
 my @frontend_uselibpq =
-  ('pg_amcheck', 'pg_ctl', 'pg_upgrade', 'pgbench', 'psql', 'initdb');
+  ('pg_amcheck', 'pg_ctl', 'pg_upgrade', 'pgbench', 'psql', 'initdb', 'pg_subscriber');
 my @frontend_uselibpgport = (
 	'pg_amcheck', 'pg_archivecleanup',
 	'pg_test_fsync', 'pg_test_timing',
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 06b25617bc..2d5c08d06a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1275,9 +1275,9 @@ JsonManifestWALRangeField
 JsonObjectAgg
 JsonObjectConstructor
 JsonOutput
-JsonParseExpr
 JsonParseContext
 JsonParseErrorType
+JsonParseExpr
 JsonPath
 JsonPathBool
 JsonPathExecContext
@@ -1340,6 +1340,7 @@ LINE
 LLVMAttributeRef
 LLVMBasicBlockRef
 LLVMBuilderRef
+LLVMContextRef
 LLVMErrorRef
 LLVMIntPredicate
 LLVMJITEventListenerRef
@@ -1913,7 +1914,6 @@ ParallelHashJoinBatch
 ParallelHashJoinBatchAccessor
 ParallelHashJoinState
 ParallelIndexScanDesc
-ParallelReadyList
 ParallelSlot
 ParallelSlotArray
 ParallelSlotResultHandler
@@ -2993,7 +2993,6 @@ WaitEvent
 WaitEventActivity
 WaitEventBufferPin
 WaitEventClient
-WaitEventExtension
 WaitEventExtensionCounterData
 WaitEventExtensionEntryById
 WaitEventExtensionEntryByName
@@ -3403,6 +3402,7 @@ indexed_tlist
 inet
 inetKEY
 inet_struct
+initRowMethod
 init_function
 inline_cte_walker_context
 inline_error_callback_arg
@@ -3870,7 +3870,6 @@ wchar2mb_with_len_converter
 wchar_t
 win32_deadchild_waitinfo
 wint_t
-worker_spi_state
 worker_state
 worktable
 wrap
-- 
2.30.2

Reply via email to