>From 0dcecaffb7ba94b900bbd5a483ca2cdfa996a952 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 3 May 2017 07:35:41 +0200
Subject: [PATCH] fix statistics reporting in logical replication workers

---
 src/backend/replication/logical/tablesync.c | 18 ++++++++++++++----
 src/backend/replication/logical/worker.c    |  2 +-
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0823000..cf331c0 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -274,6 +274,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static List *table_states = NIL;
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
+	bool		xact_started = false;
 
 	Assert(!IsTransactionState());
 
@@ -290,6 +291,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		table_states = NIL;
 
 		StartTransactionCommand();
+		xact_started = true;
 
 		/* Fetch all non-ready tables. */
 		rstates	= GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -304,8 +306,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		}
 		MemoryContextSwitchTo(oldctx);
 
-		CommitTransactionCommand();
-
 		table_states_valid = true;
 	}
 
@@ -350,11 +350,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
-				StartTransactionCommand();
+				if (!xact_started)
+				{
+					StartTransactionCommand();
+					xact_started = true;
+				}
 				SetSubscriptionRelState(MyLogicalRepWorker->subid,
 										rstate->relid, rstate->state,
 										rstate->lsn);
-				CommitTransactionCommand();
 			}
 		}
 		else
@@ -457,6 +460,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			}
 		}
 	}
+
+	if (xact_started)
+	{
+		CommitTransactionCommand();
+		pgstat_report_stat(false);
+	}
 }
 
 /*
@@ -806,6 +815,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 										MyLogicalRepWorker->relstate,
 										MyLogicalRepWorker->relstate_lsn);
 				CommitTransactionCommand();
+				pgstat_report_stat(false);
 
 				/*
 				 * We want to do the table data sync in single
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 692fa3a..eb3d99b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -451,6 +451,7 @@ apply_handle_commit(StringInfo s)
 		replorigin_session_origin_timestamp = commit_data.committime;
 
 		CommitTransactionCommand();
+		pgstat_report_stat(false);
 
 		store_flush_position(commit_data.end_lsn);
 	}
@@ -460,7 +461,6 @@ apply_handle_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
-	pgstat_report_stat(false);
 	pgstat_report_activity(STATE_IDLE, NULL);
 }
 
-- 
2.7.4

