At Wed, 13 Sep 2017 11:43:06 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI
<[email protected]> wrote in
<[email protected]>
horiguchi.kyotaro> At Thu, 07 Sep 2017 21:59:56 +0900 (Tokyo Standard Time),
Kyotaro HORIGUCHI <[email protected]> wrote in
<[email protected]>
> > Hello,
> >
> > At Thu, 07 Sep 2017 14:12:12 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI
> > <[email protected]> wrote in
> > <[email protected]>
> > > > I would like a flag in pg_replication_slots, and possibly also a
> > > > numerical column that indicates how far away from the critical point
> > > > each slot is. That would be great for a monitoring system.
> > >
> > > Great! I'll do that right now.
> >
> > Done.
>
> The CF status of this patch was changed to "Waiting on Author"
> because the second patch was posted separately from the first
> one. I am reposting them together, rebased onto the current
> master.
Hmm. I had carelessly neglected the regression tests since the patch
is still in a tentative shape. This version should pass the regression tests.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 105,110 **** int wal_level = WAL_LEVEL_MINIMAL;
--- 105,111 ----
int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
int wal_retrieve_retry_interval = 5000;
+ int max_slot_wal_keep_size_mb = 0; /* in megabytes; 0 disables the limit */
#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
***************
*** 9365,9373 **** KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
--- 9366,9404 ----
if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
{
XLogSegNo slotSegNo;
+ int slotlimitsegs = ConvertToXSegs(max_slot_wal_keep_size_mb);
XLByteToSeg(keep, slotSegNo);
+ /*
+ * Ignore the slots' requirement if they would retain too many WAL
+ * segments. max_slot_wal_keep_size is added on top of wal_keep_segments.
+ */
+ if (max_slot_wal_keep_size_mb > 0 && slotSegNo + slotlimitsegs < segno)
+ {
+ segno = segno - slotlimitsegs; /* must be positive */
+
+ /*
+ * Warn only if this checkpoint removes segments the slots still need.
+ * We assume here that *logSegNo is the already-computed keep location.
+ */
+ if (slotSegNo < *logSegNo)
+ {
+ char lostsegs[32];
+
+ /* UINT64_FORMAT can't be spliced into a translatable message */
+ snprintf(lostsegs, sizeof(lostsegs), UINT64_FORMAT,
+ (uint64) ((segno < *logSegNo ? segno : *logSegNo) - slotSegNo));
+ ereport(WARNING,
+ (errmsg("restart LSN of replication slots is ignored by checkpoint"),
+ errdetail("Some replication slots have lost required WAL segments to continue by up to %s segments.",
+ lostsegs)));
+ }
+
+ /* Let the capped limit override the slots' restart point */
+ slotSegNo = segno;
+ }
+
if (slotSegNo <= 0)
segno = 1;
else if (slotSegNo < segno)
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2371,2376 **** static struct config_int ConfigureNamesInt[] =
--- 2371,2387 ----
},
{
+ {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+ gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+ NULL,
+ GUC_UNIT_MB
+ },
+ &max_slot_wal_keep_size_mb,
+ 0, 0, INT_MAX, /* default 0 disables the limit */
+ NULL, NULL, NULL
+ },
+
+ {
{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
gettext_noop("Sets the maximum time to wait for WAL replication."),
NULL,
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ----
#max_wal_senders = 10 # max number of walsender processes
# (change requires restart)
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
+ #max_slot_wal_keep_size = 0 # in megabytes; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
#max_replication_slots = 10 # max number of replication slots
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 97,102 **** extern bool reachedConsistency;
--- 97,103 ----
extern int min_wal_size_mb;
extern int max_wal_size_mb;
extern int wal_keep_segments;
+ extern int max_slot_wal_keep_size_mb;
extern int XLOGbuffers;
extern int XLogArchiveTimeout;
extern int wal_retrieve_retry_interval;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 9336,9341 **** CreateRestartPoint(int flags)
--- 9336,9423 ----
}
/*
+ * Check whether the WAL record at restartLSN will be preserved by the next
+ * checkpoint.
+ *
+ * Returns true if it will be preserved. If distance is not NULL, *distance
+ * is set to a byte count: the margin of further WAL left before the slot
+ * starts losing required WAL when returning true, or the distance from
+ * restartLSN to the start of the first segment kept at the next checkpoint
+ * (the amount of WAL already lost) when returning false.
+ *
+ * The result must be consistent with KeepLogSeg().
+ */
+ bool
+ GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance)
+ {
+ XLogRecPtr currpos;
+ XLogRecPtr tailpos;
+ XLogSegNo currSeg;
+ XLogSegNo restartSeg;
+ XLogSegNo tailSeg;
+ XLogSegNo keepSegs;
+ uint64 restByteInSeg;
+
+ currpos = GetXLogWriteRecPtr();
+
+ LWLockAcquire(ControlFileLock, LW_SHARED);
+ tailpos = ControlFile->checkPointCopy.redo;
+ LWLockRelease(ControlFileLock);
+
+ /* Convert the positions to segment numbers */
+ XLByteToSeg(currpos, currSeg);
+ XLByteToSeg(restartLSN, restartSeg);
+ XLByteToSeg(tailpos, tailSeg);
+ restByteInSeg = 0;
+
+ Assert(wal_keep_segments >= 0);
+ Assert(max_slot_wal_keep_size_mb >= 0);
+
+ /*
+ * WAL is removed in whole segments. Widen before adding so that large
+ * settings of the two int GUCs cannot overflow int.
+ */
+ keepSegs = (XLogSegNo) wal_keep_segments + ConvertToXSegs(max_slot_wal_keep_size_mb);
+
+ /*
+ * The next checkpoint keeps WAL from the older of the latest checkpoint's
+ * redo segment and the current head minus keepSegs. In the latter case
+ * the margin also includes the bytes left in the current segment.
+ */
+ if (currSeg < tailSeg + keepSegs)
+ {
+ if (currSeg < keepSegs)
+ tailSeg = 0;
+ else
+ tailSeg = currSeg - keepSegs;
+
+ /* The margin extends to the end of the current segment */
+ restByteInSeg = XLogSegSize - (currpos % XLogSegSize);
+ }
+
+ /* The required segments will be removed at the next checkpoint */
+ if (restartSeg < tailSeg)
+ {
+ /* Calculate how many bytes the slot has lost */
+ if (distance)
+ {
+ uint64 restbytes = (restartSeg + 1) * XLogSegSize - restartLSN;
+
+ *distance =
+ (tailSeg - restartSeg - 1) * XLogSegSize
+ + restbytes;
+ }
+ return false;
+ }
+
+ /* Margin left at the next checkpoint before the slot loses required WAL */
+ if (distance)
+ *distance = (restartSeg - tailSeg) * XLogSegSize + restByteInSeg;
+
+ return true;
+ }
+
+ /*
* Retreat *logSegNo to the last segment that we need to retain because of
* either wal_keep_segments or replication slots.
*
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
***************
*** 793,799 **** CREATE VIEW pg_replication_slots AS
L.xmin,
L.catalog_xmin,
L.restart_lsn,
! L.confirmed_flush_lsn
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
--- 793,801 ----
L.xmin,
L.catalog_xmin,
L.restart_lsn,
! L.confirmed_flush_lsn,
! L.live,
! L.distance
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
*** a/src/backend/replication/slotfuncs.c
--- b/src/backend/replication/slotfuncs.c
***************
*** 182,188 **** pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
! #define PG_GET_REPLICATION_SLOTS_COLS 11
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
--- 182,188 ----
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
! #define PG_GET_REPLICATION_SLOTS_COLS 13 /* 11 existing + live, distance */
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
***************
*** 304,309 **** pg_get_replication_slots(PG_FUNCTION_ARGS)
--- 304,323 ----
else
nulls[i++] = true;
+ if (max_slot_wal_keep_size_mb > 0 && restart_lsn != InvalidXLogRecPtr)
+ {
+ uint64 distance; /* always filled in by GetMarginToSlotSegmentLimit() */
+
+ values[i++] = BoolGetDatum(GetMarginToSlotSegmentLimit(restart_lsn,
+ &distance));
+ values[i++] = Int64GetDatum(distance);
+ }
+ else
+ {
+ values[i++] = BoolGetDatum(true); /* limit disabled or restart_lsn invalid */
+ nulls[i++] = true; /* distance not applicable here */
+ }
+
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
LWLockRelease(ReplicationSlotControlLock);
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 267,272 **** extern void ShutdownXLOG(int code, Datum arg);
--- 267,273 ----
extern void InitXLOGAccess(void);
extern void CreateCheckPoint(int flags);
extern bool CreateRestartPoint(int flags);
+ extern bool GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance);
extern void XLogPutNextOid(Oid nextOid);
extern XLogRecPtr XLogRestorePoint(const char *rpName);
extern void UpdateFullPageWrites(void);
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 5347,5353 **** DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
! DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
DESCR("set up a logical replication slot");
--- 5347,5353 ----
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
! DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220,16,20}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,live,distance}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
DESCR("set up a logical replication slot");
*** a/src/test/regress/expected/rules.out
--- b/src/test/regress/expected/rules.out
***************
*** 1451,1458 **** pg_replication_slots| SELECT l.slot_name,
l.xmin,
l.catalog_xmin,
l.restart_lsn,
! l.confirmed_flush_lsn
! FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
--- 1451,1460 ----
l.xmin,
l.catalog_xmin,
l.restart_lsn,
! l.confirmed_flush_lsn,
! l.live,
! l.distance
! FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, live, distance)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers