diff --git a/contrib/pglogical/Makefile b/contrib/pglogical/Makefile
index e6238ff..149477f 100644
--- a/contrib/pglogical/Makefile
+++ b/contrib/pglogical/Makefile
@@ -15,7 +15,7 @@ OBJS = pglogical_apply.o pglogical_conflict.o pglogical_manager.o \
 PG_CPPFLAGS = -I$(libpq_srcdir) -I$(top_srcdir)/contrib/pglogical_output
 SHLIB_LINK = $(libpq)
 
-REGRESS = preseed infofuncs init_fail init preseed_check basic extended toasted replication_set add_table matview bidirectional primary_key foreign_key functions copy drop
+REGRESS = preseed infofuncs init_fail init preseed_check basic extended toasted replication_set add_table matview bidirectional primary_key foreign_key functions copy twophase drop
 
 # In-tree builds only
 subdir = contrib/pglogical
diff --git a/contrib/pglogical/expected/twophase.out b/contrib/pglogical/expected/twophase.out
new file mode 100644
index 0000000..ec93948
--- /dev/null
+++ b/contrib/pglogical/expected/twophase.out
@@ -0,0 +1,123 @@
+/* First test whether a table's replication set can be properly manipulated */
+SELECT * FROM pglogical_regress_variables()
+\gset
+\c :provider_dsn
+SELECT pglogical.replicate_ddl_command($$
+	CREATE TABLE public.test2pc_tbl(id serial primary key, value int);
+$$);
+ replicate_ddl_command 
+-----------------------
+ t
+(1 row)
+
+-- SELECT * FROM pglogical.replication_set_add_all_tables('default', '{public}');
+SELECT * FROM pglogical.replication_set_add_table('default', 'test2pc_tbl');
+ replication_set_add_table 
+---------------------------
+ t
+(1 row)
+
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+ pg_xlog_wait_remote_apply 
+---------------------------
+ 
+(1 row)
+
+-- Check that prapeared state is visible on slave and data available after commit
+BEGIN;
+INSERT INTO test2pc_tbl VALUES (1, 10);
+PREPARE TRANSACTION 'tx1';
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+ pg_xlog_wait_remote_apply 
+---------------------------
+ 
+(1 row)
+
+\c :subscriber_dsn
+SELECT gid, owner, database FROM pg_prepared_xacts;
+         gid         | owner |      database      
+---------------------+-------+--------------------
+ tx1                 | stas  | contrib_regression
+ test_subscriber:tx1 | stas  | postgres
+(2 rows)
+
+SELECT * FROM test2pc_tbl;
+ id | value 
+----+-------
+(0 rows)
+
+\c :provider_dsn
+COMMIT PREPARED 'tx1';
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+ pg_xlog_wait_remote_apply 
+---------------------------
+ 
+(1 row)
+
+\c :subscriber_dsn
+SELECT gid, owner, database FROM pg_prepared_xacts;
+ gid | owner | database 
+-----+-------+----------
+(0 rows)
+
+SELECT * FROM test2pc_tbl;
+ id | value 
+----+-------
+  1 |    10
+(1 row)
+
+-- Check that prapeared state is visible on slave and data is ignored after abort
+\c :provider_dsn
+BEGIN;
+INSERT INTO test2pc_tbl VALUES (2, 20);
+PREPARE TRANSACTION 'tx2';
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+ pg_xlog_wait_remote_apply 
+---------------------------
+ 
+(1 row)
+
+\c :subscriber_dsn
+SELECT gid, owner, database FROM pg_prepared_xacts;
+         gid         | owner |      database      
+---------------------+-------+--------------------
+ tx2                 | stas  | contrib_regression
+ test_subscriber:tx2 | stas  | postgres
+(2 rows)
+
+SELECT * FROM test2pc_tbl;
+ id | value 
+----+-------
+  1 |    10
+(1 row)
+
+\c :provider_dsn
+ROLLBACK PREPARED 'tx2';
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+ pg_xlog_wait_remote_apply 
+---------------------------
+ 
+(1 row)
+
+\c :subscriber_dsn
+SELECT gid, owner, database FROM pg_prepared_xacts;
+ gid | owner | database 
+-----+-------+----------
+(0 rows)
+
+SELECT * FROM test2pc_tbl;
+ id | value 
+----+-------
+  1 |    10
+(1 row)
+
+-- Clean up
+\set VERBOSITY terse
+SELECT pglogical.replicate_ddl_command($$
+	DROP TABLE public.test2pc_tbl CASCADE;
+$$);
+ replicate_ddl_command 
+-----------------------
+ t
+(1 row)
+
diff --git a/contrib/pglogical/pglogical_apply.c b/contrib/pglogical/pglogical_apply.c
index bf5615c..fe7a872 100644
--- a/contrib/pglogical/pglogical_apply.c
+++ b/contrib/pglogical/pglogical_apply.c
@@ -134,7 +134,7 @@ handle_begin(StringInfo s)
 
 	in_remote_transaction = true;
 
