Hello.

We've noticed that when walreceiver is waiting for a connection to
complete, standby does not immediately respond to promotion
requests. In PG14, upon receiving a promotion request, walreceiver
terminates instantly, but in PG16, it waits for connection
timeout. This behavior is attributed to commit 728f86fec65, where a
part of libpqrcv_connect was simply replaced with a call to
libpqsrc_connect_params. This behavior can be verified by simply
dropping packets from the standby to the primary.

By a simple thought, in walreceiver, libpqsrv_connect_internal could
just call ProcessWalRcvInterrupts() instead of CHECK_FOR_INTERRUPTS(),
but this approach is quite ugly. Since ProcessWalRcvInterrupts()
originally calls CHECK_FOR_INTERRUPTS() and there are no standalone
calls to CHECK_FOR_INTERRUPTS() within walreceiver, I think it might
be better to use ProcDiePending instead of ShutdownRequestPending.  I
added a subset function of die() as the SIGTERM handler in walsender
in a crude patch attached.

What do you think about the issue, and the approach?

If there are no issues or objections with this method, I will continue
to refine this patch. For now, I plan to register it for the upcoming
commitfest.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 693b3669ba..e503799bd8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -738,7 +738,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}
 
 		/* Consume whatever data is available from the socket */
@@ -1042,7 +1042,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 26ded928a7..c53a8e6c89 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -147,39 +147,34 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
+static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
 void
-ProcessWalRcvInterrupts(void)
+WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 {
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
+	int			save_errno = errno;
 
-	if (ShutdownRequestPending)
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
 	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
+		InterruptPending = true;
+		ProcDiePending = true;
 	}
+
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+	
 }
 
+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+	return WalRcv != NULL;
+}
 
 /* Main entry point for walreceiver process */
 void
@@ -277,7 +272,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -456,7 +451,7 @@ WalReceiverMain(void)
 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
 
 				/* Process any requests or signals received recently */
-				ProcessWalRcvInterrupts();
+				CHECK_FOR_INTERRUPTS();
 
 				if (ConfigReloadPending)
 				{
@@ -552,7 +547,7 @@ WalReceiverMain(void)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();
 
 					if (walrcv->force_reply)
 					{
@@ -691,7 +686,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7298a187d1..04cab7dafa 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -59,6 +59,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3286,6 +3287,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (IsWalReceiver())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (IsBackgroundWorker)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 949e874f21..c69c8daa6a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -456,8 +456,8 @@ walrcv_clear_result(WalRcvExecResult *walres)
 }
 
 /* prototypes for functions in walreceiver.c */
+extern bool IsWalReceiver(void);
 extern void WalReceiverMain(void) pg_attribute_noreturn();
-extern void ProcessWalRcvInterrupts(void);
 extern void WalRcvForceReply(void);
 
 /* prototypes for functions in walreceiverfuncs.c */

Reply via email to