From 923f55423200d3034380068ff1357af32deedf8f Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 11 Aug 2016 17:11:34 -0400
Subject: [PATCH 3/4] Replace buffer I/O locks with condition variables.

---
 src/backend/storage/buffer/buf_init.c    | 25 +++++---------
 src/backend/storage/buffer/bufmgr.c      | 59 ++++++++------------------------
 src/include/storage/buf_internals.h      |  7 ++--
 src/include/storage/condition_variable.h | 11 ++++++
 4 files changed, 38 insertions(+), 64 deletions(-)

diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index a4163cf..37ebd37 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -20,7 +20,7 @@
 
 BufferDescPadded *BufferDescriptors;
 char	   *BufferBlocks;
-LWLockMinimallyPadded *BufferIOLWLockArray = NULL;
+ConditionVariableMinimallyPadded *BufferIOCVArray = NULL;
 LWLockTranche BufferIOLWLockTranche;
 LWLockTranche BufferContentLWLockTranche;
 WritebackContext BackendWritebackContext;
@@ -71,7 +71,7 @@ InitBufferPool(void)
 {
 	bool		foundBufs,
 				foundDescs,
-				foundIOLocks,
+				foundIOCV,
 				foundBufCkpt;
 
 	/* Align descriptors to a cacheline boundary. */
@@ -85,16 +85,10 @@ InitBufferPool(void)
 						NBuffers * (Size) BLCKSZ, &foundBufs);
 
-	/* Align lwlocks to cacheline boundary */
+	/* Align condition variables to cacheline boundary */
-	BufferIOLWLockArray = (LWLockMinimallyPadded *)
-		ShmemInitStruct("Buffer IO Locks",
-						NBuffers * (Size) sizeof(LWLockMinimallyPadded),
-						&foundIOLocks);
-
-	BufferIOLWLockTranche.name = "buffer_io";
-	BufferIOLWLockTranche.array_base = BufferIOLWLockArray;
-	BufferIOLWLockTranche.array_stride = sizeof(LWLockMinimallyPadded);
-	LWLockRegisterTranche(LWTRANCHE_BUFFER_IO_IN_PROGRESS,
-						  &BufferIOLWLockTranche);
+	BufferIOCVArray = (ConditionVariableMinimallyPadded *)
+		ShmemInitStruct("Buffer IO Condition Variables",
+			  NBuffers * (Size) sizeof(ConditionVariableMinimallyPadded),
+									   &foundIOCV);
 
 	BufferContentLWLockTranche.name = "buffer_content";
 	BufferContentLWLockTranche.array_base =
@@ -114,10 +108,10 @@ InitBufferPool(void)
 		ShmemInitStruct("Checkpoint BufferIds",
 						NBuffers * sizeof(CkptSortItem), &foundBufCkpt);
 
-	if (foundDescs || foundBufs || foundIOLocks || foundBufCkpt)
+	if (foundDescs || foundBufs || foundIOCV || foundBufCkpt)
 	{
 		/* should find all of these, or none of them */
-		Assert(foundDescs && foundBufs && foundIOLocks && foundBufCkpt);
+		Assert(foundDescs && foundBufs && foundIOCV && foundBufCkpt);
 		/* note: this path is only taken in EXEC_BACKEND case */
 	}
 	else
@@ -147,8 +141,7 @@ InitBufferPool(void)
 			LWLockInitialize(BufferDescriptorGetContentLock(buf),
 							 LWTRANCHE_BUFFER_CONTENT);
 
-			LWLockInitialize(BufferDescriptorGetIOLock(buf),
-							 LWTRANCHE_BUFFER_IO_IN_PROGRESS);
+			ConditionVariableInit(BufferDescriptorGetIOCV(buf));
 		}
 
 		/* Correct last entry of linked list */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 76ade37..403db6e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1314,8 +1314,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	LWLockRelease(newPartitionLock);
 
 	/*
-	 * Buffer contents are currently invalid.  Try to get the io_in_progress
-	 * lock.  If StartBufferIO returns false, then someone else managed to
+	 * Buffer contents are currently invalid.  Try to obtain the right to start
+	 * I/O.  If StartBufferIO returns false, then someone else managed to
 	 * read it before we did, so there's nothing left for BufferAlloc() to do.
 	 */
 	if (StartBufferIO(buf, true))
@@ -1693,9 +1693,8 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
 		uint32		buf_state;
 		uint32		old_buf_state;
 
-		/* I'd better not still hold any locks on the buffer */
+		/* I'd better not still hold the buffer content lock */
 		Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
-		Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
 
 		/*
 		 * Decrement the shared reference count.
@@ -2656,9 +2655,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 	uint32		buf_state;
 
 	/*
-	 * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
-	 * false, then someone else flushed the buffer before we could, so we need
-	 * not do anything.
+	 * Try to start an I/O operation.  If StartBufferIO returns false, then
+	 * someone else flushed the buffer before we could, so we need not do
+	 * anything.
 	 */
 	if (!StartBufferIO(buf, false))
 		return;
@@ -2714,7 +2713,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 	/*
 	 * Now it's safe to write buffer to disk. Note that no one else should
 	 * have been able to write it while we were busy with log flushing because
-	 * we have the io_in_progress lock.
+	 * only one process at a time can set this buffer's BM_IO_IN_PROGRESS bit.
 	 */
 	bufBlock = BufHdrGetBlock(buf);
 