-	pgstat_report_activity(STATE_RUNNING, false);
+	pgstat_report_activity(STATE_RUNNING, NULL);
 }
 
 /*
@@ -146,17 +146,87 @@ handle_commit(StringInfo s)
 	XLogRecPtr		commit_lsn;
 	XLogRecPtr		end_lsn;
 	TimestampTz		commit_time;
+	uint8 			flags;
+	const char	   *gid;
+	char 		   *local_gid;
+	PGLFlushPosition *flushpos;
+	bool 			flush = true;
+	PGLogicalLocalNode *mynode;
 
-	pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time);
+	pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time, &flags, &gid);
 
-	Assert(commit_lsn == replorigin_session_origin_lsn);
-	Assert(commit_time == replorigin_session_origin_timestamp);
-
-	if (IsTransactionState())
+	switch(PGLOGICAL_XACT_EVENT(flags))
 	{
-		PGLFlushPosition *flushpos;
+		case PGLOGICAL_COMMIT:
+		{
+			if (IsTransactionState())
+				CommitTransactionCommand();
+			else
+				flush = false;
+			break;
+		}
+		case PGLOGICAL_PREPARE:
+		{
+			/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
+			BeginTransactionBlock();
+			CommitTransactionCommand();
+			StartTransactionCommand();
 
-		CommitTransactionCommand();
+			mynode = get_local_node(false);
+			local_gid = palloc0(GIDSIZE);
+			strncpy(local_gid, mynode->node->name, GIDSIZE);
+			strncat(local_gid, ":", GIDSIZE);
+			strncat(local_gid, gid, GIDSIZE);
+
+			/* PREPARE itself */
+			PrepareTransactionBlock(local_gid);
+			CommitTransactionCommand();
+			break;
+		}
+		case PGLOGICAL_COMMIT_PREPARED:
+		{
+			StartTransactionCommand();
+
+			mynode = get_local_node(false);
+			local_gid = palloc0(GIDSIZE);
+			strncpy(local_gid, mynode->node->name, GIDSIZE);
+			strncat(local_gid, ":", GIDSIZE);
+			strncat(local_gid, gid, GIDSIZE);
+
+			FinishPreparedTransaction(local_gid, true);
+			CommitTransactionCommand();
+
+			/* There were no BEGIN stmt for COMMIT PREPARED */
+			replorigin_session_origin_timestamp = commit_time;
+			replorigin_session_origin_lsn = commit_lsn;
+
+			break;
+		}
+		case PGLOGICAL_ABORT_PREPARED:
+		{
+			StartTransactionCommand();
+
+			mynode = get_local_node(false);
+			local_gid = palloc(GIDSIZE);
+			strncpy(local_gid, mynode->node->name, GIDSIZE);
+			strncat(local_gid, ":", GIDSIZE);
+			strncat(local_gid, gid, GIDSIZE);
+
+			FinishPreparedTransaction(local_gid, false);
+			CommitTransactionCommand();
+
+			/* There were no BEGIN stmt for ROLLBACK PREPARED */
+			replorigin_session_origin_timestamp = commit_time;
+			replorigin_session_origin_lsn = commit_lsn;
+
+			break;
+		}
+		default:
+			Assert(false);
+	}
+
+	if (flush)
+	{
 		MemoryContextSwitchTo(TopMemoryContext);
 
 		/* Track commit lsn  */
@@ -168,6 +238,9 @@ handle_commit(StringInfo s)
 		MemoryContextSwitchTo(MessageContext);
 	}
 
