On Fri, Aug 22, 2014 at 7:14 PM, Rajeev rastogi <[email protected]>
wrote:
> I have just started looking into this patch.
> Please find below my first level of observation from the patch:
>
Thanks! Updated patch attached.
> 1. Allocation of memory for sync_nodes in function
> SyncRepGetSynchronousNodes should be equivalent to allowed_sync_nodes
> instead of max_wal_senders. As anyway we are not going to store sync stdbys
> more than allowed_sync_nodes.
> sync_nodes = (int *) palloc(allowed_sync_nodes *
> sizeof(int));
>
Fixed.
> 2. Logic of deciding the highest priority one seems to be incorrect.
> Assume, s_s_num = 3, s_s_names = 3,4,2,1
> standby nodes are in order as: 1,2,3,4,5,6,7
>
> As per the logic in patch, node 4 with priority 2 will not be
> added in the list whereas 1,2,3 will be added.
>
> The problem is because priority updated for next tracking is not
> the highest priority as of that iteration, it is just priority of last node
> added to the list. So it may happen that a node with higher priority
> is still there in list but we are comparing with some other smaller
> priority.
>
Fixed. Nice catch!
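For the archives, here is a quick standalone sketch of the selection rule the
updated patch implements (select_sync_nodes() and its shape are mine,
simplified from SyncRepGetSynchronousNodes, so illustrative only): the
tracked priority needs to be the highest value among the entries currently
kept, so that a full list evicts its worst entry instead of comparing with
the last node added. Running it on your scenario prints nodes 4, 2 and 3:

#include <stdio.h>

static void
select_sync_nodes(const int *prio, int nsenders, int *kept, int num_keep)
{
    int     num_kept = 0;
    int     worst = 0;      /* highest priority value among kept entries */
    int     i, j;

    for (i = 0; i < nsenders; i++)
    {
        if (prio[i] == 0)
            continue;       /* asynchronous node, skip */
        if (num_kept == num_keep && prio[i] >= worst)
            continue;       /* cannot improve on the entries kept */

        if (num_kept == num_keep)
        {
            /* List full: evict the entry with the worst priority. */
            for (j = 0; j < num_kept; j++)
            {
                if (prio[kept[j]] == worst)
                {
                    kept[j] = i;
                    break;
                }
            }
        }
        else
            kept[num_kept++] = i;

        /*
         * Recompute the highest priority value among the entries kept;
         * this is the point your comment 2 was about.
         */
        worst = 0;
        for (j = 0; j < num_kept; j++)
        {
            if (prio[kept[j]] > worst)
                worst = prio[kept[j]];
        }
    }

    for (j = 0; j < num_kept; j++)
        printf("node %d is synchronous with priority %d\n",
               kept[j] + 1, prio[kept[j]]);
}

int
main(void)
{
    /* Your scenario: s_s_num = 3, s_s_names = 3,4,2,1, standbys 1..7 */
    int     prio[7] = {4, 3, 1, 2, 0, 0, 0};
    int     kept[3];

    select_sync_nodes(prio, 7, kept, 3);
    return 0;
}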
> 3. Can we optimize the function SyncRepGetSynchronousNodes in such a way
> that it gets the number of standby nodes from s_s_names itself. With this
> it will be usful to stop scanning the moment we get first s_s_num potential
> standbys.
>
By doing so, we would need to scan the WAL sender array more than once (or
only once in the unlikely case where we find N sync nodes with a name
matching the first entry). We would also need to recalculate, for a given
item in the s_s_names list, its priority and compare it with the existing
entries in the WAL sender list. So this is not worth the effort.
Also, using the priority instead of s_s_names is more solid, as s_s_names is
now used only in SyncRepGetStandbyPriority to calculate the priority of a
given WAL sender, and that function is only called by a WAL sender itself
when it initializes.
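To make the behavior visible, here is a small example of what is expected
once the patch is in (node names and values are purely illustrative):

synchronous_standby_num = 2
synchronous_standby_names = 'node_A,node_B,node_C'

With the three standbys connected and streaming, pg_stat_replication should
report node_A and node_B (priorities 1 and 2) as "sync" and node_C (priority
3) as "potential", and a commit waits for confirmation from both node_A and
node_B. If node_A goes away, node_C should be elected as "sync" at the next
takeover.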
Regards,
--
Michael
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2586,2597 **** include_dir 'conf.d'
Specifies a comma-separated list of standby names that can support
<firstterm>synchronous replication</>, as described in
<xref linkend="synchronous-replication">.
! At any one time there will be at most one active synchronous standby;
! transactions waiting for commit will be allowed to proceed after
! this standby server confirms receipt of their data.
! The synchronous standby will be the first standby named in this list
! that is both currently connected and streaming data in real-time
! (as shown by a state of <literal>streaming</literal> in the
<link linkend="monitoring-stats-views-table">
<literal>pg_stat_replication</></link> view).
Other standby servers appearing later in this list represent potential
--- 2586,2598 ----
Specifies a comma-separated list of standby names that can support
<firstterm>synchronous replication</>, as described in
<xref linkend="synchronous-replication">.
! At any one time there will be at most a number of active synchronous
! standbys defined by <xref linkend="guc-synchronous-standby-num">;
! transactions waiting for commit will be allowed to proceed after those
! standby servers confirm receipt of their data. The synchronous standbys
! will be the first entries named in this list that are both currently
! connected and streaming data in real-time (as shown by a state of
! <literal>streaming</literal> in the
<link linkend="monitoring-stats-views-table">
<literal>pg_stat_replication</></link> view).
Other standby servers appearing later in this list represent potential
***************
*** 2627,2632 **** include_dir 'conf.d'
--- 2628,2685 ----
</listitem>
</varlistentry>
+ <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num">
+ <term><varname>synchronous_standby_num</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>synchronous_standby_num</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specifies the number of standbys that support
+ <firstterm>synchronous replication</>.
+ </para>
+ <para>
+ Default value is <literal>-1</>. In this case, if
+ <xref linkend="guc-synchronous-standby-names"> is empty, all the
+ standby nodes are considered asynchronous. If at least one node
+ name is defined, the server will wait for one synchronous
+ standby from the list.
+ </para>
+ <para>
+ When this parameter is set to <literal>0</>, all the standby
+ nodes will be considered as asynchronous.
+ </para>
+ <para>
+ This parameter value cannot be higher than
+ <xref linkend="guc-max-wal-senders">.
+ </para>
+ <para>
+ The first <xref linkend="guc-synchronous-standby-num"> connected
+ elements of <xref linkend="guc-synchronous-standby-names"> are
+ considered as synchronous. If there are more elements than the
+ number of standbys required, all the additional standbys are
+ potential synchronous candidates. If
+ <xref linkend="guc-synchronous-standby-names"> is empty, all the
+ standbys are asynchronous. If it is set to the special entry
+ <literal>*</>, a number of standbys equal to
+ <xref linkend="guc-synchronous-standby-num"> with the highest
+ priority are elected as being synchronous.
+ </para>
+ <para>
+ The server will wait for commit confirmations from
+ <xref linkend="guc-synchronous-standby-num"> standbys, meaning that
+ if <xref linkend="guc-synchronous-standby-names"> has fewer elements
+ than the number of standbys required, the server will wait
+ indefinitely for a commit confirmation.
+ </para>
+ <para>
+ This parameter can only be set in the <filename>postgresql.conf</>
+ file or on the server command line.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
<term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)
<indexterm>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1081,1092 **** primary_slot_name = 'node_a_slot'
WAL record is then sent to the standby. The standby sends reply
messages each time a new batch of WAL data is written to disk, unless
<varname>wal_receiver_status_interval</> is set to zero on the standby.
! If the standby is the first matching standby, as specified in
! <varname>synchronous_standby_names</> on the primary, the reply
! messages from that standby will be used to wake users waiting for
! confirmation that the commit record has been received. These parameters
! allow the administrator to specify which standby servers should be
! synchronous standbys. Note that the configuration of synchronous
replication is mainly on the master. Named standbys must be directly
connected to the master; the master knows nothing about downstream
standby servers using cascaded replication.
--- 1081,1092 ----
WAL record is then sent to the standby. The standby sends reply
messages each time a new batch of WAL data is written to disk, unless
<varname>wal_receiver_status_interval</> is set to zero on the standby.
! If the standby is among the first <varname>synchronous_standby_num</> matching
! standbys, as specified in <varname>synchronous_standby_names</> on the
! primary, the reply messages from those standbys will be used to wake users
! waiting for confirmation that the commit record has been received. These
! parameters allow the administrator to specify which standby servers should
! be synchronous standbys. Note that the configuration of synchronous
replication is mainly on the master. Named standbys must be directly
connected to the master; the master knows nothing about downstream
standby servers using cascaded replication.
***************
*** 1167,1177 **** primary_slot_name = 'node_a_slot'
<para>
The best solution for avoiding data loss is to ensure you don't lose
! your last remaining synchronous standby. This can be achieved by naming multiple
potential synchronous standbys using <varname>synchronous_standby_names</>.
! The first named standby will be used as the synchronous standby. Standbys
! listed after this will take over the role of synchronous standby if the
! first one should fail.
</para>
<para>
--- 1167,1177 ----
<para>
The best solution for avoiding data loss is to ensure you don't lose
! your last remaining synchronous standbys. This can be achieved by naming multiple
potential synchronous standbys using <varname>synchronous_standby_names</>.
! The first <varname>synchronous_standby_num</> named standbys will be used as
! the synchronous standbys. Standbys listed after these will take over the role
! of synchronous standby if one of the first ones should fail.
</para>
<para>
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 5,11 ****
* Synchronous replication is new as of PostgreSQL 9.1.
*
* If requested, transaction commits wait until their commit LSN is
! * acknowledged by the sync standby.
*
* This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming
--- 5,11 ----
* Synchronous replication is new as of PostgreSQL 9.1.
*
* If requested, transaction commits wait until their commit LSN is
! * acknowledged by the synchronous standbys.
*
* This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming
***************
*** 29,39 ****
* single ordered queue of waiting backends, so that we can avoid
* searching the through all waiters each time we receive a reply.
*
! * In 9.1 we support only a single synchronous standby, chosen from a
! * priority list of synchronous_standby_names. Before it can become the
! * synchronous standby it must have caught up with the primary; that may
! * take some time. Once caught up, the current highest priority standby
! * will release waiters from the queue.
*
* Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
*
--- 29,50 ----
* single ordered queue of waiting backends, so that we can avoid
* searching the through all waiters each time we receive a reply.
*
! * In 9.4 we support the possibility to have multiple synchronous standbys,
! * whose number is defined by synchronous_standby_num, chosen from a
! * priority list of synchronous_standby_names. Before one standby can
! * become a synchronous standby it must have caught up with the primary;
! * that may take some time.
! *
! * Waiters will be released from the queue once the number of standbys
! * defined by synchronous_standby_num have caught up.
! *
! * There are special cases though. If synchronous_standby_num is set to 0,
! * all the nodes are considered as asynchronous and a fast path is
! * taken to leave this portion of the code as soon as possible. If it
! * is set to -1, the process will wait for one node to catch up with
! * the primary only if synchronous_standby_names is non-empty. This is
! * compatible with what was defined in 9.1, as -1 is the default value
! * of synchronous_standby_num.
*
* Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
*
***************
*** 59,67 ****
/* User-settable parameters for sync rep */
char *SyncRepStandbyNames;
#define SyncStandbysDefined() \
! (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
static bool announce_next_takeover = true;
--- 70,87 ----
/* User-settable parameters for sync rep */
char *SyncRepStandbyNames;
+ int synchronous_standby_num = -1;
+ /*
+ * Synchronous standbys are defined if at least one synchronous
+ * standby is wanted. In the default case (-1), the list of
+ * standby names defined needs to be non-empty.
+ */
#define SyncStandbysDefined() \
! (synchronous_standby_num > 0 || \
! (synchronous_standby_num == -1 && \
! SyncRepStandbyNames != NULL && \
! SyncRepStandbyNames[0] != '\0'))
static bool announce_next_takeover = true;
***************
*** 206,212 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
ereport(WARNING,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
whereToSendOutput = DestNone;
SyncRepCancelWait();
break;
--- 226,232 ----
ereport(WARNING,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
whereToSendOutput = DestNone;
SyncRepCancelWait();
break;
***************
*** 223,229 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
QueryCancelPending = false;
ereport(WARNING,
(errmsg("canceling wait for synchronous replication due to user request"),
! errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
SyncRepCancelWait();
break;
}
--- 243,249 ----
QueryCancelPending = false;
ereport(WARNING,
(errmsg("canceling wait for synchronous replication due to user request"),
! errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
SyncRepCancelWait();
break;
}
***************
*** 357,365 **** SyncRepInitConfig(void)
}
}
/*
* Update the LSNs on each queue based upon our latest state. This
! * implements a simple policy of first-valid-standby-releases-waiter.
*
* Other policies are possible, which would change what we do here and what
* perhaps also which information we store as well.
--- 377,483 ----
}
}
+
+ /*
+ * Obtain a palloc'd array containing positions of standbys currently
+ * considered as synchronous. Caller is responsible for freeing the
+ * data obtained and should also hold the necessary lock on SyncRepLock.
+ */
+ int *
+ SyncRepGetSynchronousNodes(int *num_sync)
+ {
+ int *sync_nodes;
+ int priority = 0;
+ int i;
+ int allowed_sync_nodes = synchronous_standby_num;
+
+ /* Initialize */
+ *num_sync = 0;
+
+ /* Leave if no synchronous nodes allowed */
+ if (synchronous_standby_num == 0)
+ return NULL;
+
+ /*
+ * Determine the number of nodes that can be synchronized.
+ * synchronous_standby_num can have the special value -1,
+ * meaning that only one node with the highest non-null priority
+ * can be considered as synchronous.
+ */
+ if (synchronous_standby_num == -1)
+ allowed_sync_nodes = 1;
+
+ /*
+ * Make enough room for the maximum number of synchronous nodes
+ * allowed as we scan through the WAL senders here.
+ */
+ sync_nodes = (int *) palloc(allowed_sync_nodes * sizeof(int));
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* Use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ /* Move on to the next one if not active */
+ if (walsnd->pid == 0)
+ continue;
+
+ /* Move on to the next one if not streaming */
+ if (walsnd->state != WALSNDSTATE_STREAMING)
+ continue;
+
+ /* Move on to the next one if asynchronous */
+ if (walsnd->sync_standby_priority == 0)
+ continue;
+
+ /* Move on to the next one if priority conditions are not satisfied */
+ if (priority != 0 &&
+ priority <= walsnd->sync_standby_priority &&
+ *num_sync == allowed_sync_nodes)
+ continue;
+
+ /* Move on to the next one if flush position is invalid */
+ if (XLogRecPtrIsInvalid(walsnd->flush))
+ continue;
+
+ /*
+ * We have a potential synchronous candidate: add it to the
+ * list of nodes if there is still room, or evict the entry with
+ * the highest priority value found until now.
+ */
+ if (*num_sync == allowed_sync_nodes)
+ {
+ int j;
+ for (j = 0; j < *num_sync; j++)
+ {
+ volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_nodes[j]];
+ if (walsndloc->sync_standby_priority == priority)
+ {
+ sync_nodes[j] = i;
+ break;
+ }
+ }
+ }
+ else
+ {
+ sync_nodes[*num_sync] = i;
+ (*num_sync)++;
+ }
+
+ /*
+ * Update the priority tracked for the next iterations. This needs
+ * to be the highest priority value among all the existing items.
+ */
+ if (priority < walsnd->sync_standby_priority)
+ priority = walsnd->sync_standby_priority;
+ }
+
+ return sync_nodes;
+ }
+
/*
* Update the LSNs on each queue based upon our latest state. This
! * implements a simple policy of first-valid-standbys-release-waiters.
*
* Other policies are possible, which would change what we do here and what
* perhaps also which information we store as well.
***************
*** 368,378 **** void
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
! volatile WalSnd *syncWalSnd = NULL;
int numwrite = 0;
int numflush = 0;
! int priority = 0;
int i;
/*
* If this WALSender is serving a standby that is not on the list of
--- 486,499 ----
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
! int *sync_standbys;
int numwrite = 0;
int numflush = 0;
! int num_sync = 0;
int i;
+ bool found = false;
+ XLogRecPtr min_write_pos;
+ XLogRecPtr min_flush_pos;
/*
* If this WALSender is serving a standby that is not on the list of
***************
*** 388,454 **** SyncRepReleaseWaiters(void)
/*
* We're a potential sync standby. Release waiters if we are the highest
* priority standby. If there are multiple standbys with same priorities
! * then we use the first mentioned standby. If you change this, also
! * change pg_stat_get_wal_senders().
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
! for (i = 0; i < max_wal_senders; i++)
{
! /* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = &walsndctl->walsnds[i];
!
! if (walsnd->pid != 0 &&
! walsnd->state == WALSNDSTATE_STREAMING &&
! walsnd->sync_standby_priority > 0 &&
! (priority == 0 ||
! priority > walsnd->sync_standby_priority) &&
! !XLogRecPtrIsInvalid(walsnd->flush))
{
! priority = walsnd->sync_standby_priority;
! syncWalSnd = walsnd;
}
}
/*
! * We should have found ourselves at least.
*/
! Assert(syncWalSnd);
/*
! * If we aren't managing the highest priority standby then just leave.
*/
! if (syncWalSnd != MyWalSnd)
{
LWLockRelease(SyncRepLock);
! announce_next_takeover = true;
return;
}
/*
* Set the lsn first so that when we wake backends they will release up to
! * this location.
*/
! if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
{
! walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
}
! if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
{
! walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
LWLockRelease(SyncRepLock);
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
! numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
/*
* If we are managing the highest priority standby, though we weren't
! * prior to this, then announce we are now the sync standby.
*/
if (announce_next_takeover)
{
--- 509,607 ----
/*
* We're a potential sync standby. Release waiters if we are the highest
* priority standby. If there are multiple standbys with same priorities
! * then we use the first ones mentioned.
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
! /*
! * We should at least have found ourselves, unless no synchronous
! * nodes are expected to be found.
! */
! Assert(num_sync > 0);
!
! /*
! * If we aren't managing one of the standbys with the highest priority,
! * then just leave.
! */
! for (i = 0; i < num_sync; i++)
{
! volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
! if (walsndloc == MyWalSnd)
{
! found = true;
! break;
}
}
/*
! * We are definitely not one of the chosen ones... But we could
! * be at the next takeover.
*/
! if (!found)
! {
! LWLockRelease(SyncRepLock);
! pfree(sync_standbys);
! announce_next_takeover = true;
! return;
! }
/*
! * Even if we are one of the chosen standbys, leave if there
! * are fewer synchronous standbys in waiting state than what is
! * expected by the user.
*/
! if (num_sync < synchronous_standby_num &&
! synchronous_standby_num != -1)
{
LWLockRelease(SyncRepLock);
! pfree(sync_standbys);
return;
}
/*
* Set the lsn first so that when we wake backends they will release up to
! * this location, but only if all the standbys found as synchronous have
! * already reached that point. So first find the oldest write and flush
! * positions among all the standbys considered in sync...
*/
! min_write_pos = MyWalSnd->write;
! min_flush_pos = MyWalSnd->flush;
! for (i = 0; i < num_sync; i++)
! {
! volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
!
! SpinLockAcquire(&walsndloc->mutex);
! if (min_write_pos > walsndloc->write)
! min_write_pos = walsndloc->write;
! if (min_flush_pos > walsndloc->flush)
! min_flush_pos = walsndloc->flush;
! SpinLockRelease(&walsndloc->mutex);
! }
!
! /* ... And now update if necessary */
! if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < min_write_pos)
{
! walsndctl->lsn[SYNC_REP_WAIT_WRITE] = min_write_pos;
numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
}
! if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < min_flush_pos)
{
! walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = min_flush_pos;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
LWLockRelease(SyncRepLock);
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! numwrite, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_WRITE] >> 32),
! (uint32) walsndctl->lsn[SYNC_REP_WAIT_WRITE],
! numflush, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] >> 32),
! (uint32) walsndctl->lsn[SYNC_REP_WAIT_FLUSH]);
/*
* If we are managing the highest priority standby, though we weren't
! * prior to this, then announce we are now a sync standby.
*/
if (announce_next_takeover)
{
***************
*** 457,462 **** SyncRepReleaseWaiters(void)
--- 610,618 ----
(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
application_name, MyWalSnd->sync_standby_priority)));
}
+
+ /* Clean up */
+ pfree(sync_standbys);
}
/*
***************
*** 483,488 **** SyncRepGetStandbyPriority(void)
--- 639,648 ----
if (am_cascading_walsender)
return 0;
+ /* If no synchronous nodes allowed, no cake for this WAL sender */
+ if (synchronous_standby_num == 0)
+ return 0;
+
/* Need a modifiable copy of string */
rawstring = pstrdup(SyncRepStandbyNames);
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2735,2742 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContext per_query_ctx;
MemoryContext oldcontext;
int *sync_priority;
! int priority = 0;
! int sync_standby = -1;
int i;
/* check to see if caller supports us returning a tuplestore */
--- 2735,2742 ----
MemoryContext per_query_ctx;
MemoryContext oldcontext;
int *sync_priority;
! int *sync_standbys;
! int num_sync = 0;
int i;
/* check to see if caller supports us returning a tuplestore */
***************
*** 2767,2802 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
/*
* Get the priorities of sync standbys all in one go, to minimise lock
* acquisitions and to allow us to evaluate who is the current sync
! * standby. This code must match the code in SyncRepReleaseWaiters().
*/
sync_priority = palloc(sizeof(int) * max_wal_senders);
LWLockAcquire(SyncRepLock, LW_SHARED);
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
! if (walsnd->pid != 0)
! {
! /*
! * Treat a standby such as a pg_basebackup background process
! * which always returns an invalid flush location, as an
! * asynchronous standby.
! */
! sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 0 : walsnd->sync_standby_priority;
!
! if (walsnd->state == WALSNDSTATE_STREAMING &&
! walsnd->sync_standby_priority > 0 &&
! (priority == 0 ||
! priority > walsnd->sync_standby_priority) &&
! !XLogRecPtrIsInvalid(walsnd->flush))
! {
! priority = walsnd->sync_standby_priority;
! sync_standby = i;
! }
! }
}
LWLockRelease(SyncRepLock);
for (i = 0; i < max_wal_senders; i++)
--- 2767,2789 ----
/*
* Get the priorities of sync standbys all in one go, to minimise lock
* acquisitions and to allow us to evaluate who is the current sync
! * standby.
*/
sync_priority = palloc(sizeof(int) * max_wal_senders);
LWLockAcquire(SyncRepLock, LW_SHARED);
+
+ /* First get the priorities of each standby while we hold the lock */
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
! sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 0 : walsnd->sync_standby_priority;
}
+
+ /* Obtain list of synchronous standbys */
+ sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
LWLockRelease(SyncRepLock);
for (i = 0; i < max_wal_senders; i++)
***************
*** 2858,2872 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
if (sync_priority[i] == 0)
values[7] = CStringGetTextDatum("async");
- else if (i == sync_standby)
- values[7] = CStringGetTextDatum("sync");
else
! values[7] = CStringGetTextDatum("potential");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
pfree(sync_priority);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
--- 2845,2876 ----
*/
if (sync_priority[i] == 0)
values[7] = CStringGetTextDatum("async");
else
! {
! int j;
! bool found = false;
!
! for (j = 0; j < num_sync; j++)
! {
! /* Check whether this node is one of those in sync */
! if (i == sync_standbys[j])
! {
! values[7] = CStringGetTextDatum("sync");
! found = true;
! break;
! }
! }
! if (!found)
! values[7] = CStringGetTextDatum("potential");
! }
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
+
+ /* Cleanup */
pfree(sync_priority);
+ pfree(sync_standbys);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2548,2553 **** static struct config_int ConfigureNamesInt[] =
--- 2548,2563 ----
NULL, NULL, NULL
},
+ {
+ {"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+ gettext_noop("Number of potential synchronous standbys."),
+ NULL
+ },
+ &synchronous_standby_num,
+ -1, -1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ----
#synchronous_standby_names = '' # standby servers that provide sync rep
# comma-separated list of application_name
# from standby(s); '*' = all
+#synchronous_standby_num = -1 # number of standby servers using sync rep
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
# - Standby Servers -
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 33,38 ****
--- 33,39 ----
/* user-settable parameters for synchronous replication */
extern char *SyncRepStandbyNames;
+ extern int synchronous_standby_num;
/* called by user backend */
extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
***************
*** 49,54 **** extern void SyncRepUpdateSyncStandbysDefined(void);
--- 50,56 ----
/* called by various procs */
extern int SyncRepWakeQueue(bool all, int mode);
+ extern int *SyncRepGetSynchronousNodes(int *num_sync);
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
extern void assign_synchronous_commit(int newval, void *extra);