@@ -2749,7 +2748,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 
 	/*
 	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
-	 * end the io_in_progress state.
+	 * end the BM_IO_IN_PROGRESS state.
 	 */
 	TerminateBufferIO(buf, true, 0);
 
@@ -3755,7 +3754,7 @@ ConditionalLockBufferForCleanup(Buffer buffer)
  *	Functions for buffer I/O handling
  *
  *	Note: We assume that nested buffer I/O never occurs.
- *	i.e at most one io_in_progress lock is held per proc.
+ *	i.e., each proc sets BM_IO_IN_PROGRESS on at most one buffer at a time.
  *
  *	Also note that these are used only for shared buffers, not local ones.
  */
@@ -3766,13 +3765,6 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 static void
 WaitIO(BufferDesc *buf)
 {
-	/*
-	 * Changed to wait until there's no IO - Inoue 01/13/2000
-	 *
-	 * Note this is *necessary* because an error abort in the process doing
-	 * I/O could release the io_in_progress_lock prematurely. See
-	 * AbortBufferIO.
-	 */
 	for (;;)
 	{
 		uint32		buf_state;
@@ -3782,14 +3774,15 @@ WaitIO(BufferDesc *buf)
 		 * here, but since this test is essential for correctness, we'd better
 		 * play it safe.
 		 */
+		ConditionVariablePrepareToSleep(BufferDescriptorGetIOCV(buf));
 		buf_state = LockBufHdr(buf);
 		UnlockBufHdr(buf, buf_state);
 
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
-		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
-		LWLockRelease(BufferDescriptorGetIOLock(buf));
+		ConditionVariableSleep();
 	}
+	ConditionVariableCancelSleep();
 }
 
 /*
@@ -3801,7 +3794,7 @@ WaitIO(BufferDesc *buf)
  * In some scenarios there are race conditions in which multiple backends
  * could attempt the same I/O operation concurrently.  If someone else
  * has already started I/O on this buffer then we will block on the
- * io_in_progress lock until he's done.
+ * I/O condition variable until he's done.
  *
  * Input operations are only attempted on buffers that are not BM_VALID,
  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -3819,25 +3812,11 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 
 	for (;;)
 	{
-		/*
-		 * Grab the io_in_progress lock so that other processes can wait for
-		 * me to finish the I/O.
-		 */
-		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
 		buf_state = LockBufHdr(buf);
 
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
-
-		/*
-		 * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
-		 * lock isn't held is if the process doing the I/O is recovering from
-		 * an error (see AbortBufferIO).  If that's the case, we must wait for
-		 * him to get unwedged.
-		 */
 		UnlockBufHdr(buf, buf_state);
-		LWLockRelease(BufferDescriptorGetIOLock(buf));
 		WaitIO(buf);
 	}
 
@@ -3847,7 +3826,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 	{
 		/* someone else already did the I/O */
 		UnlockBufHdr(buf, buf_state);
-		LWLockRelease(BufferDescriptorGetIOLock(buf));
 		return false;
 	}
 
@@ -3865,7 +3843,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
  *	(Assumptions)
  *	My process is executing IO for the buffer
  *	BM_IO_IN_PROGRESS bit is set for the buffer
- *	We hold the buffer's io_in_progress lock
  *	The buffer is Pinned
  *
  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
@@ -3897,7 +3874,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
 
 	InProgressBuf = NULL;
 
-	LWLockRelease(BufferDescriptorGetIOLock(buf));
+	ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
 }
 
 /*
@@ -3918,14 +3895,6 @@ AbortBufferIO(void)
 	{
 		uint32		buf_state;
 
-		/*
-		 * Since LWLockReleaseAll has already been called, we're not holding
-		 * the buffer's io_in_progress_lock. We have to re-acquire it so that
-		 * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
-		 * buffer will be in a busy spin until we succeed in doing this.
-		 */
-		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
 		buf_state = LockBufHdr(buf);
 		Assert(buf_state & BM_IO_IN_PROGRESS);
 		if (IsForInput)
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index e0dfb2f..18ef400 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -17,6 +17,7 @@
 
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
@@ -221,12 +222,12 @@ typedef union BufferDescPadded
 
 #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)
 
-#define BufferDescriptorGetIOLock(bdesc) \
-	(&(BufferIOLWLockArray[(bdesc)->buf_id]).lock)
+#define BufferDescriptorGetIOCV(bdesc) \
+	(&(BufferIOCVArray[(bdesc)->buf_id]).cv)
 #define BufferDescriptorGetContentLock(bdesc) \
 	((LWLock*) (&(bdesc)->content_lock))
 
-extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
+extern PGDLLIMPORT ConditionVariableMinimallyPadded *BufferIOCVArray;
 
 /*
  * The freeNext field is either the index of the next freelist entry,
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 54b7fba..38626d5 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -31,6 +31,17 @@ typedef struct
 	proclist_head	wakeup;
 } ConditionVariable;
 
+/*
+ * Pad each condition variable to 16 or 32 bytes so that, in a suitably aligned
+ * array, no element straddles a cache line (assumes the CV is <= 32 bytes).
+ */
+#define CV_MINIMAL_SIZE		(sizeof(ConditionVariable) <= 16 ? 16 : 32)
+typedef union ConditionVariableMinimallyPadded
+{
+	ConditionVariable	cv;
+	char		pad[CV_MINIMAL_SIZE];	/* sets the minimum union size */
+} ConditionVariableMinimallyPadded;
+
 /* Initialize a condition variable. */
 extern void ConditionVariableInit(ConditionVariable *);
 
-- 
2.5.4 (Apple Git-61)