+	Assert(commit_lsn == replorigin_session_origin_lsn);
+	Assert(commit_time == replorigin_session_origin_timestamp);
+
 	/*
 	 * If the row isn't from the immediate upstream; advance the slot of the
 	 * node it originally came from so we start replay of that node's
diff --git a/contrib/pglogical/pglogical_proto.c b/contrib/pglogical/pglogical_proto.c
index 558a955..096bde3 100644
--- a/contrib/pglogical/pglogical_proto.c
+++ b/contrib/pglogical/pglogical_proto.c
@@ -83,16 +83,19 @@ pglogical_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
  */
 void
 pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
-					   XLogRecPtr *end_lsn, TimestampTz *committime)
+					   XLogRecPtr *end_lsn, TimestampTz *committime,
+					   uint8 *flags, const char **gid)
 {
 	/* read flags */
-	uint8	flags = pq_getmsgbyte(in);
-	Assert(flags == 0);
+	*flags = pq_getmsgbyte(in);
 
 	/* read fields */
 	*commit_lsn = pq_getmsgint64(in);
 	*end_lsn = pq_getmsgint64(in);
 	*committime = pq_getmsgint64(in);
+
+	if (PGLOGICAL_XACT_EVENT(*flags) != PGLOGICAL_COMMIT)
+		*gid = pq_getmsgstring(in);
 }
 
 /*
diff --git a/contrib/pglogical/pglogical_proto.h b/contrib/pglogical/pglogical_proto.h
index 3229c322..f5cac00 100644
--- a/contrib/pglogical/pglogical_proto.h
+++ b/contrib/pglogical/pglogical_proto.h
@@ -24,10 +24,18 @@ typedef struct PGLogicalTupleData
 	bool	changed[MaxTupleAttributeNumber];
 } PGLogicalTupleData;
 
+#define PGLOGICAL_COMMIT			0x00
+#define PGLOGICAL_PREPARE			0x01
+#define PGLOGICAL_COMMIT_PREPARED	0x02
+#define PGLOGICAL_ABORT_PREPARED	0x03
+
+#define PGLOGICAL_XACT_EVENT(flags)	(flags & 0x03)
+
 extern void pglogical_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
 					  TimestampTz *committime, TransactionId *remote_xid);
 extern void pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
-					   XLogRecPtr *end_lsn, TimestampTz *committime);
+					   XLogRecPtr *end_lsn, TimestampTz *committime,
+					   uint8 *flags, const char **gid);
 extern char *pglogical_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 
 extern uint32 pglogical_read_rel(StringInfo in);
diff --git a/contrib/pglogical/pglogical_worker.c b/contrib/pglogical/pglogical_worker.c
index cc37866..2a65445 100644
--- a/contrib/pglogical/pglogical_worker.c
+++ b/contrib/pglogical/pglogical_worker.c
@@ -406,7 +406,7 @@ pglogical_worker_shmem_startup(void)
 
 	if (!found)
 	{
-		PGLogicalCtx->lock = LWLockAssign();
+		PGLogicalCtx->lock = &(GetNamedLWLockTranche("pglogical"))->lock;
 		PGLogicalCtx->supervisor = NULL;
 		PGLogicalCtx->total_workers = max_worker_processes;
 		memset(PGLogicalCtx->workers, 0,
@@ -430,7 +430,7 @@ pglogical_worker_shmem_init(void)
 	 * tries to allocate or free blocks from this array at once.  There won't
 	 * be enough contention to make anything fancier worth doing.
 	 */
