From e577260dce26ac501c9ad00b899469a1cc028e1a Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 11 Aug 2016 17:11:32 -0400
Subject: [PATCH 2/4] Condition variable code.

Reviewed in an earlier version by Rahila Syed.
---
 src/backend/access/transam/xact.c             |   4 +
 src/backend/bootstrap/bootstrap.c             |   2 +
 src/backend/postmaster/bgwriter.c             |   2 +
 src/backend/postmaster/checkpointer.c         |   2 +
 src/backend/postmaster/walwriter.c            |   2 +
 src/backend/replication/walsender.c           |   2 +
 src/backend/storage/lmgr/Makefile             |   2 +-
 src/backend/storage/lmgr/condition_variable.c | 157 ++++++++++++++++++++++++++
 src/backend/storage/lmgr/proc.c               |   7 ++
 src/include/storage/condition_variable.h      |  58 ++++++++++
 src/include/storage/proc.h                    |   4 +
 src/include/storage/proclist.h                |  16 +++
 12 files changed, 257 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/storage/lmgr/condition_variable.c
 create mode 100644 src/include/storage/condition_variable.h

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 23f36ea..b40b2e0 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -45,6 +45,7 @@
 #include "replication/origin.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -2476,6 +2477,9 @@ AbortTransaction(void)
 	/* Reset WAL record construction state */
 	XLogResetInsertion();
 
+	/* Cancel condition variable sleep */
+	ConditionVariableCancelSleep();
+
 	/*
 	 * Also clean up any open wait for lock, since the lock manager will choke
 	 * if we try to wait for another lock before doing this.
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index e518e17..9eeb49c 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -33,6 +33,7 @@
 #include "replication/walreceiver.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
@@ -535,6 +536,7 @@ static void
 ShutdownAuxiliaryProcess(int code, Datum arg)
 {
 	LWLockReleaseAll();
+	ConditionVariableCancelSleep();
 	pgstat_report_wait_end();
 }
 
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 00f03d8..40f3f80 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -46,6 +46,7 @@
 #include "postmaster/bgwriter.h"
 #include "storage/bufmgr.h"
 #include "storage/buf_internals.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
@@ -189,6 +190,7 @@ BackgroundWriterMain(void)
 		 * about in bgwriter, but we do have LWLocks, buffers, and temp files.
 		 */
 		LWLockReleaseAll();
+		ConditionVariableCancelSleep();
 		AbortBufferIO();
 		UnlockBuffers();
 		/* buffer pins are released here: */
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 8d4b353..0c072f3 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -49,6 +49,7 @@
 #include "postmaster/bgwriter.h"
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
@@ -273,6 +274,7 @@ CheckpointerMain(void)
 		 * files.
 		 */
 		LWLockReleaseAll();
+		ConditionVariableCancelSleep();
 		pgstat_report_wait_end();
 		AbortBufferIO();
 		UnlockBuffers();
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 228190a..e5de019 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -50,6 +50,7 @@
 #include "pgstat.h"
 #include "postmaster/walwriter.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
@@ -169,6 +170,7 @@ WalWriterMain(void)
 		 * about in walwriter, but we do have LWLocks, and perhaps buffers?
 		 */
 		LWLockReleaseAll();
+		ConditionVariableCancelSleep();
 		pgstat_report_wait_end();
 		AbortBufferIO();
 		UnlockBuffers();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a0dba19..44143d7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -66,6 +66,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/walsender_private.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
