diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ab963c5345..7409e5ce3d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   bool need_full_snapshot,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
+	ctx->update_progress = update_progress;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin contains the name of the output plugin
  * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- *		perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ *  	callbacks that have to be filled to perform the use-case dependent,
+ *  	actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, read_page, prepare_write,
-								 do_write);
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
  * output_plugin_options
  *		contains options passed to the output plugin.
  *
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+	if (!ctx->update_progress)
+		return;
+
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
  * Load the output plugin, lookup its output plugin init function, and check
  * that it provides the required callbacks.
  */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index c251b92f57..27164de093 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f3eaccffd5..4ddfbf7a98 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -244,6 +244,8 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
+	OutputPluginUpdateProgress(ctx);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6ee1e68819..56a9ca9651 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
 									false, /* do not build snapshot */
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL,
+									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45d027803a..8bb142a2d4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
@@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										WalSndUpdateProgress);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
 	 */
-	logical_decoding_ctx = CreateDecodingContext(
-											   cmd->startpoint, cmd->options,
+	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+												 WalSndPrepareWrite,
+												 WalSndWriteData,
+												 WalSndUpdateProgress);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1240,6 +1244,29 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 }
 
 /*
+ * LogicalDecodingContext 'progress_update' callback.
+ *
+ * Write the current position to the log tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+	TimestampTz now = GetCurrentTimestamp();
+	static TimestampTz sendTime = 0;
+
+	/*
+	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+	 */
+#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 	1000
+	if (!TimestampDifferenceExceeds(sendTime, now,
+									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+		return;
+
+	LagTrackerWrite(lsn, now);
+	sendTime = now;
+}
+
+/*
  * Wait till WAL < loc is flushed to disk so it can be safely read.
  */
 static XLogRecPtr
@@ -2730,9 +2757,9 @@ XLogSendLogical(void)
 	if (record != NULL)
 	{
 		/*
-		 * Note the lack of any call to LagTrackerWrite() which is the responsibility
-		 * of the logical decoding plugin. Response messages are handled normally,
-		 * so this responsibility does not extend to needing to call LagTrackerRead().
+		 * Note the lack of any call to LagTrackerWrite() which is handled
+		 * by WalSndUpdateProgress which is called by output plugin through
+		 * logical decoding write api.
 		 */
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
@@ -3328,9 +3355,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
  * eventually reported to have been written, flushed and applied by the
  * standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
  */
-void
+static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
 	bool buffer_full;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index d0b2e0bbae..090f9c8268 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+										   struct LogicalDecodingContext *lr,
+															XLogRecPtr Ptr,
+															TransactionId xid
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
 	 */
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
+	LogicalOutputPluginWriterUpdateProgress update_progress;
 
 	/*
 	 * Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 									  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d0c0..2435e2be2d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
 
 #endif   /* OUTPUT_PLUGIN_H */