-	RequestAddinLWLocks(1);
+	RequestNamedLWLockTranche("pglogical", 1);
 
 	/*
 	 * Whether this is a first startup or crash recovery, we'll be re-initing
diff --git a/contrib/pglogical/sql/twophase.sql b/contrib/pglogical/sql/twophase.sql
new file mode 100644
index 0000000..f7f821e
--- /dev/null
+++ b/contrib/pglogical/sql/twophase.sql
@@ -0,0 +1,54 @@
+/* First test whether a table's replication set can be properly manipulated */
+SELECT * FROM pglogical_regress_variables()
+\gset
+
+\c :provider_dsn
+SELECT pglogical.replicate_ddl_command($$
+	CREATE TABLE public.test2pc_tbl(id serial primary key, value int);
+$$);
+SELECT * FROM pglogical.replication_set_add_table('default', 'test2pc_tbl');
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+
+-- Check that prapeared state is visible on slave and data available after commit
+BEGIN;
+INSERT INTO test2pc_tbl VALUES (1, 10);
+PREPARE TRANSACTION 'tx1';
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+
+\c :subscriber_dsn
+SELECT gid, owner, database FROM pg_prepared_xacts;
+SELECT * FROM test2pc_tbl;
+
+\c :provider_dsn
+COMMIT PREPARED 'tx1';
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+
+\c :subscriber_dsn
+SELECT gid, owner, database FROM pg_prepared_xacts;
+SELECT * FROM test2pc_tbl;
+
+
+-- Check that prapeared state is visible on slave and data is ignored after abort
+\c :provider_dsn
+BEGIN;
+INSERT INTO test2pc_tbl VALUES (2, 20);
+PREPARE TRANSACTION 'tx2';
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+
+\c :subscriber_dsn
+SELECT gid, owner, database FROM pg_prepared_xacts;
+SELECT * FROM test2pc_tbl;
+
+\c :provider_dsn
+ROLLBACK PREPARED 'tx2';
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
+
+\c :subscriber_dsn
+SELECT gid, owner, database FROM pg_prepared_xacts;
+SELECT * FROM test2pc_tbl;
+
+-- Clean up
+\set VERBOSITY terse
+SELECT pglogical.replicate_ddl_command($$
+	DROP TABLE public.test2pc_tbl CASCADE;
+$$);
diff --git a/contrib/pglogical_output/pglogical_proto_json.c b/contrib/pglogical_output/pglogical_proto_json.c
index ae5a591..f6a5fbb 100644
--- a/contrib/pglogical_output/pglogical_proto_json.c
+++ b/contrib/pglogical_output/pglogical_proto_json.c
@@ -90,7 +90,31 @@ pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data, ReorderBu
 						XLogRecPtr commit_lsn)
 {
 	appendStringInfoChar(out, '{');
-	appendStringInfoString(out, "\"action\":\"C\"");
+
+	if (txn->xact_action == XLOG_XACT_PREPARE)
+	{
+		appendStringInfoString(out, "\"action\":\"P\"");
+		appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid);
+	}
+	else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
+	{
+		appendStringInfoString(out, "\"action\":\"CP\"");
+		appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid);
+	}
+	else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
+	{
+		appendStringInfoString(out, "\"action\":\"AP\"");
+		appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid);
+	}
+	else if (txn->xact_action == XLOG_XACT_COMMIT)
+	{
+		appendStringInfoString(out, "\"action\":\"C\"");
+	}
+	else
+	{
+		Assert(false);
+	}
+
 	if (!data->client_no_txinfo)
 	{
 		appendStringInfo(out, ", \"final_lsn\":\"%X/%X\"",
diff --git a/contrib/pglogical_output/pglogical_proto_native.c b/contrib/pglogical_output/pglogical_proto_native.c
index 87ec453..1cede2d 100644
--- a/contrib/pglogical_output/pglogical_proto_native.c
+++ b/contrib/pglogical_output/pglogical_proto_native.c
@@ -21,6 +21,7 @@
 #include "access/sysattr.h"
 #include "access/tuptoaster.h"
 #include "access/xact.h"
+#include "access/twophase.h"
 
 #include "catalog/catversion.h"
 #include "catalog/index.h"
@@ -196,7 +197,18 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
 {
 	uint8 flags = 0;
 
-	pq_sendbyte(out, 'C');		/* sending COMMIT */
+	pq_sendbyte(out, 'C'); /* sending COMMIT */
+
+	if (txn->xact_action == XLOG_XACT_COMMIT)
+		flags = PGLOGICAL_COMMIT;
+	else if (txn->xact_action == XLOG_XACT_PREPARE)
+		flags = PGLOGICAL_PREPARE;
+	else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
+		flags = PGLOGICAL_COMMIT_PREPARED;
+	else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
+		flags = PGLOGICAL_ABORT_PREPARED;
+	else
+		Assert(false);
 
 	/* send the flags field */
 	pq_sendbyte(out, flags);
@@ -205,6 +217,11 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
 	pq_sendint64(out, commit_lsn);
 	pq_sendint64(out, txn->end_lsn);
 	pq_sendint64(out, txn->commit_time);
+
+	if (txn->xact_action == XLOG_XACT_PREPARE ||
+			txn->xact_action == XLOG_XACT_COMMIT_PREPARED ||
+			txn->xact_action == XLOG_XACT_ABORT_PREPARED)
+		pq_sendstring(out, txn->gid);
 }
 
 /*
diff --git a/contrib/pglogical_output/pglogical_proto_native.h b/contrib/pglogical_output/pglogical_proto_native.h
index d3ba125..e99e341 100644
--- a/contrib/pglogical_output/pglogical_proto_native.h
+++ b/contrib/pglogical_output/pglogical_proto_native.h
@@ -23,6 +23,13 @@
  */
 #define PGLOGICAL_STARTUP_MSG_FORMAT_FLAT 1
 
+#define PGLOGICAL_COMMIT			0x00
+#define PGLOGICAL_PREPARE			0x01
+#define PGLOGICAL_COMMIT_PREPARED	0x02
+#define PGLOGICAL_ABORT_PREPARED	0x03
+
+#define PGLOGICAL_XACT_EVENT(flags)	(flags & 0x03)
+
 extern void pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel,
 							struct PGLRelMetaCacheEntry *cache_entry);
 
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index e8a334c..0a5b11d 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -97,10 +97,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 	if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
 	{
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+		uint8 gidlen = xl_twophase->gidlen;
 
 		parsed->twophase_xid = xl_twophase->xid;
+		data += MinSizeOfXactTwophase;
 
-		data += sizeof(xl_xact_twophase);
+		strcpy(parsed->twophase_gid, data);
+		data += gidlen;
 	}
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -138,6 +141,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		data += sizeof(xl_xact_xinfo);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+	{
+		xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+		parsed->dbId = xl_dbinfo->dbId;
+		parsed->tsId = xl_dbinfo->tsId;
+
+		data += sizeof(xl_xact_dbinfo);
+	}
+
 	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -163,10 +176,13 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 	if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
 	{
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+		uint8 gidlen = xl_twophase->gidlen;
 
 		parsed->twophase_xid = xl_twophase->xid;
+		data += MinSizeOfXactTwophase;
 
-		data += sizeof(xl_xact_twophase);
+		strcpy(parsed->twophase_gid, data);
+		data += gidlen;
 	}
 }
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8a22836..cb08c5f 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -127,7 +127,6 @@ int			max_prepared_xacts = 0;
  * typedef struct GlobalTransactionData *GlobalTransaction appears in
  * twophase.h
  */