@@ -253,6 +254,7 @@ void
 WalSndErrorCleanup(void)
 {
 	LWLockReleaseAll();
+	ConditionVariableCancelSleep();
 	pgstat_report_wait_end();
 
 	if (sendFile >= 0)
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index cd6ec73..e1b787e 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
-	s_lock.o predicate.o
+	s_lock.o predicate.o condition_variable.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
new file mode 100644
index 0000000..0639689
--- /dev/null
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -0,0 +1,157 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ *	  Implementation of condition variables.  Condition variables provide
+ *	  a way for one process to wait until a specific condition occurs,
+ *	  without needing to know the specific identity of the process for
+ *	  which it is waiting.  Waits for condition variables can be
+ *	  interrupted, unlike LWLock waits.  Condition variables are safe
+ *	  to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+
+/* Initially, we are not prepared to sleep on any condition variable. */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/*
+ * Set up a condition variable's wait list and the spinlock guarding it.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+	proclist_init(&cv->wakeup);
+	SpinLockInit(&cv->mutex);
+}
+
+/*
+ * Prepare to sleep on 'cv': remember it as our sleep target, flag ourselves
+ * as sleeping, and join its wait queue so a signal or broadcast can find us.
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+	int		pgprocno = MyProc->pgprocno;
+
+	/*
+	 * It's not legal to prepare a sleep until the previous sleep has been
+	 * completed or cancelled.
+	 */
+	Assert(cv_sleep_target == NULL);
+
+	/* Record the condition variable on which we will sleep. */
+	cv_sleep_target = cv;
+
+	/* Mark myself as sleeping. */
+	MyProc->cvSleeping = true;
+
+	/* Queue at the tail: signals pop the head, so wakeups are FIFO. */
+	SpinLockAcquire(&cv->mutex);
+	proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
+	SpinLockRelease(&cv->mutex);
+}
+
+/*
+ * Sleep until our cvSleeping flag is clear: loop servicing interrupts, then
+ * waiting on and resetting our process latch.  The flag is tested before the
+ * first wait, so an early wakeup returns at once.  NOTE(review): the waker
+ * must clear cvSleeping before setting our latch -- confirm the signal path.
+ */
+void
+ConditionVariableSleep(void)
+{
+	Assert(cv_sleep_target != NULL);
+
+	while (MyProc->cvSleeping)
+	{
+		CHECK_FOR_INTERRUPTS();
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+		ResetLatch(&MyProc->procLatch);
+	}
+
+	cv_sleep_target = NULL;
+}
+
+/*
+ * Cancel any pending sleep operation.  If we are still on the wait queue of
+ * the condition variable we prepared to sleep on, remove ourselves from it.
+ * A signaller that already dequeued us cleared cvSleeping under the mutex.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+	ConditionVariable *cv = cv_sleep_target;
+
+	if (cv == NULL)
+		return;
+
+	SpinLockAcquire(&cv->mutex);
+	if (MyProc->cvSleeping)		/* still queued: nobody has popped us yet */
+		proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+	MyProc->cvSleeping = false;
+	SpinLockRelease(&cv->mutex);
+
+	cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.  The waiter's
+ * cvSleeping flag is cleared while we hold the mutex; that is what lets it
+ * leave ConditionVariableSleep(), and it tells a concurrent cancel that the
+ * waiter is no longer queued.  Returns true if we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+	PGPROC  *proc = NULL;
+
+	/* Dequeue the first waiter (if any) and mark it as no longer sleeping. */
+	SpinLockAcquire(&cv->mutex);
+	if (!proclist_is_empty(&cv->wakeup))
+	{
+		proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+		proc->cvSleeping = false;
+	}
+	SpinLockRelease(&cv->mutex);
+
+	/* If we dequeued a waiter, set its latch to wake it up. */
+	if (proc != NULL)
+		SetLatch(&proc->procLatch);
+	return proc != NULL;
+}
+
+/*
+ * Wake every process currently waiting on the condition variable.
+ *
+ * Returns how many processes were woken.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+	int		nwoken;
+
+	/*
+	 * Signal waiters one at a time, counting each, until none remain.
+	 * Dequeuing them all under one mutex acquisition would save spinlock
+	 * cycles, but is hard to get right when sleeps can be cancelled
+	 * concurrently, and we don't want to loop while holding the mutex.
+	 */
+	for (nwoken = 0; ConditionVariableSignal(cv); nwoken++)
+		;
+
+	return nwoken;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 9a758bd..ec08091 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -42,6 +42,7 @@
 #include "postmaster/autovacuum.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "storage/condition_variable.h"
 #include "storage/standby.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -805,6 +806,9 @@ ProcKill(int code, Datum arg)
 	 */
 	LWLockReleaseAll();
 
