>From 42e5c5c0aebc34f249151e85873e7c3d2cb2c47d Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Sun, 23 Apr 2017 01:00:25 +0200
Subject: [PATCH] Add support for time based lag tracking to logical
 replication

This patch adds a new write API callback to LogicalDecodingContext called
update_progress, and a wrapper around it called OutputPluginUpdateProgress
for output plugins to use for progress reporting. The new callback is
optional: walsender implements it to do time-based lag tracking, while the
SQL interface does not implement it.
---
 src/backend/replication/logical/logical.c      | 34 ++++++++++++++++++++------
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/pgoutput/pgoutput.c    |  2 ++
 src/backend/replication/slotfuncs.c            |  3 ++-
 src/backend/replication/walsender.c            | 32 +++++++++++++++++-------
 src/include/replication/logical.h              | 15 +++++++++---
 src/include/replication/output_plugin.h        |  1 +
 7 files changed, 66 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ab963c5..7409e5c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   bool need_full_snapshot,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
+	ctx->update_progress = update_progress;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin contains the name of the output plugin
  * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- *		perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ *		callbacks that have to be filled to perform the use-case dependent,
+ *		actual work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, read_page, prepare_write,
-								 do_write);
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
  * output_plugin_options
  *		contains options passed to the output plugin.
  *
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+	if (!ctx->update_progress)
+		return;
+
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
  * Load the output plugin, lookup its output plugin init function, and check
  * that it provides the required callbacks.
  */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index c251b92..27164de 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f3eaccf..4ddfbf7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -244,6 +244,8 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
+	OutputPluginUpdateProgress(ctx);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6ee1e68..56a9ca9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
 									false, /* do not build snapshot */
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL,
+									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2a6c8bb..5349268 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -240,7 +240,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
@@ -915,7 +917,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										WalSndUpdateProgress);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1069,10 +1072,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
 	 */
-	logical_decoding_ctx = CreateDecodingContext(
-											   cmd->startpoint, cmd->options,
+	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+												 WalSndPrepareWrite,
+												 WalSndWriteData,
+												 WalSndUpdateProgress);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1232,6 +1236,17 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 }
 
 /*
+ * LogicalDecodingContext 'update_progress' callback.
+ *
+ * Write the current position to the lag tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+	LagTrackerWrite(lsn, GetCurrentTimestamp());
+}
+
+/*
  * Wait till WAL < loc is flushed to disk so it can be safely read.
  */
 static XLogRecPtr
@@ -2691,9 +2706,9 @@ XLogSendLogical(void)
 	if (record != NULL)
 	{
 		/*
-		 * Note the lack of any call to LagTrackerWrite() which is the responsibility
-		 * of the logical decoding plugin. Response messages are handled normally,
-		 * so this responsibility does not extend to needing to call LagTrackerRead().
+		 * Note the lack of any call to LagTrackerWrite() here; that is
+		 * handled by WalSndUpdateProgress(), which is called by the output
+		 * plugin through the logical decoding write API.
 		 */
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
@@ -3227,9 +3242,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
  * eventually reported to have been written, flushed and applied by the
  * standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
  */
-void
+static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
 	bool buffer_full;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index d0b2e0b..090f9c8 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+										   struct LogicalDecodingContext *lr,
+															XLogRecPtr Ptr,
+															TransactionId xid
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
 	 */
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
+	LogicalOutputPluginWriterUpdateProgress update_progress;
 
 	/*
 	 * Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 									  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d..2435e2b 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
 
 #endif   /* OUTPUT_PLUGIN_H */
-- 
2.7.4