-#define GIDSIZE 200
 
 typedef struct GlobalTransactionData
 {
@@ -185,12 +184,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval);
+								bool initfileinval,
+								const char *gid);
 static void RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels);
+							   RelFileNode *rels,
+							   const char *gid);
 static void ProcessRecords(char *bufptr, TransactionId xid,
 			   const TwoPhaseCallback callbacks[]);
 static void RemoveGXact(GlobalTransaction gxact);
@@ -1232,6 +1233,39 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 	return buf;
 }
 
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+	TwoPhaseFileHeader *hdr;
+	char *bufptr;
+
+	hdr = (TwoPhaseFileHeader *) xlrec;
+	bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+	parsed->twophase_xid = hdr->xid;
+	parsed->dbId = hdr->database;
+	parsed->nsubxacts = hdr->nsubxacts;
+	parsed->nrels = hdr->ncommitrels;
+	parsed->nmsgs = hdr->ninvalmsgs;
+
+	strcpy(parsed->twophase_gid, hdr->gid);
+
+	parsed->subxacts = (TransactionId *) bufptr;
+	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+	parsed->xnodes = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+	/* Ignoring abortrels? */
+	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+	parsed->msgs = (SharedInvalidationMessage *) bufptr;
+	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1385,11 +1419,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ninvalmsgs, invalmsgs,
-										hdr->initfileinval);
+										hdr->initfileinval, gid);
 	else
 		RecordTransactionAbortPrepared(xid,
 									   hdr->nsubxacts, children,
-									   hdr->nabortrels, abortrels);
+									   hdr->nabortrels, abortrels,
+									   gid);
 
 	ProcArrayRemove(proc, latestXid);
 
