On Wed, Mar 27, 2024 at 11:39 AM shveta malik <shveta.ma...@gmail.com> wrote:
>
> Thanks for the patch. Few trivial things:

Thanks for reviewing.

> ----------
> 1)
> system-views.sgml:
>
> a) "Note that the slots" --> "Note that the slots on the standbys,"
> --it is good to mention "standbys" as synced could be true on primary
> as well (promoted standby)

Done.

> b) If you plan to add more info which Bertrand suggested, then it will
> be better to make a <note> section instead of using "Note"

I added the note that Bertrand suggested upthread. However, I couldn't
find any instance of <note> ... </note> being used within a table, so I
went with "Note that ..." statements just like the other notes in
system-views.sgml. pg_replication_slots in system-views.sgml renders as a
table, so embedding <note> ... </note> there may not be a great idea.

> 2)
> commit msg:
>
> "The impact of this
> on a promoted standby inactive_since is always NULL for all
> synced slots even after server restart.
> "
> Sentence looks broken.
> ---------

Reworded.

> Apart from the above trivial things, v26-001 looks good to me.

Please check the attached v27 patch, which also addresses Bertrand's
comment on deduplicating the TAP test function. I've now moved it to
Cluster.pm.
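
For instance, a test can now fetch and validate the timestamp via the
Cluster.pm helper along these lines (a minimal sketch; the node, slot,
and plugin names are only illustrative and not taken from the patch's
tests):

    use PostgreSQL::Test::Cluster;
    use PostgreSQL::Test::Utils;
    use Test::More;

    # Set up a primary that allows logical replication slots.
    my $primary = PostgreSQL::Test::Cluster->new('primary');
    $primary->init(allows_streaming => 'logical');
    $primary->start;

    # Remember a reference time before creating the slot.
    my $slot_creation_time =
        $primary->safe_psql('postgres', 'SELECT current_timestamp;');

    # Create an (inactive) logical slot.
    $primary->safe_psql('postgres',
        "SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');");

    # Fetch inactive_since and validate it against the reference time,
    # using the helper added to Cluster.pm by this patch.
    my $inactive_since =
        $primary->get_slot_inactive_since_value('test_slot', $slot_creation_time);

    done_testing();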

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From b4f113ef1d3467383913d0f04cc672372133420d Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 27 Mar 2024 09:15:41 +0000
Subject: [PATCH v27] Maintain inactive_since for synced slots correctly.

The slot's inactive_since isn't currently maintained for
synced slots on the standby. Commit a11f330b55 prevents
updating inactive_since for such slots with a
RecoveryInProgress() check in RestoreSlotFromDisk(). However,
RecoveryInProgress() always returns true there because
'xlogctl->SharedRecoveryState' is still 'RECOVERY_STATE_CRASH'
at that time. Because of this, inactive_since is always NULL
on a promoted standby for all synced slots, even after a
server restart.

The above issue led us to question why we can't just update
inactive_since for synced slots on the standby with the value
received from the remote slot on the primary. Doing so is
consistent with every other slot parameter, all of which are
synced from the primary.

This commit does two things:
1) Updates inactive_since for synced slots with the value
received from the primary's slot.

2) Ensures the value is set to the current timestamp during
the shutdown of the slot sync machinery, to help correctly
interpret the time if the standby gets promoted without a
restart.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACWLctoiH-pSjWnEpR54q4DED6rw_BRJm5pCx86_Y01MoQ%40mail.gmail.com
---
 doc/src/sgml/system-views.sgml                |  8 +++
 src/backend/replication/logical/slotsync.c    | 61 ++++++++++++++++++-
 src/backend/replication/slot.c                | 41 +++++++++----
 src/test/perl/PostgreSQL/Test/Cluster.pm      | 34 +++++++++++
 src/test/recovery/t/019_replslot_limit.pl     | 26 +-------
 .../t/040_standby_failover_slots_sync.pl      | 43 +++++++++++++
 6 files changed, 174 insertions(+), 39 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 3c8dca8ca3..07f6d177e8 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2530,6 +2530,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       <para>
         The time since the slot has become inactive.
         <literal>NULL</literal> if the slot is currently being used.
+        Note that the slots on the standbys that are being synced from a
+        primary server (whose <structfield>synced</structfield> field is
+        <literal>true</literal>) will get the
+        <structfield>inactive_since</structfield> value from the
+        corresponding remote slot on the primary. Also, note that after the
+        standby starts up (i.e. after the slots are loaded from the disk),
+        <structfield>inactive_since</structfield> of the synced slots will
+        remain <literal>NULL</literal> until the next slot sync cycle.
       </para></entry>
      </row>
 
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..9c95a4b062 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -137,9 +137,12 @@ typedef struct RemoteSlot
 
 	/* RS_INVAL_NONE if valid, or the reason of invalidation */
 	ReplicationSlotInvalidationCause invalidated;
+
+	TimestampTz inactive_since; /* time since the slot has become inactive */
 } RemoteSlot;
 
 static void slotsync_failure_callback(int code, Datum arg);
+static void update_synced_slots_inactive_since(void);
 
 /*
  * If necessary, update the local synced slot's metadata based on the data
@@ -167,6 +170,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		remote_slot->two_phase == slot->data.two_phase &&
 		remote_slot->failover == slot->data.failover &&
 		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
+		remote_slot->inactive_since == slot->inactive_since &&
 		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
 		return false;
 
@@ -182,6 +186,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 	slot->data.catalog_xmin = remote_slot->catalog_xmin;
 	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+	slot->inactive_since = remote_slot->inactive_since;
 	SpinLockRelease(&slot->mutex);
 
 	if (xmin_changed)
@@ -652,9 +657,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 static bool
 synchronize_slots(WalReceiverConn *wrconn)
 {
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
-	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID, TIMESTAMPTZOID};
 
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
@@ -663,7 +668,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, failover,"
-		" database, invalidation_reason"
+		" database, invalidation_reason, inactive_since"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
@@ -743,6 +748,13 @@ synchronize_slots(WalReceiverConn *wrconn)
 		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
 			GetSlotInvalidationCause(TextDatumGetCString(d));
 
+		/*
+		 * It is possible to get a null value for inactive_since if the slot
+		 * is currently active on the primary server, so handle it accordingly.
+		 */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->inactive_since = isnull ? 0 : DatumGetTimestampTz(d);
+
 		/* Sanity check */
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
@@ -1296,6 +1308,46 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	Assert(false);
 }
 
+/*
+ * Update the inactive_since property for synced slots.
+ */
+static void
+update_synced_slots_inactive_since(void)
+{
+	TimestampTz now = 0;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* Check if it is a synchronized slot */
+		if (s->in_use && s->data.synced)
+		{
+			Assert(SlotIsLogical(s));
+
+			/*
+			 * We get the current time beforehand and only once to avoid
+			 * repeated system call overhead while holding the lock.
+			 */
+			if (now == 0)
+				now = GetCurrentTimestamp();
+
+			/*
+			 * Set the time since the slot has become inactive after shutting
+			 * down slot sync machinery. This helps correctly interpret the
+			 * time if the standby gets promoted without a restart.
+			 */
+			SpinLockAcquire(&s->mutex);
+			s->inactive_since = now;
+			SpinLockRelease(&s->mutex);
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Shut down the slot sync worker.
  */
@@ -1309,6 +1361,7 @@ ShutDownSlotSync(void)
 	if (SlotSyncCtx->pid == InvalidPid)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
+		update_synced_slots_inactive_since();
 		return;
 	}
 	SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1341,6 +1394,8 @@ ShutDownSlotSync(void)
 	}
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	update_synced_slots_inactive_since();
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d778c0b921..5d6882e4db 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -42,6 +42,7 @@
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -655,6 +656,7 @@ ReplicationSlotRelease(void)
 	char	   *slotname = NULL;	/* keep compiler quiet */
 	bool		is_logical = false; /* keep compiler quiet */
 	TimestampTz now = 0;
+	bool		update_inactive_since;
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
@@ -690,13 +692,19 @@ ReplicationSlotRelease(void)
 	}
 
 	/*
-	 * Set the last inactive time after marking the slot inactive. We don't
-	 * set it for the slots currently being synced from the primary to the
-	 * standby because such slots are typically inactive as decoding is not
-	 * allowed on those.
+	 * Set the time since the slot has become inactive.
+	 *
+	 * Note that we don't set it for the slots currently being synced from
+	 * the primary to the standby, because such slots get their
+	 * inactive_since from the corresponding remote slot on the primary.
 	 */
 	if (!(RecoveryInProgress() && slot->data.synced))
+	{
 		now = GetCurrentTimestamp();
+		update_inactive_since = true;
+	}
+	else
+		update_inactive_since = false;
 
 	if (slot->data.persistency == RS_PERSISTENT)
 	{
@@ -706,11 +714,12 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
-		slot->inactive_since = now;
+		if (update_inactive_since)
+			slot->inactive_since = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
-	else
+	else if (update_inactive_since)
 	{
 		SpinLockAcquire(&slot->mutex);
 		slot->inactive_since = now;
@@ -2369,13 +2378,21 @@ RestoreSlotFromDisk(const char *name)
 		slot->active_pid = 0;
 
 		/*
-		 * We set the last inactive time after loading the slot from the disk
-		 * into memory. Whoever acquires the slot i.e. makes the slot active
-		 * will reset it. We don't set it for the slots currently being synced
-		 * from the primary to the standby because such slots are typically
-		 * inactive as decoding is not allowed on those.
+		 * Set the time since the slot has become inactive after loading the
+		 * slot from the disk into memory. Whoever acquires the slot, i.e.
+		 * makes the slot active, will reset it.
+		 *
+		 * Note that we don't set it for the slots currently being synced
+		 * from the primary to the standby, because such slots get their
+		 * inactive_since from the corresponding remote slot. We use the
+		 * InRecovery flag instead of RecoveryInProgress() as the latter
+		 * always returns true at this point, even on the primary.
+		 *
+		 * Note that for synced slots, after the standby starts up (i.e.
+		 * after the slots are loaded from the disk), inactive_since will
+		 * remain zero until the next slot sync cycle.
 		 */
-		if (!(RecoveryInProgress() && slot->data.synced))
+		if (!(InRecovery && slot->data.synced))
 			slot->inactive_since = GetCurrentTimestamp();
 		else
 			slot->inactive_since = 0;
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index b08296605c..221fe93a8b 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3276,6 +3276,40 @@ sub create_logical_slot_on_standby
 
 =pod
 
+=item $node->get_slot_inactive_since_value(slot_name, reference_time)
+
+Get the inactive_since column value for a given replication slot, validating
+it against the given reference time if one is provided.
+
+=cut
+
+sub get_slot_inactive_since_value
+{
+	my ($self, $slot_name, $reference_time) = @_;
+	my $name = $self->name;
+
+	my $inactive_since = $self->safe_psql('postgres',
+		qq(SELECT inactive_since FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
+		);
+
+	# Check that the captured time is sane
+	if (defined $reference_time)
+	{
+		is($self->safe_psql('postgres',
+			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
+					'$inactive_since'::timestamptz >= '$reference_time'::timestamptz;]
+			),
+			't',
+			"last inactive time for slot $slot_name is valid on node $name")
+			or die "could not validate captured inactive_since for slot $slot_name";
+	}
+
+	return $inactive_since;
+}
+
+=pod
+
 =item $node->advance_wal(num)
 
 Advance WAL of node by given number of segments.
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index 3b9a306a8b..c8e5e5054d 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -443,7 +443,7 @@ $primary4->safe_psql(
 # Get inactive_since value after the slot's creation. Note that the slot is
 # still inactive till it's used by the standby below.
 my $inactive_since =
-	capture_and_validate_slot_inactive_since($primary4, $sb4_slot, $slot_creation_time);
+	$primary4->get_slot_inactive_since_value($sb4_slot, $slot_creation_time);
 
 $standby4->start;
 
@@ -502,7 +502,7 @@ $publisher4->safe_psql('postgres',
 # Get inactive_since value after the slot's creation. Note that the slot is
 # still inactive till it's used by the subscriber below.
 $inactive_since =
-	capture_and_validate_slot_inactive_since($publisher4, $lsub4_slot, $slot_creation_time);
+	$publisher4->get_slot_inactive_since_value($lsub4_slot, $slot_creation_time);
 
 $subscriber4->start;
 $subscriber4->safe_psql('postgres',
@@ -540,26 +540,4 @@ is( $publisher4->safe_psql(
 $publisher4->stop;
 $subscriber4->stop;
 
-# Capture and validate inactive_since of a given slot.
-sub capture_and_validate_slot_inactive_since
-{
-	my ($node, $slot_name, $slot_creation_time) = @_;
-
-	my $inactive_since = $node->safe_psql('postgres',
-		qq(SELECT inactive_since FROM pg_replication_slots
-			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
-		);
-
-	# Check that the captured time is sane
-	is( $node->safe_psql(
-			'postgres',
-			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
-				'$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;]
-		),
-		't',
-		"last inactive time for an active slot $slot_name is sane");
-
-	return $inactive_since;
-}
-
 done_testing();
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..33e3a8dcf0 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -35,6 +35,13 @@ my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
 $subscriber1->init;
 $subscriber1->start;
 
+# Capture the time before the logical failover slot is created on the
+# primary. Note that the publisher node also acts as the primary in this test.
+my $slot_creation_time_on_primary = $publisher->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 # Create a slot on the publisher with failover disabled
 $publisher->safe_psql('postgres',
 	"SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
@@ -174,6 +181,10 @@ $primary->poll_query_until(
 	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
 	1);
 
+# Capture the inactive_since of the slot from the primary
+my $inactive_since_on_primary =
+	$primary->get_slot_inactive_since_value('lsub1_slot', $slot_creation_time_on_primary);
+
 # Wait for the standby to catch up so that the standby is not lagging behind
 # the subscriber.
 $primary->wait_for_replay_catchup($standby1);
@@ -190,6 +201,18 @@ is( $standby1->safe_psql(
 	"t",
 	'logical slots have synced as true on standby');
 
+# Capture the inactive_since of the synced slot on the standby
+my $inactive_since_on_standby =
+	$standby1->get_slot_inactive_since_value('lsub1_slot', $slot_creation_time_on_primary);
+
+# Synced slots on the standby must get the inactive_since from the primary.
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT '$inactive_since_on_primary'::timestamptz = '$inactive_since_on_standby'::timestamptz;"
+	),
+	"t",
+	'synchronized slot has got the inactive_since from the primary');
+
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
@@ -750,8 +773,28 @@ $primary->reload;
 $standby1->start;
 $primary->wait_for_replay_catchup($standby1);
 
+# Capture the time before the standby is promoted
+my $promotion_time_on_primary = $standby1->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 $standby1->promote;
 
+# Capture the inactive_since of the synced slot after the promotion.
+# The expectation here is that the slot gets its own inactive_since as part of
+# the promotion. We do this check before the slot is used on the new primary
+# below, otherwise the slot becomes active, setting inactive_since to NULL.
+my $inactive_since_on_new_primary =
+	$standby1->get_slot_inactive_since_value('lsub1_slot', $promotion_time_on_primary);
+
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
+	),
+	"t",
+	'synchronized slot has got its own inactive_since on the new primary');
+
 # Update subscription with the new primary's connection info
 my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
 $subscriber1->safe_psql('postgres',
-- 
2.34.1