+	/* Cancel any pending condition variable sleep, too */
+	ConditionVariableCancelSleep();
+
 	/* Make sure active replication slots are released */
 	if (MyReplicationSlot != NULL)
 		ReplicationSlotRelease();
@@ -910,6 +914,9 @@ AuxiliaryProcKill(int code, Datum arg)
 	/* Release any LW locks I am holding (see notes above) */
 	LWLockReleaseAll();
 
+	/* Cancel any pending condition variable sleep, too */
+	ConditionVariableCancelSleep();
+
 	/*
 	 * Reset MyLatch to the process local one.  This is so that signal
 	 * handlers et al can continue using the latch after the shared latch
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
new file mode 100644
index 0000000..54b7fba
--- /dev/null
+++ b/src/include/storage/condition_variable.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ *	  Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true.  Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable.  In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+typedef struct
+{
+	slock_t		mutex;			/* spinlock held while touching 'wakeup' */
+	proclist_head	wakeup;		/* queue of processes waiting to be woken */
+} ConditionVariable;
+
+/* Initialize a condition variable (its mutex and its wait list). */
+extern void ConditionVariableInit(ConditionVariable *cv);
+
+/*
+ * Sleeping on a condition variable is a multi-step protocol.  To avoid
+ * race conditions, a process first prepares to sleep and then rechecks
+ * the condition it is waiting for.  If the condition still does not hold,
+ * it sleeps; if it now holds, it cancels the sleep instead.  A non-local
+ * exit via ERROR or FATAL cancels a pending sleep automatically.
+ *
+ * Whether the condition must be rechecked after sleeping depends on how
+ * the condition variable is used.  If it is only ever signalled or
+ * broadcast when the condition is guaranteed to hold, no recheck is
+ * needed.  Otherwise the process must recheck the condition, and may
+ * have to sleep again.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *cv);
+extern void ConditionVariableSleep(void);
+extern void ConditionVariableCancelSleep(void);
+
+/* Signal wakes a single waiter; broadcast wakes all of them. */
+extern bool ConditionVariableSignal(ConditionVariable *cv);
+extern int ConditionVariableBroadcast(ConditionVariable *cv);
+
+#endif   /* CONDITION_VARIABLE_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index f576f05..812008a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -115,6 +115,10 @@ struct PGPROC
 	uint8		lwWaitMode;		/* lwlock mode being waited for */
 	proclist_node lwWaitLink;	/* position in LW lock wait list */
 
+	/* Support for condition variables. */
+	bool		cvSleeping;		/* true if sleeping on a condition variable */
+	proclist_node	cvWaitLink;	/* position in CV wait list */
+
 	/* Info about lock the process is currently waiting for, if any. */
 	/* waitLock and waitProcLock are NULL if not currently waiting. */
 	LOCK	   *waitLock;		/* Lock object we're sleeping on ... */
diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h
index 2013a40..0d7935c 100644
--- a/src/include/storage/proclist.h
+++ b/src/include/storage/proclist.h
@@ -120,6 +120,20 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
 }
 
 /*
+ * Detach the head entry of a non-empty list and hand back its PGPROC.
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+	int			head_procno;
+
+	Assert(!proclist_is_empty(list));
+	head_procno = list->head;
+	proclist_delete_offset(list, head_procno, node_offset);
+	return GetPGProcByNumber(head_procno);
+}
+
+/*
  * Helper macros to avoid repetition of offsetof(PGPROC, <member>).
  * 'link_member' is the name of a proclist_node member in PGPROC.
  */
@@ -129,6 +143,8 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
 	proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
 #define proclist_push_tail(list, procno, link_member) \
 	proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+#define proclist_pop_head_node(list, link_member) \
+	proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
 
 /*
  * Iterate through the list pointed at by 'lhead', storing the current
-- 
2.5.4 (Apple Git-61)