@@ -2026,7 +2061,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval)
+								bool initfileinval,
+								const char *gid)
 {
 	XLogRecPtr	recptr;
 	TimestampTz committs = GetCurrentTimestamp();
@@ -2049,7 +2085,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								 nchildren, children, nrels, rels,
 								 ninvalmsgs, invalmsgs,
 								 initfileinval, false,
-								 xid);
+								 xid, gid);
 
 
 	if (replorigin)
@@ -2111,7 +2147,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels)
+							   RelFileNode *rels,
+							   const char *gid)
 {
 	XLogRecPtr	recptr;
 
@@ -2129,7 +2166,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	recptr = XactLogAbortRecord(GetCurrentTimestamp(),
 								nchildren, children,
 								nrels, rels,
-								xid);
+								xid, gid);
 
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0d5440..8db9533 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1215,7 +1215,7 @@ RecordTransactionCommit(void)
 							nchildren, children, nrels, rels,
 							nmsgs, invalMessages,
 							RelcacheInitFileInval, forceSyncCommit,
-							InvalidTransactionId /* plain commit */ );
+							InvalidTransactionId, NULL /* plain commit */ );
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
@@ -1567,7 +1567,7 @@ RecordTransactionAbort(bool isSubXact)
 	XactLogAbortRecord(xact_time,
 					   nchildren, children,
 					   nrels, rels,
-					   InvalidTransactionId);
+					   InvalidTransactionId, NULL);
 
 	/*
 	 * Report the latest async abort LSN, so that the WAL writer knows to
@@ -3445,7 +3445,7 @@ BeginTransactionBlock(void)
  * resource owner, etc while executing inside a Portal.
  */
 bool
-PrepareTransactionBlock(char *gid)
+PrepareTransactionBlock(const char *gid)
 {
 	TransactionState s;
 	bool		result;
@@ -5084,7 +5084,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 					int nrels, RelFileNode *rels,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
-					TransactionId twophase_xid)
+					TransactionId twophase_xid, const char *twophase_gid)
 {
 	xl_xact_commit xlrec;
 	xl_xact_xinfo xl_xinfo;
@@ -5149,6 +5149,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
 	}
 
 	/* dump transaction origin information */
@@ -5199,7 +5200,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
-		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+	{
+		XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
+		XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
+	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
@@ -5220,13 +5224,14 @@ XLogRecPtr
 XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   TransactionId twophase_xid)
