Hi,
On 2014-09-10 14:53:07 -0400, Robert Haas wrote:
> As discussed on the thread "Spinlocks and compiler/memory barriers",
> now that we've made the spinlock primitives function as compiler
> barriers (we think), it should be possible to remove volatile
> qualifiers from many places in the source code. The attached patch
> does this in lwlock.c. If the changes in commit
> 0709b7ee72e4bc71ad07b7120acd117265ab51d0 (and follow-on commits) are
> correct and complete, applying this shouldn't break anything, while
> possibly giving the compiler room to optimize things better than it
> does today.
>
> However, demonstrating the necessity of that commit for these changes
> seems to be non-trivial. I tried applying this patch and reverting
> commits 5b26278822c69dd76ef89fd50ecc7cdba9c3f035,
> b4c28d1b92c81941e4fc124884e51a7c110316bf, and
> 0709b7ee72e4bc71ad07b7120acd117265ab51d0 on a PPC64 POWER8 box with a
> whopping 192 hardware threads (thanks, IBM!). I then ran the
> regression tests repeatedly, and I ran several long pgbench runs with
> as many as 350 concurrent clients. No failures.
There's actually one more commit to revert. What I used was:
git revert 5b26278822c69dd76ef89fd50ecc7cdba9c3f035 \
b4c28d1b92c81941e4fc124884e51a7c110316bf \
68e66923ff629c324e219090860dc9e0e0a6f5d6 \
0709b7ee72e4bc71ad07b7120acd117265ab51d0
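
(For anyone who hasn't followed the other thread: the reason dropping
volatile can be safe at all is that the spinlock acquire/release
primitives now contain compiler barriers. Roughly the idea - this is
not the actual per-platform s_lock.h code, and compiler_barrier() is
just a stand-in name here - with a GCC-style asm barrier:

    /* keep the compiler from moving memory accesses across this point */
    #define compiler_barrier()  __asm__ __volatile__("" ::: "memory")

    #define SpinLockAcquire(lock) \
        do { S_LOCK(lock); compiler_barrier(); } while (0)

    #define SpinLockRelease(lock) \
        do { compiler_barrier(); S_UNLOCK(lock); } while (0)

With the barriers in place the compiler can no longer cache shared
fields in registers or reorder their accesses across the lock/unlock,
which is the property the volatile-pointer idiom was papering over.)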
> So I'm posting this patch in the hope that others can help. The
> relevant tests are:
>
> 1. If you apply this patch to master and run tests of whatever kind
> strikes your fancy, does anything break under high concurrency? If it
> does, then the above commits weren't enough to make this safe on your
> platform.
>
> 2. If you apply this patch to master, revert the commits mentioned
> above, and again run tests, does anything now break? If it does (but
> the first tests were OK), then that shows that those commits did
> something useful on your platform.
I just tried this on my normal x86 workstation. I applied your lwlock
patch and, on top of that, removed most volatiles (a couple are still
required) from xlog.c. That ran fine for 100 seconds. Then I reverted
the above commits, and it breaks within seconds:
master:
LOG: request to flush past end of generated WAL; request 2/E5EC3DE0, currpos 2/E5EC1E60
standby:
LOG: record with incorrect prev-link 4/684C3108 at 4/684C3108
and similar.
So, at least on x86, the compiler barriers are clearly required and
appear to be working.
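
The change itself is mechanical; nearly every hunk in the patch is some
variation of

    -	/* use volatile pointer to prevent code rearrangement */
    -	volatile XLogCtlData *xlogctl = XLogCtl;
    -
    -	SpinLockAcquire(&xlogctl->info_lck);
    -	LogwrtResult = xlogctl->LogwrtResult;
    -	SpinLockRelease(&xlogctl->info_lck);
    +	SpinLockAcquire(&XLogCtl->info_lck);
    +	LogwrtResult = XLogCtl->LogwrtResult;
    +	SpinLockRelease(&XLogCtl->info_lck);

i.e. the volatile-qualified alias goes away and the shared struct is
accessed directly, relying on the spinlock operations to act as
compiler barriers.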
I've attached the very quickly written xlog.c de-volatizing patch.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
From 2b68a134925e4b2fb6ff282f5ed83b8f57b10732 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 17 Sep 2014 13:21:20 +0200
Subject: [PATCH] xlog.c-remove-volatile
---
src/backend/access/transam/xlog.c | 473 ++++++++++++++------------------------
1 file changed, 176 insertions(+), 297 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 34f2fc0..103f077 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1219,16 +1219,13 @@ begin:;
*/
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
/* advance global request to include new block(s) */
- if (xlogctl->LogwrtRqst.Write < EndPos)
- xlogctl->LogwrtRqst.Write = EndPos;
+ if (XLogCtl->LogwrtRqst.Write < EndPos)
+ XLogCtl->LogwrtRqst.Write = EndPos;
/* update local result copy while I have the chance */
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
+ LogwrtResult = XLogCtl->LogwrtResult;
+ SpinLockRelease(&XLogCtl->info_lck);
}
/*
@@ -1323,7 +1320,7 @@ static void
ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr)
{
- volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
uint64 startbytepos;
uint64 endbytepos;
uint64 prevbytepos;
@@ -1378,7 +1375,7 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
static bool
ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
{
- volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
uint64 startbytepos;
uint64 endbytepos;
uint64 prevbytepos;
@@ -1696,7 +1693,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
uint64 bytepos;
XLogRecPtr reservedUpto;
XLogRecPtr finishedUpto;
- volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
int i;
if (MyProc == NULL)
@@ -2131,16 +2128,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
break;
/* Before waiting, get info_lck and update LogwrtResult */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
- xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
+ XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
+ LogwrtResult = XLogCtl->LogwrtResult;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* Now that we have an up-to-date LogwrtResult value, see if we
@@ -2548,16 +2540,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
* code in a couple of places.
*/
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->LogwrtResult = LogwrtResult;
- if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
- xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
- if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
- xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->LogwrtResult = LogwrtResult;
+ if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
+ XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
+ if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
+ XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
+ SpinLockRelease(&XLogCtl->info_lck);
}
}
@@ -2572,15 +2561,12 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
XLogRecPtr WriteRqstPtr = asyncXactLSN;
bool sleeping;
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- LogwrtResult = xlogctl->LogwrtResult;
- sleeping = xlogctl->WalWriterSleeping;
- if (xlogctl->asyncXactLSN < asyncXactLSN)
- xlogctl->asyncXactLSN = asyncXactLSN;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ LogwrtResult = XLogCtl->LogwrtResult;
+ sleeping = XLogCtl->WalWriterSleeping;
+ if (XLogCtl->asyncXactLSN < asyncXactLSN)
+ XLogCtl->asyncXactLSN = asyncXactLSN;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* If the WALWriter is sleeping, we should kick it to make it come out of
@@ -2613,12 +2599,9 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
void
XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->replicationSlotMinLSN = lsn;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->replicationSlotMinLSN = lsn;
+ SpinLockRelease(&XLogCtl->info_lck);
}
@@ -2629,13 +2612,11 @@ XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
static XLogRecPtr
XLogGetReplicationSlotMinimumLSN(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr retval;
- SpinLockAcquire(&xlogctl->info_lck);
- retval = xlogctl->replicationSlotMinLSN;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ retval = XLogCtl->replicationSlotMinLSN;
+ SpinLockRelease(&XLogCtl->info_lck);
return retval;
}
@@ -2671,8 +2652,6 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
updateMinRecoveryPoint = false;
else if (force || minRecoveryPoint < lsn)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr newMinRecoveryPoint;
TimeLineID newMinRecoveryPointTLI;
@@ -2689,10 +2668,10 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
* all. Instead, we just log a warning and continue with recovery.
* (See also the comments about corrupt LSNs in XLogFlush.)
*/
- SpinLockAcquire(&xlogctl->info_lck);
- newMinRecoveryPoint = xlogctl->replayEndRecPtr;
- newMinRecoveryPointTLI = xlogctl->replayEndTLI;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ newMinRecoveryPoint = XLogCtl->replayEndRecPtr;
+ newMinRecoveryPointTLI = XLogCtl->replayEndTLI;
+ SpinLockRelease(&XLogCtl->info_lck);
if (!force && newMinRecoveryPoint < lsn)
elog(WARNING,
@@ -2776,16 +2755,14 @@ XLogFlush(XLogRecPtr record)
*/
for (;;)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr insertpos;
/* read LogwrtResult and update local state */
- SpinLockAcquire(&xlogctl->info_lck);
- if (WriteRqstPtr < xlogctl->LogwrtRqst.Write)
- WriteRqstPtr = xlogctl->LogwrtRqst.Write;
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
+ WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+ LogwrtResult = XLogCtl->LogwrtResult;
+ SpinLockRelease(&XLogCtl->info_lck);
/* done already? */
if (record <= LogwrtResult.Flush)
@@ -2922,15 +2899,10 @@ XLogBackgroundFlush(void)
return false;
/* read LogwrtResult and update local state */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- LogwrtResult = xlogctl->LogwrtResult;
- WriteRqstPtr = xlogctl->LogwrtRqst.Write;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ LogwrtResult = XLogCtl->LogwrtResult;
+ WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+ SpinLockRelease(&XLogCtl->info_lck);
/* back off to last completed page boundary */
WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
@@ -2938,12 +2910,9 @@ XLogBackgroundFlush(void)
/* if we have already flushed that far, consider async commit records */
if (WriteRqstPtr <= LogwrtResult.Flush)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- WriteRqstPtr = xlogctl->asyncXactLSN;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ WriteRqstPtr = XLogCtl->asyncXactLSN;
+ SpinLockRelease(&XLogCtl->info_lck);
flexible = false; /* ensure it all gets written */
}
@@ -3054,14 +3023,9 @@ XLogNeedsFlush(XLogRecPtr record)
return false;
/* read LogwrtResult and update local state */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ LogwrtResult = XLogCtl->LogwrtResult;
+ SpinLockRelease(&XLogCtl->info_lck);
/* check again */
if (record <= LogwrtResult.Flush)
@@ -3683,13 +3647,11 @@ PreallocXlogFiles(XLogRecPtr endptr)
void
CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogSegNo lastRemovedSegNo;
- SpinLockAcquire(&xlogctl->info_lck);
- lastRemovedSegNo = xlogctl->lastRemovedSegNo;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ lastRemovedSegNo = XLogCtl->lastRemovedSegNo;
+ SpinLockRelease(&XLogCtl->info_lck);
if (segno <= lastRemovedSegNo)
{
@@ -3713,13 +3675,11 @@ CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
XLogSegNo
XLogGetLastRemovedSegno(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogSegNo lastRemovedSegNo;
- SpinLockAcquire(&xlogctl->info_lck);
- lastRemovedSegNo = xlogctl->lastRemovedSegNo;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ lastRemovedSegNo = XLogCtl->lastRemovedSegNo;
+ SpinLockRelease(&XLogCtl->info_lck);
return lastRemovedSegNo;
}
@@ -3731,17 +3691,15 @@ XLogGetLastRemovedSegno(void)
static void
UpdateLastRemovedPtr(char *filename)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
uint32 tli;
XLogSegNo segno;
XLogFromFileName(filename, &tli, &segno);
- SpinLockAcquire(&xlogctl->info_lck);
- if (segno > xlogctl->lastRemovedSegNo)
- xlogctl->lastRemovedSegNo = segno;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (segno > XLogCtl->lastRemovedSegNo)
+ XLogCtl->lastRemovedSegNo = segno;
+ SpinLockRelease(&XLogCtl->info_lck);
}
/*
@@ -4699,13 +4657,10 @@ GetFakeLSNForUnloggedRel(void)
{
XLogRecPtr nextUnloggedLSN;
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
/* increment the unloggedLSN counter, need SpinLock */
- SpinLockAcquire(&xlogctl->ulsn_lck);
- nextUnloggedLSN = xlogctl->unloggedLSN++;
- SpinLockRelease(&xlogctl->ulsn_lck);
+ SpinLockAcquire(&XLogCtl->ulsn_lck);
+ nextUnloggedLSN = XLogCtl->unloggedLSN++;
+ SpinLockRelease(&XLogCtl->ulsn_lck);
return nextUnloggedLSN;
}
@@ -5737,13 +5692,11 @@ recoveryPausesHere(void)
bool
RecoveryIsPaused(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
bool recoveryPause;
- SpinLockAcquire(&xlogctl->info_lck);
- recoveryPause = xlogctl->recoveryPause;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ recoveryPause = XLogCtl->recoveryPause;
+ SpinLockRelease(&XLogCtl->info_lck);
return recoveryPause;
}
@@ -5751,12 +5704,9 @@ RecoveryIsPaused(void)
void
SetRecoveryPause(bool recoveryPause)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->recoveryPause = recoveryPause;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->recoveryPause = recoveryPause;
+ SpinLockRelease(&XLogCtl->info_lck);
}
/*
@@ -5854,12 +5804,9 @@ recoveryApplyDelay(XLogRecord *record)
static void
SetLatestXTime(TimestampTz xtime)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->recoveryLastXTime = xtime;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->recoveryLastXTime = xtime;
+ SpinLockRelease(&XLogCtl->info_lck);
}
/*
@@ -5868,13 +5815,11 @@ SetLatestXTime(TimestampTz xtime)
TimestampTz
GetLatestXTime(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
TimestampTz xtime;
- SpinLockAcquire(&xlogctl->info_lck);
- xtime = xlogctl->recoveryLastXTime;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ xtime = XLogCtl->recoveryLastXTime;
+ SpinLockRelease(&XLogCtl->info_lck);
return xtime;
}
@@ -5888,12 +5833,9 @@ GetLatestXTime(void)
static void
SetCurrentChunkStartTime(TimestampTz xtime)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->currentChunkStartTime = xtime;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->currentChunkStartTime = xtime;
+ SpinLockRelease(&XLogCtl->info_lck);
}
/*
@@ -5903,13 +5845,11 @@ SetCurrentChunkStartTime(TimestampTz xtime)
TimestampTz
GetCurrentChunkReplayStartTime(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
TimestampTz xtime;
- SpinLockAcquire(&xlogctl->info_lck);
- xtime = xlogctl->currentChunkStartTime;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ xtime = XLogCtl->currentChunkStartTime;
+ SpinLockRelease(&XLogCtl->info_lck);
return xtime;
}
@@ -6433,9 +6373,6 @@ StartupXLOG(void)
{
int rmid;
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
/*
* Update pg_control to show that we are recovering and to show the
* selected checkpoint as the place we are starting from. We also mark
@@ -6622,18 +6559,18 @@ StartupXLOG(void)
* if we had just replayed the record before the REDO location (or the
* checkpoint record itself, if it's a shutdown checkpoint).
*/
- SpinLockAcquire(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
if (checkPoint.redo < RecPtr)
- xlogctl->replayEndRecPtr = checkPoint.redo;
+ XLogCtl->replayEndRecPtr = checkPoint.redo;
else
- xlogctl->replayEndRecPtr = EndRecPtr;
- xlogctl->replayEndTLI = ThisTimeLineID;
- xlogctl->lastReplayedEndRecPtr = xlogctl->replayEndRecPtr;
- xlogctl->lastReplayedTLI = xlogctl->replayEndTLI;
- xlogctl->recoveryLastXTime = 0;
- xlogctl->currentChunkStartTime = 0;
- xlogctl->recoveryPause = false;
- SpinLockRelease(&xlogctl->info_lck);
+ XLogCtl->replayEndRecPtr = EndRecPtr;
+ XLogCtl->replayEndTLI = ThisTimeLineID;
+ XLogCtl->lastReplayedEndRecPtr = XLogCtl->replayEndRecPtr;
+ XLogCtl->lastReplayedTLI = XLogCtl->replayEndTLI;
+ XLogCtl->recoveryLastXTime = 0;
+ XLogCtl->currentChunkStartTime = 0;
+ XLogCtl->recoveryPause = false;
+ SpinLockRelease(&XLogCtl->info_lck);
/* Also ensure XLogReceiptTime has a sane value */
XLogReceiptTime = GetCurrentTimestamp();
@@ -6732,7 +6669,7 @@ StartupXLOG(void)
* otherwise would is a minor issue, so it doesn't seem worth
* adding another spinlock cycle to prevent that.
*/
- if (xlogctl->recoveryPause)
+ if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
recoveryPausesHere();
/*
@@ -6757,7 +6694,7 @@ StartupXLOG(void)
* here otherwise pausing during the delay-wait wouldn't
* work.
*/
- if (xlogctl->recoveryPause)
+ if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
recoveryPausesHere();
}
@@ -6830,10 +6767,10 @@ StartupXLOG(void)
* Update shared replayEndRecPtr before replaying this record,
* so that XLogFlush will update minRecoveryPoint correctly.
*/
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->replayEndRecPtr = EndRecPtr;
- xlogctl->replayEndTLI = ThisTimeLineID;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->replayEndRecPtr = EndRecPtr;
+ XLogCtl->replayEndTLI = ThisTimeLineID;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* If we are attempting to enter Hot Standby mode, process
@@ -6853,10 +6790,10 @@ StartupXLOG(void)
* Update lastReplayedEndRecPtr after this record has been
* successfully replayed.
*/
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->lastReplayedEndRecPtr = EndRecPtr;
- xlogctl->lastReplayedTLI = ThisTimeLineID;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
+ XLogCtl->lastReplayedTLI = ThisTimeLineID;
+ SpinLockRelease(&XLogCtl->info_lck);
/* Remember this record as the last-applied one */
LastRec = ReadRecPtr;
@@ -7266,14 +7203,9 @@ StartupXLOG(void)
* there are no race conditions concerning visibility of other recent
* updates to shared memory.)
*/
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->SharedRecoveryInProgress = false;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->SharedRecoveryInProgress = false;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* If there were cascading standby servers connected to us, nudge any wal
@@ -7376,12 +7308,9 @@ CheckRecoveryConsistency(void)
reachedConsistency &&
IsUnderPostmaster)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->SharedHotStandbyActive = true;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->SharedHotStandbyActive = true;
+ SpinLockRelease(&XLogCtl->info_lck);
LocalHotStandbyActive = true;
@@ -7466,13 +7395,10 @@ HotStandbyActive(void)
return true;
else
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
/* spinlock is essential on machines with weak memory ordering! */
- SpinLockAcquire(&xlogctl->info_lck);
- LocalHotStandbyActive = xlogctl->SharedHotStandbyActive;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ LocalHotStandbyActive = XLogCtl->SharedHotStandbyActive;
+ SpinLockRelease(&XLogCtl->info_lck);
return LocalHotStandbyActive;
}
@@ -7687,8 +7613,6 @@ InitXLOGAccess(void)
XLogRecPtr
GetRedoRecPtr(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr ptr;
/*
@@ -7696,9 +7620,9 @@ GetRedoRecPtr(void)
* grabbed a WAL insertion lock to read the master copy, someone might
* update it just after we've released the lock.
*/
- SpinLockAcquire(&xlogctl->info_lck);
- ptr = xlogctl->RedoRecPtr;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ ptr = XLogCtl->RedoRecPtr;
+ SpinLockRelease(&XLogCtl->info_lck);
if (RedoRecPtr < ptr)
RedoRecPtr = ptr;
@@ -7717,13 +7641,11 @@ GetRedoRecPtr(void)
XLogRecPtr
GetInsertRecPtr(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
- SpinLockAcquire(&xlogctl->info_lck);
- recptr = xlogctl->LogwrtRqst.Write;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ recptr = XLogCtl->LogwrtRqst.Write;
+ SpinLockRelease(&XLogCtl->info_lck);
return recptr;
}
@@ -7735,13 +7657,11 @@ GetInsertRecPtr(void)
XLogRecPtr
GetFlushRecPtr(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
- SpinLockAcquire(&xlogctl->info_lck);
- recptr = xlogctl->LogwrtResult.Flush;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ recptr = XLogCtl->LogwrtResult.Flush;
+ SpinLockRelease(&XLogCtl->info_lck);
return recptr;
}
@@ -7778,15 +7698,10 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
TransactionId nextXid;
/* Must read checkpoint info first, else have race condition */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- ckptXidEpoch = xlogctl->ckptXidEpoch;
- ckptXid = xlogctl->ckptXid;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ ckptXidEpoch = XLogCtl->ckptXidEpoch;
+ ckptXid = XLogCtl->ckptXid;
+ SpinLockRelease(&XLogCtl->info_lck);
/* Now fetch current nextXid */
nextXid = ReadNewTransactionId();
@@ -7989,8 +7904,6 @@ LogCheckpointEnd(bool restartpoint)
void
CreateCheckPoint(int flags)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
bool shutdown;
CheckPoint checkPoint;
XLogRecPtr recptr;
@@ -8150,7 +8063,7 @@ CreateCheckPoint(int flags)
* XLogInserts that happen while we are dumping buffers must assume that
* their buffer changes are not included in the checkpoint.
*/
- RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
+ RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
/*
* Now we can release the WAL insertion locks, allowing other xacts to
@@ -8159,9 +8072,9 @@ CreateCheckPoint(int flags)
WALInsertLockRelease();
/* Update the info_lck-protected copy of RedoRecPtr as well */
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->RedoRecPtr = checkPoint.redo;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->RedoRecPtr = checkPoint.redo;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* If enabled, log checkpoint start. We postpone this until now so as not
@@ -8333,15 +8246,10 @@ CreateCheckPoint(int flags)
LWLockRelease(ControlFileLock);
/* Update shared-memory copy of checkpoint XID/epoch */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
- xlogctl->ckptXid = checkPoint.nextXid;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
+ XLogCtl->ckptXid = checkPoint.nextXid;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* We are now done with critical updates; no need for system panic if we
@@ -8496,9 +8404,6 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
static void
RecoveryRestartPoint(const CheckPoint *checkPoint)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
/*
* Also refrain from creating a restartpoint if we have seen any
* references to non-existent pages. Restarting recovery from the
@@ -8520,10 +8425,10 @@ RecoveryRestartPoint(const CheckPoint *checkPoint)
* Copy the checkpoint record to shared memory, so that checkpointer can
* work out the next time it wants to perform a restartpoint.
*/
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->lastCheckPointRecPtr = ReadRecPtr;
- xlogctl->lastCheckPoint = *checkPoint;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->lastCheckPointRecPtr = ReadRecPtr;
+ XLogCtl->lastCheckPoint = *checkPoint;
+ SpinLockRelease(&XLogCtl->info_lck);
}
/*
@@ -8545,9 +8450,6 @@ CreateRestartPoint(int flags)
XLogSegNo _logSegNo;
TimestampTz xtime;
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
/*
* Acquire CheckpointLock to ensure only one restartpoint or checkpoint
* happens at a time.
@@ -8555,10 +8457,10 @@ CreateRestartPoint(int flags)
LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
/* Get a local copy of the last safe checkpoint record. */
- SpinLockAcquire(&xlogctl->info_lck);
- lastCheckPointRecPtr = xlogctl->lastCheckPointRecPtr;
- lastCheckPoint = xlogctl->lastCheckPoint;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ lastCheckPointRecPtr = XLogCtl->lastCheckPointRecPtr;
+ lastCheckPoint = XLogCtl->lastCheckPoint;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* Check that we're still in recovery mode. It's ok if we exit recovery
@@ -8615,15 +8517,18 @@ CreateRestartPoint(int flags)
* Like in CreateCheckPoint(), hold off insertions to update it, although
* during recovery this is just pro forma, because no WAL insertions are
* happening.
+ *
+ * NB: XXX: It's safe to access XLogCtl->Insert.RedoRecPtr here because
+ * that variable is protected by the wal insert lock, not info_lck.
*/
WALInsertLockAcquireExclusive();
- xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+ XLogCtl->Insert.RedoRecPtr = lastCheckPoint.redo;
WALInsertLockRelease();
/* Also update the info_lck-protected copy */
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->RedoRecPtr = lastCheckPoint.redo;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->RedoRecPtr = lastCheckPoint.redo;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* Prepare to accumulate statistics.
@@ -9383,15 +9288,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
/* Update shared-memory copy of checkpoint XID/epoch */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
- xlogctl->ckptXid = checkPoint.nextXid;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
+ XLogCtl->ckptXid = checkPoint.nextXid;
+ SpinLockRelease(&XLogCtl->info_lck);
/*
* We should've already switched to the new TLI before replaying this
@@ -9435,15 +9335,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
/* Update shared-memory copy of checkpoint XID/epoch */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
- xlogctl->ckptXid = checkPoint.nextXid;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
+ XLogCtl->ckptXid = checkPoint.nextXid;
+ SpinLockRelease(&XLogCtl->info_lck);
/* TLI should not change in an on-line checkpoint */
if (checkPoint.ThisTimeLineID != ThisTimeLineID)
@@ -9580,8 +9475,6 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
}
else if (info == XLOG_FPW_CHANGE)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
bool fpw;
memcpy(&fpw, XLogRecGetData(record), sizeof(bool));
@@ -9593,10 +9486,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
*/
if (!fpw)
{
- SpinLockAcquire(&xlogctl->info_lck);
- if (xlogctl->lastFpwDisableRecPtr < ReadRecPtr)
- xlogctl->lastFpwDisableRecPtr = ReadRecPtr;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (XLogCtl->lastFpwDisableRecPtr < ReadRecPtr)
+ XLogCtl->lastFpwDisableRecPtr = ReadRecPtr;
+ SpinLockRelease(&XLogCtl->info_lck);
}
/* Keep track of full_page_writes */
@@ -9951,8 +9844,6 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
if (backup_started_in_recovery)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
/*
@@ -9960,9 +9851,9 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
* (i.e., since last restartpoint used as backup starting
* checkpoint) contain full-page writes.
*/
- SpinLockAcquire(&xlogctl->info_lck);
- recptr = xlogctl->lastFpwDisableRecPtr;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ recptr = XLogCtl->lastFpwDisableRecPtr;
+ SpinLockRelease(&XLogCtl->info_lck);
if (!checkpointfpw || startpoint <= recptr)
ereport(ERROR,
@@ -10305,17 +10196,15 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
*/
if (backup_started_in_recovery)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
/*
* Check to see if all WAL replayed during online backup contain
* full-page writes.
*/
- SpinLockAcquire(&xlogctl->info_lck);
- recptr = xlogctl->lastFpwDisableRecPtr;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ recptr = XLogCtl->lastFpwDisableRecPtr;
+ SpinLockRelease(&XLogCtl->info_lck);
if (startpoint <= recptr)
ereport(ERROR,
@@ -10502,15 +10391,13 @@ do_pg_abort_backup(void)
XLogRecPtr
GetXLogReplayRecPtr(TimeLineID *replayTLI)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
TimeLineID tli;
- SpinLockAcquire(&xlogctl->info_lck);
- recptr = xlogctl->lastReplayedEndRecPtr;
- tli = xlogctl->lastReplayedTLI;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ recptr = XLogCtl->lastReplayedEndRecPtr;
+ tli = XLogCtl->lastReplayedTLI;
+ SpinLockRelease(&XLogCtl->info_lck);
if (replayTLI)
*replayTLI = tli;
@@ -10523,7 +10410,7 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
XLogRecPtr
GetXLogInsertRecPtr(void)
{
- volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
uint64 current_bytepos;
SpinLockAcquire(&Insert->insertpos_lck);
@@ -10539,14 +10426,9 @@ GetXLogInsertRecPtr(void)
XLogRecPtr
GetXLogWriteRecPtr(void)
{
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ SpinLockAcquire(&XLogCtl->info_lck);
+ LogwrtResult = XLogCtl->LogwrtResult;
+ SpinLockRelease(&XLogCtl->info_lck);
return LogwrtResult.Write;
}
@@ -11372,10 +11254,7 @@ WakeupRecovery(void)
void
SetWalWriterSleeping(bool sleeping)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->WalWriterSleeping = sleeping;
- SpinLockRelease(&xlogctl->info_lck);
+ SpinLockAcquire(&XLogCtl->info_lck);
+ XLogCtl->WalWriterSleeping = sleeping;
+ SpinLockRelease(&XLogCtl->info_lck);
}
--
2.0.0.rc2.4.g1dc51c6.dirty