+				   TransactionId twophase_xid, const char *twophase_gid)
 {
 	xl_xact_abort xlrec;
 	xl_xact_xinfo xl_xinfo;
 	xl_xact_subxacts xl_subxacts;
 	xl_xact_relfilenodes xl_relfilenodes;
 	xl_xact_twophase xl_twophase;
+	xl_xact_dbinfo xl_dbinfo;
 
 	uint8		info;
 
@@ -5261,6 +5266,14 @@ XactLogAbortRecord(TimestampTz abort_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
+	}
+
+	if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+		xl_dbinfo.dbId = MyDatabaseId;
+		xl_dbinfo.tsId = MyDatabaseTableSpace;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -5275,6 +5288,9 @@ XactLogAbortRecord(TimestampTz abort_time,
 	if (xl_xinfo.xinfo != 0)
 		XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		XLogRegisterData((char *) (&xl_subxacts),
@@ -5292,7 +5308,13 @@ XactLogAbortRecord(TimestampTz abort_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
-		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+	{
+		XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
+		XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
+	}
+
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
 
 	return XLogInsert(RM_XACT_ID, info);
 }
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 56be1ed..386954a 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/twophase.h"
 
 #include "catalog/pg_control.h"
 
@@ -69,7 +70,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			xl_xact_parsed_abort *parsed, TransactionId xid);
+			 xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -195,6 +198,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
+	reorder->xact_action = info;
+
 	switch (info)
 	{
 		case XLOG_XACT_COMMIT:
@@ -251,16 +256,14 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_PREPARE:
+			{
+				xl_xact_parsed_prepare parsed;
 
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			break;
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+									XLogRecGetData(buf->record), &parsed);
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
@@ -519,6 +522,67 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	if (TransactionIdIsValid(parsed->twophase_xid)) {
+		/*
+		 * We are processing COMMIT PREPARED and know that reorder buffer is
+		 * empty. So we can skip use shortcut for coomiting bare xact.
+		 */
+		strcpy(ctx->reorder->gid, parsed->twophase_gid);
+		ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	} else {
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed)
+{
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz	commit_time = 0;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+	int			i;
+	TransactionId xid = parsed->twophase_xid;
+	strcpy(ctx->reorder->gid, parsed->twophase_gid);
+
+	/*
+	 * Process invalidation messages, even if we're not interested in the
+	 * transaction's contents, since the various caches need to always be
+	 * consistent.
+	 */
+	if (parsed->nmsgs > 0)
+	{
+		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+									  parsed->nmsgs, parsed->msgs);
+		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+	}
+
+	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
+					   parsed->nsubxacts, parsed->subxacts);
+
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		FilterByOrigin(ctx, origin_id))
+	{
+		for (i = 0; i < parsed->nsubxacts; i++)
+		{
+			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+		}
+		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+		return;
+	}
+
+	/* tell the reorderbuffer about the surviving subtransactions */
+	for (i = 0; i < parsed->nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+								 buf->origptr, buf->endptr);
+	}
+
 	/* replay actions of all transaction + subtransactions in order */
 	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
 						commit_time, origin_id, origin_lsn);
@@ -533,6 +597,22 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	XLogRecPtr	commit_time = InvalidXLogRecPtr;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+
+	/*
+	 * If that is ROLLBACK PREPARED than send that to callbacks.
+	 */
+	if (TransactionIdIsValid(parsed->twophase_xid)
+			&& (parsed->dbId == ctx->slot->data.database)) {
+
+		strcpy(ctx->reorder->gid, parsed->twophase_gid);
+
+		ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+		return;
+	}
 
 	SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
 					  parsed->nsubxacts, parsed->subxacts);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 78acced..4d1a7bd 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1278,6 +1278,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	txn->commit_time = commit_time;
 	txn->origin_id = origin_id;
 	txn->origin_lsn = origin_lsn;
+	txn->xact_action = rb->xact_action;
+	memcpy(txn->gid, rb->gid, GIDSIZE);
 
 	/* serialize the last bunch of changes if we need start earlier anyway */
 	if (txn->nentries_mem != txn->nentries)
@@ -1610,6 +1612,32 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	PG_END_TRY();
 }
 
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+								true);
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+	txn->xact_action = rb->xact_action;
+	strcpy(txn->gid, rb->gid);
+
+	rb->commit(rb, txn, commit_lsn);
+}
+
 /*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index b7ce0c6..1b8e7a0 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
 #define TWOPHASE_H
 
 #include "access/xlogdefs.h"
+#include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
 
@@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 							int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+							xl_xact_parsed_prepare *parsed);
 extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
 extern void RecoverPreparedTransactions(void);
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index ebeb582..58242fc 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,10 @@
 #include "storage/sinval.h"
 #include "utils/datetime.h"
 
+/*
+ * Maximum size of Global Transaction ID.
+ */
+#define GIDSIZE 200
 
 /*
  * Xact isolation levels
@@ -219,8 +223,10 @@ typedef struct xl_xact_invals
 typedef struct xl_xact_twophase
 {
 	TransactionId xid;
+	uint8 gidlen;
+	char gid[GIDSIZE];
 } xl_xact_twophase;
-#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
+#define MinSizeOfXactTwophase offsetof(xl_xact_twophase, gid)
 
 typedef struct xl_xact_origin
 {
@@ -279,13 +285,34 @@ typedef struct xl_xact_parsed_commit
 	SharedInvalidationMessage *msgs;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
 } xl_xact_parsed_commit;
 
+typedef struct xl_xact_parsed_prepare
+{
+	Oid			dbId;			/* MyDatabaseId */
+
+	int			nsubxacts;
+	TransactionId *subxacts;
+
+	int			nrels;
+	RelFileNode *xnodes;
+
+	int			nmsgs;
+	SharedInvalidationMessage *msgs;
+
+	TransactionId twophase_xid;
+	char 		twophase_gid[GIDSIZE];
+} xl_xact_parsed_prepare;
+
 typedef struct xl_xact_parsed_abort
 {
+	Oid			dbId;
+	Oid			tsId;
+
 	TimestampTz xact_time;
 	uint32		xinfo;
 
@@ -296,6 +323,7 @@ typedef struct xl_xact_parsed_abort
 	RelFileNode *xnodes;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
 } xl_xact_parsed_abort;
 
 
@@ -327,7 +355,7 @@ extern void CommitTransactionCommand(void);
 extern void AbortCurrentTransaction(void);
 extern void BeginTransactionBlock(void);
 extern bool EndTransactionBlock(void);
-extern bool PrepareTransactionBlock(char *gid);
+extern bool PrepareTransactionBlock(const char *gid);
 extern void UserAbortTransactionBlock(void);
 extern void ReleaseSavepoint(List *options);
 extern void DefineSavepoint(char *name);
@@ -360,12 +388,12 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
 					int nrels, RelFileNode *rels,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
-					TransactionId twophase_xid);
+					TransactionId twophase_xid, const char *twophase_gid);
 
 extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   TransactionId twophase_xid);
+				   TransactionId twophase_xid, const char *twophase_gid);
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index d33ea27..548bbdb 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -132,6 +133,14 @@ typedef struct ReorderBufferTXN
 	 */
 	TransactionId xid;
 
+	/*
+	 * Commit callback is used for COMMIT/PREPARE/COMMMIT PREPARED,
+	 * as well as abort for ROLLBACK and ROLLBACK PREPARED. Here
+	 * stored actual xact action allowing decoding plugin to distinguish them.
+	 */
+	uint8		xact_action;
+	char		gid[GIDSIZE];
+
 	/* did the TX have catalog changes */
 	bool		has_catalog_changes;
 
@@ -278,6 +287,10 @@ struct ReorderBuffer
 	 */
 	HTAB	   *by_txn;
 
+	/* For twophase tx support we need to pass XACT action to ReorderBufferTXN */
+	uint8		xact_action;
+	char		gid[GIDSIZE];
+
 	/*
 	 * Transactions that could be a toplevel xact, ordered by LSN of the first
 	 * record bearing that xid.
@@ -350,6 +363,10 @@ void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, R
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 	  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
