diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3aca6479b1f..1845f6250c6 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2904,6 +2904,35 @@ include_dir 'conf.d'
      across the cluster without problems if that is required.
     </para>
 
+    <sect2 id="runtime-config-replication-all">
+     <title>All Servers</title>
+     <para>
+      These parameters can be set on the primary or any standby.
+     </para>
+     <variablelist>
+      <varlistentry id="guc-causal-reads" xreflabel="causal_reads">
+       <term><varname>causal_reads</varname> (<type>boolean</type>)
+       <indexterm>
+        <primary><varname>causal_reads</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Enables causal consistency between transactions run on different
+         servers.  A transaction that is run on a standby
+         with <varname>causal_reads</> set to <literal>on</> is guaranteed
+         either to see the effects of all completed transactions run on the
+         primary with the setting on, or to receive an error "standby is not
+         available for causal reads".  Note that both transactions involved in
+         a causal dependency (a write on the primary followed by a read on any
+         server which must see the write) must be run with the setting on.
+         See <xref linkend="causal-reads"> for more details.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>     
+    </sect2>
+
     <sect2 id="runtime-config-replication-sender">
      <title>Sending Server(s)</title>
 
@@ -3205,6 +3234,65 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><varname>causal_reads_max_replay_lag</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>causal_reads_max_replay_lag</> configuration
+        parameter</primary>
+       </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the maximum replay lag the primary will tolerate from a
+        standby before dropping it from the set of standbys available for
+        causal reads.
+       </para>
+       <para>
+        This must be set to a value which is at least 4 times the maximum
+        possible difference in system clocks between the primary and standby
+        servers, as described in <xref linkend="causal-reads">.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><varname>causal_reads_lease_time</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>causal_reads_lease_time</> configuration
+        parameter</primary>
+       </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the duration of 'leases' sent by the primary server to
+        standbys granting them the right to run causal reads queries for a
+        limited time.  This affects the rate at which replacement leases must
+        be sent and the wait time if contact is lost with a primary, as
+        described in <xref linkend="causal-reads">.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-causal-reads-standby-names" xreflabel="causal-reads-standby-names">
+      <term><varname>causal_reads_standby_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>causal_reads_standby_names</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies a comma-separated list of standby names that can support
+        <firstterm>causal reads</>, as described in
+        <xref linkend="causal-reads">.  Follows the same convention
+        as <link linkend="guc-synchronous-standby-names"><literal>synchronous_standby_name</></>.
+        The default is <literal>*</>, matching all standbys.
+       </para>
+       <para>
+        This setting has no effect if <varname>causal_reads_timeout</> is not set.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 72eb073621f..ff2f14a5c38 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -1115,7 +1115,7 @@ primary_slot_name = 'node_a_slot'
     cause each commit to wait until the current synchronous standbys report
     that they have replayed the transaction, making it visible to user
     queries.  In simple cases, this allows for load balancing with causal
-    consistency.
+    consistency.  See also <xref linkend="causal-reads">.
    </para>
 
    <para>
@@ -1313,6 +1313,119 @@ synchronous_standby_names = 'ANY 2 (s1, s2, s3)'
    </sect3>
   </sect2>
 
+  <sect2 id="causal-reads">
+   <title>Causal reads</title>
+   <indexterm>
+    <primary>causal reads</primary>
+    <secondary>in standby</secondary>
+   </indexterm>
+
+   <para>
+    The causal reads feature allows read-only queries to run on hot standby
+    servers without exposing stale data to the client, providing a form of
+    causal consistency.  Transactions can run on any standby with the
+    following guarantee about the visibility of preceding transactions: If you
+    set <varname>causal_reads</> to <literal>on</> in any pair of consecutive
+    transactions tx1, tx2 where tx2 begins after tx1 successfully returns,
+    then tx2 will either see tx1 or fail with a new error "standby is not
+    available for causal reads", no matter which server it runs on.  Although
+    the guarantee is expressed in terms of two individual transactions, the
+    GUC can also be set at session, role or system level to make the guarantee
+    generally, allowing for load balancing of applications that were not
+    designed with load balancing in mind.
+   </para>
+
+   <para>
+    In order to enable the feature, <varname>causal_reads_max_replay_lag</>
+    must be set to a non-zero value on the primary server.  The
+    GUC <varname>causal_reads_standby_names</> can be used to limit the set of
+    standbys that can join the dynamic set of causal reads standbys by
+    providing a comma-separated list of application names.  By default, all
+    standbys are candidates, if the feature is enabled.
+   </para>
+
+   <para>
+    The current set of servers that the primary considers to be available for
+    causal reads can be seen in
+    the <link linkend="monitoring-stats-views-table"> <literal>pg_stat_replication</></>
+    view.  Administrators, applications and load balancing middleware can use
+    this view to discover standbys that can currently handle causal reads
+    transactions without raising the error.  Since that information is only an
+    instantantaneous snapshot, clients should still be prepared for the error
+    to be raised at any time, and consider redirecting transactions to another
+    standby.
+   </para>
+
+   <para>
+    The advantages of the causal reads feature over simply
+    setting <varname>synchronous_commit</> to <literal>remote_apply</> are:
+    <orderedlist>
+      <listitem>
+       <para>
+        It provides certainty about exactly which standbys can see a
+        transaction.
+       </para>
+      </listitem>
+      <listitem>
+       <para>
+        It places a configurable limit on how much replay lag (and therefore
+        delay at commit time) the primary tolerates from standbys before it
+        drops them from the dynamic set of standbys it waits for.
+       </para>   
+      </listitem>
+      <listitem>
+       <para>
+        It upholds the causal reads guarantee during the transitions that
+        occur when new standbys are added or removed from the set of standbys,
+        including scenarios where contact has been lost between the primary
+        and standbys but the standby is still alive and running client
+        queries.
+       </para>
+      </listitem>
+    </orderedlist>
+   </para>
+
+   <para>
+    The protocol used to uphold the guarantee even in the case of network
+    failure depends on the system clocks of the primary and standby servers
+    being synchronized, with an allowance for a difference up to one quarter
+    of <varname>causal_reads_lease_time</>.  For example,
+    if <varname>causal_reads_lease_time</> is set to <literal>5s</>, then the
+    clocks must not be further than 1.25 second apart for the guarantee to be
+    upheld reliably during transitions.  The ubiquity of the Network Time
+    Protocol (NTP) on modern operating systems and availability of high
+    quality time servers makes it possible to choose a tolerance significantly
+    higher than the maximum expected clock difference.  An effort is
+    nevertheless made to detect and report misconfigured and faulty systems
+    with clock differences greater than the configured tolerance.
+   </para>
+
+   <note>
+    <para>
+     Current hardware clocks, NTP implementations and public time servers are
+     unlikely to allow the system clocks to differ more than tens or hundreds
+     of milliseconds, and systems synchronized with dedicated local time
+     servers may be considerably more accurate, but you should only consider
+     setting <varname>causal_reads_lease_time</> below the default of 5
+     seconds (allowing up to 1.25 second of clock difference) after
+     researching your time synchronization infrastructure thoroughly.
+    </para>  
+   </note>
+
+   <note>
+    <para>
+      While similar to synchronous replication in the sense that both involve
+      the primary server waiting for responses from standby servers, the
+      causal reads feature is not concerned with avoiding data loss.  A
+      primary configured for causal reads will drop all standbys that stop
+      responding or replay too slowly from the dynamic set that it waits for,
+      so you should consider configuring both synchronous replication and
+      causal reads if you need data loss avoidance guarantees and causal
+      consistency guarantees for load balancing.
+    </para>
+   </note>
+  </sect2>
+
   <sect2 id="continuous-archiving-in-standby">
    <title>Continuous archiving in standby</title>
 
@@ -1661,7 +1774,16 @@ if (!triggered)
     so there will be a measurable delay between primary and standby. Running the
     same query nearly simultaneously on both primary and standby might therefore
     return differing results. We say that data on the standby is
-    <firstterm>eventually consistent</firstterm> with the primary.  Once the
+    <firstterm>eventually consistent</firstterm> with the primary by default.
+    The data visible to a transaction running on a standby can be
+    made <firstterm>causally consistent</> with respect to a transaction that
+    has completed on the primary by setting <varname>causal_reads</>
+    to <literal>on</> in both transactions.  For more details,
+    see <xref linkend="causal-reads">.
+   </para>
+
+   <para>
+    Once the    
     commit record for a transaction is replayed on the standby, the changes
     made by that transaction will be visible to any new snapshots taken on
     the standby.  Snapshots may be taken at the start of each query or at the
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index be3dc672bcc..515064e8764 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1790,6 +1790,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        </itemizedlist>
      </entry>
     </row>
+    <row>
+     <entry><structfield>causal_reads_state</></entry>
+     <entry><type>text</></entry>
+     <entry>Causal reads state of this standby server.  This field will be
+     non-null only if <varname>cause_reads_timeout</> is set.  If a standby is
+     in <literal>available</> state, then it can currently serve causal reads
+     queries.  If it is not replaying fast enough or not responding to
+     keepalive messages, it will be in <literal>unavailable</> state, and if
+     it is currently transitioning to availability it will be
+     in <literal>joining</> state for a short time.</entry>
+    </row>
    </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index ba03d9687e5..1440b399bda 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2234,11 +2234,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	END_CRIT_SECTION();
 
 	/*
-	 * Wait for synchronous replication, if required.
+	 * Wait for causal reads and synchronous replication, if required.
 	 *
 	 * Note that at this stage we have marked clog, but still show as running
 	 * in the procarray and continue to hold locks.
 	 */
+	CausalReadsWaitForLSN(recptr);
 	SyncRepWaitForLSN(recptr, true);
 }
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0aa69fe4b4..5aae6908647 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1342,7 +1342,10 @@ RecordTransactionCommit(void)
 	 * in the procarray and continue to hold locks.
 	 */
 	if (wrote_xlog && markXidCommitted)
+	{
+		CausalReadsWaitForLSN(XactLastRecEnd);
 		SyncRepWaitForLSN(XactLastRecEnd, true);
+	}
 
 	/* remember end of last commit record */
 	XactLastCommitEnd = XactLastRecEnd;
@@ -5149,7 +5152,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	 * Check if the caller would like to ask standbys for immediate feedback
 	 * once this commit is applied.
 	 */
-	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY || causal_reads)
 		xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
 
 	/*
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 0fdad0c1197..f037f0fe349 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -732,7 +732,8 @@ CREATE VIEW pg_stat_replication AS
             W.flush_lag,
             W.replay_lag,
             W.sync_priority,
-            W.sync_state
+            W.sync_state,
+            W.causal_reads_state
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 65b7b328f1f..c9eb152892a 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3576,6 +3576,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_BTREE_PAGE:
 			event_name = "BtreePage";
 			break;
+		case WAIT_EVENT_CAUSAL_READS_APPLY:
+			event_name = "CausalReadsApply";
+			break;
 		case WAIT_EVENT_EXECUTE_GATHER:
 			event_name = "ExecuteGather";
 			break;
@@ -3634,6 +3637,9 @@ pgstat_get_wait_timeout(WaitEventTimeout w)
 		case WAIT_EVENT_BASE_BACKUP_THROTTLE:
 			event_name = "BaseBackupThrottle";
 			break;
+		case WAIT_EVENT_CAUSAL_READS_REVOKE:
+			event_name = "CausalReadsRevoke";
+			break;
 		case WAIT_EVENT_PG_SLEEP:
 			event_name = "PgSleep";
 			break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 898c497d12c..3eb79a0fd2b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1295,6 +1295,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	pq_sendint64(reply_message, writepos);	/* apply */
 	pq_sendint64(reply_message, now);	/* sendTime */
 	pq_sendbyte(reply_message, requestReply);	/* replyRequested */
+	pq_sendint64(reply_message, -1);		/* replyTo */
 
 	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
 		 force,
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 5fd47689dd2..25e56397eb0 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -85,6 +85,13 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/ps_status.h"
+#include "utils/varlena.h"
+
+/* GUC variables */
+int causal_reads_max_replay_lag;
+int causal_reads_lease_time;
+bool causal_reads;
+char *causal_reads_standby_names;
 
 /* User-settable parameters for sync rep */
 char	   *SyncRepStandbyNames;
@@ -99,7 +106,9 @@ static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
 
 static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
-static int	SyncRepWakeQueue(bool all, int mode);
+static int	SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn);
+
+static bool SyncRepCheckForEarlyExit(void);
 
 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 					 XLogRecPtr *flushPtr,
@@ -129,6 +138,227 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
  */
 
 /*
+ * Check if we can stop waiting for causal consistency.  We can stop waiting
+ * when the following conditions are met:
+ *
+ * 1.  All walsenders currently in 'joining' or 'available' state have
+ * applied the target LSN.
+ *
+ * 2.  All revoked leases have been acknowledged by the relevant standby or
+ * expired, so we know that the standby has started rejecting causal reads
+ * transactions.
+ *
+ * The output parameter 'waitingFor' is set to the number of nodes we are
+ * currently waiting for.  The output parameters 'stallTimeMillis' is set to
+ * the number of milliseconds we need to wait for because a lease has been
+ * revoked.
+ *
+ * Returns true if commit can return control, because every standby has either
+ * applied the LSN or started rejecting causal_reads transactions.
+ */
+static bool
+CausalReadsCommitCanReturn(XLogRecPtr XactCommitLSN,
+						   int *waitingFor,
+						   long *stallTimeMillis)
+{
+	TimestampTz now = GetCurrentTimestamp();
+	TimestampTz stallTime = 0;
+	int i;
+
+	/* Count how many joining/available nodes we are waiting for. */
+	*waitingFor = 0;
+
+	for (i = 0; i < max_wal_senders; ++i)
+	{
+		WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+		if (walsnd->pid != 0)
+		{
+			/*
+			 * We need to hold the spinlock to read LSNs, because we can't be
+			 * sure they can be read atomically.
+			 */
+			SpinLockAcquire(&walsnd->mutex);
+			if (walsnd->pid != 0)
+			{
+				switch (walsnd->causalReadsState)
+				{
+				case WALSNDCRSTATE_UNAVAILABLE:
+					/* Nothing to wait for. */
+					break;
+				case WALSNDCRSTATE_JOINING:
+				case WALSNDCRSTATE_AVAILABLE:
+					/*
+					 * We have to wait until this standby tells us that is has
+					 * replayed the commit record.
+					 */
+					if (walsnd->apply < XactCommitLSN)
+						++*waitingFor;
+					break;
+				case WALSNDCRSTATE_REVOKING:
+					/*
+					 * We have to hold up commits until this standby
+					 * acknowledges that its lease was revoked, or we know the
+					 * most recently sent lease has expired anyway, whichever
+					 * comes first.  One way or the other, we don't release
+					 * until this standby has started raising an error for
+					 * causal reads transactions.
+					 */
+					if (walsnd->revokingUntil > now)
+					{
+						++*waitingFor;
+						stallTime = Max(stallTime, walsnd->revokingUntil);
+					}
+					break;
+				}
+			}
+			SpinLockRelease(&walsnd->mutex);
+		}
+	}
+
+	/*
+	 * If a walsender has exitted uncleanly, then it writes itsrevoking wait
+	 * time into a shared space before it gives up its WalSnd slot.  So we
+	 * have to wait for that too.
+	 */
+	LWLockAcquire(SyncRepLock, LW_SHARED);
+	if (WalSndCtl->revokingUntil > now)
+	{
+		long seconds;
+		int usecs;
+
+		/* Compute how long we have to wait, rounded up to nearest ms. */
+		TimestampDifference(now, WalSndCtl->revokingUntil,
+							&seconds, &usecs);
+		*stallTimeMillis = seconds * 1000 + (usecs + 999) / 1000;
+	}
+	else
+		*stallTimeMillis = 0;
+	LWLockRelease(SyncRepLock);
+
+	/* We are done if we are not waiting for any nodes or stalls. */
+	return *waitingFor == 0 && *stallTimeMillis == 0;
+}
+
+/*
+ * Wait for causal consistency in causal_reads mode, if requested by user.
+ */
+void
+CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN)
+{
+	long stallTimeMillis;
+	int waitingFor;
+	char *ps_display_buffer = NULL;
+
+	/* Leave if we aren't in causal_reads mode. */
+	if (!causal_reads)
+		return;
+
+	for (;;)
+	{
+		/* Reset latch before checking state. */
+		ResetLatch(MyLatch);
+
+		/*
+		 * Join the queue to be woken up if any causal reads joining/available
+		 * standby applies XactCommitLSN or the set of causal reads standbys
+		 * changes (if we aren't already in the queue).  We don't actually know
+		 * if we need to wait for any peers to reach the target LSN yet, but
+		 * we have to register just in case before checking the walsenders'
+		 * state to avoid a race condition that could occur if we did it after
+		 * calling CausalReadsCommitCanReturn.  (SyncRepWaitForLSN doesn't
+		 * have to do this because it can check the highest-seen LSN in
+		 * walsndctl->lsn[mode] which is protected by SyncRepLock, the same
+		 * lock as the queues.  We can't do that here, because there is no
+		 * single highest-seen LSN that is useful.  We must check
+		 * walsnd->apply for all relevant walsenders.  Therefore we must
+		 * register for notifications first, so that we can be notified via
+		 * our latch of any standby applying the LSN we're interested in after
+		 * we check but before we start waiting, or we could wait forever for
+		 * something that has already happened.)
+		 */
+		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+		if (MyProc->syncRepState != SYNC_REP_WAITING)
+		{
+			MyProc->waitLSN = XactCommitLSN;
+			MyProc->syncRepState = SYNC_REP_WAITING;
+			SyncRepQueueInsert(SYNC_REP_WAIT_CAUSAL_READS);
+			Assert(SyncRepQueueIsOrderedByLSN(SYNC_REP_WAIT_CAUSAL_READS));
+		}
+		LWLockRelease(SyncRepLock);
+
+		/* Check if we're done. */
+		if (CausalReadsCommitCanReturn(XactCommitLSN, &waitingFor, &stallTimeMillis))
+		{
+			SyncRepCancelWait();
+			break;
+		}
+
+		Assert(waitingFor > 0 || stallTimeMillis > 0);
+
+		/* If we aren't actually waiting for any standbys, leave the queue. */
+		if (waitingFor == 0)
+			SyncRepCancelWait();
+
+		/* Update the ps title. */
+		if (update_process_title)
+		{
+			char buffer[80];
+
+			/* Remember the old value if this is our first update. */
+			if (ps_display_buffer == NULL)
+			{
+				int len;
+				const char *ps_display = get_ps_display(&len);
+
+				ps_display_buffer = palloc(len + 1);
+				memcpy(ps_display_buffer, ps_display, len);
+				ps_display_buffer[len] = '\0';
+			}
+
+			snprintf(buffer, sizeof(buffer),
+					 "waiting for %d peer(s) to apply %X/%X%s",
+					 waitingFor,
+					 (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN,
+					 stallTimeMillis > 0 ? " (revoking)" : "");
+			set_ps_display(buffer, false);
+		}
+
+		/* Check if we need to exit early due to postmaster death etc. */
+		if (SyncRepCheckForEarlyExit()) /* Calls SyncRepCancelWait() if true. */
+			break;
+
+		/*
+		 * If are still waiting for peers, then we wait for any joining or
+		 * available peer to reach the LSN (or possibly stop being in one of
+		 * those states or go away).
+		 *
+		 * If not, there must be a non-zero stall time, so we wait for that to
+		 * elapse.
+		 */
+		if (waitingFor > 0)
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
+					  WAIT_EVENT_CAUSAL_READS_APPLY);
+		else
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT,
+					  stallTimeMillis,
+					  WAIT_EVENT_CAUSAL_READS_REVOKE);
+	}
+
+	/* There is no way out of the loop that could leave us in the queue. */
+	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
+	MyProc->waitLSN = 0;
+
+	/* Restore the ps display. */
+	if (ps_display_buffer != NULL)
+	{
+		set_ps_display(ps_display_buffer, false);
+		pfree(ps_display_buffer);
+	}
+}
+
+/*
  * Wait for synchronous replication, if requested by user.
  *
  * Initially backends start in state SYNC_REP_NOT_WAITING and then
@@ -229,57 +459,9 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 		if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
 			break;
 
-		/*
-		 * If a wait for synchronous replication is pending, we can neither
-		 * acknowledge the commit nor raise ERROR or FATAL.  The latter would
-		 * lead the client to believe that the transaction aborted, which is
-		 * not true: it's already committed locally. The former is no good
-		 * either: the client has requested synchronous replication, and is
-		 * entitled to assume that an acknowledged commit is also replicated,
-		 * which might not be true. So in this case we issue a WARNING (which
-		 * some clients may be able to interpret) and shut off further output.
-		 * We do NOT reset ProcDiePending, so that the process will die after
-		 * the commit is cleaned up.
-		 */
-		if (ProcDiePending)
-		{
-			ereport(WARNING,
-					(errcode(ERRCODE_ADMIN_SHUTDOWN),
-					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
-			whereToSendOutput = DestNone;
-			SyncRepCancelWait();
-			break;
-		}
-
-		/*
-		 * It's unclear what to do if a query cancel interrupt arrives.  We
-		 * can't actually abort at this point, but ignoring the interrupt
-		 * altogether is not helpful, so we just terminate the wait with a
-		 * suitable warning.
-		 */
-		if (QueryCancelPending)
-		{
-			QueryCancelPending = false;
-			ereport(WARNING,
-					(errmsg("canceling wait for synchronous replication due to user request"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
-			SyncRepCancelWait();
-			break;
-		}
-
-		/*
-		 * If the postmaster dies, we'll probably never get an
-		 * acknowledgement, because all the wal sender processes will exit. So
-		 * just bail out.
-		 */
-		if (!PostmasterIsAlive())
-		{
-			ProcDiePending = true;
-			whereToSendOutput = DestNone;
-			SyncRepCancelWait();
+		/* Check if we need to break early due to cancel/shutdown/death. */
+		if (SyncRepCheckForEarlyExit())
 			break;
-		}
 
 		/*
 		 * Wait on latch.  Any condition that should wake us up will set the
@@ -399,6 +581,53 @@ SyncRepInitConfig(void)
 }
 
 /*
+ * Check if the current WALSender process's application_name matches a name in
+ * causal_reads_standby_names (including '*' for wildcard).
+ */
+bool
+CausalReadsPotentialStandby(void)
+{
+	char *rawstring;
+	List	   *elemlist;
+	ListCell   *l;
+	bool		found = false;
+
+	/* If the feature is disable, then no. */
+	if (causal_reads_max_replay_lag == 0)
+		return false;
+
+	/* Need a modifiable copy of string */
+	rawstring = pstrdup(causal_reads_standby_names);
+
+	/* Parse string into list of identifiers */
+	if (!SplitIdentifierString(rawstring, ',', &elemlist))
+	{
+		/* syntax error in list */
+		pfree(rawstring);
+		list_free(elemlist);
+		/* GUC machinery will have already complained - no need to do again */
+		return 0;
+	}
+
+	foreach(l, elemlist)
+	{
+		char	   *standby_name = (char *) lfirst(l);
+
+		if (pg_strcasecmp(standby_name, application_name) == 0 ||
+			pg_strcasecmp(standby_name, "*") == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+
+	pfree(rawstring);
+	list_free(elemlist);
+
+	return found;
+}
+
+/*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-sync-standby-releases-waiter.
  *
@@ -406,7 +635,7 @@ SyncRepInitConfig(void)
  * perhaps also which information we store as well.
  */
 void
-SyncRepReleaseWaiters(void)
+SyncRepReleaseWaiters(bool walsender_cr_blocker)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
 	XLogRecPtr	writePtr;
@@ -420,13 +649,15 @@ SyncRepReleaseWaiters(void)
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
-	 * potential sync standbys then we have nothing to do. If we are still
-	 * starting up, still running base backup or the current flush position is
-	 * still invalid, then leave quickly also.
+	 * potential sync standbys and not in a state that causal_reads waits for,
+	 * then we have nothing to do. If we are still starting up, still running
+	 * base backup or the current flush position is still invalid, then leave
+	 * quickly also.
 	 */
-	if (MyWalSnd->sync_standby_priority == 0 ||
-		MyWalSnd->state < WALSNDSTATE_STREAMING ||
-		XLogRecPtrIsInvalid(MyWalSnd->flush))
+	if (!walsender_cr_blocker &&
+		(MyWalSnd->sync_standby_priority == 0 ||
+		 MyWalSnd->state < WALSNDSTATE_STREAMING ||
+		 XLogRecPtrIsInvalid(MyWalSnd->flush)))
 	{
 		announce_next_takeover = true;
 		return;
@@ -464,9 +695,10 @@ SyncRepReleaseWaiters(void)
 
 	/*
 	 * If the number of sync standbys is less than requested or we aren't
-	 * managing a sync standby then just leave.
+	 * managing a sync standby or a standby in causal reads blocking state,
+	 * then just leave.
 	 */
-	if (!got_recptr || !am_sync)
+	if ((!got_recptr || !am_sync) && !walsender_cr_blocker)
 	{
 		LWLockRelease(SyncRepLock);
 		announce_next_takeover = !am_sync;
@@ -475,24 +707,35 @@ SyncRepReleaseWaiters(void)
 
 	/*
 	 * Set the lsn first so that when we wake backends they will release up to
-	 * this location.
+	 * this location, for backends waiting for synchronous commit.
 	 */
-	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
-	{
-		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
-		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
-	}
-	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
-	{
-		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
-		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
-	}
-	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
+	if (got_recptr && am_sync)
 	{
-		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
-		numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+		if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
+		{
+			walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
+			numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE, writePtr);
+		}
+		if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
+		{
+			walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
+			numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH, flushPtr);
+		}
+		if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
+		{
+			walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
+			numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY, applyPtr);
+		}
 	}
 
+	/*
+	 * Wake backends that are waiting for causal_reads, if this walsender
+	 * manages a standby that is in causal reads 'available' or 'joining'
+	 * state.
+	 */
+	if (walsender_cr_blocker)
+		SyncRepWakeQueue(false, SYNC_REP_WAIT_CAUSAL_READS, MyWalSnd->apply);
+
 	LWLockRelease(SyncRepLock);
 
 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
@@ -970,9 +1213,8 @@ SyncRepGetStandbyPriority(void)
  * Must hold SyncRepLock.
  */
 static int
-SyncRepWakeQueue(bool all, int mode)
+SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn)
 {
-	volatile WalSndCtlData *walsndctl = WalSndCtl;
 	PGPROC	   *proc = NULL;
 	PGPROC	   *thisproc = NULL;
 	int			numprocs = 0;
@@ -989,7 +1231,7 @@ SyncRepWakeQueue(bool all, int mode)
 		/*
 		 * Assume the queue is ordered by LSN
 		 */
-		if (!all && walsndctl->lsn[mode] < proc->waitLSN)
+		if (!all && lsn < proc->waitLSN)
 			return numprocs;
 
 		/*
@@ -1049,7 +1291,7 @@ SyncRepUpdateSyncStandbysDefined(void)
 			int			i;
 
 			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
-				SyncRepWakeQueue(true, i);
+				SyncRepWakeQueue(true, i, InvalidXLogRecPtr);
 		}
 
 		/*
@@ -1100,6 +1342,64 @@ SyncRepQueueIsOrderedByLSN(int mode)
 }
 #endif
 
+static bool
+SyncRepCheckForEarlyExit(void)
+{
+	/*
+	 * If a wait for synchronous replication is pending, we can neither
+	 * acknowledge the commit nor raise ERROR or FATAL.  The latter would
+	 * lead the client to believe that the transaction aborted, which is
+	 * not true: it's already committed locally. The former is no good
+	 * either: the client has requested synchronous replication, and is
+	 * entitled to assume that an acknowledged commit is also replicated,
+	 * which might not be true. So in this case we issue a WARNING (which
+	 * some clients may be able to interpret) and shut off further output.
+	 * We do NOT reset ProcDiePending, so that the process will die after
+	 * the commit is cleaned up.
+	 */
+	if (ProcDiePending)
+	{
+		ereport(WARNING,
+				(errcode(ERRCODE_ADMIN_SHUTDOWN),
+				 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
+				 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+		whereToSendOutput = DestNone;
+		SyncRepCancelWait();
+		return true;
+	}
+
+	/*
+	 * It's unclear what to do if a query cancel interrupt arrives.  We
+	 * can't actually abort at this point, but ignoring the interrupt
+	 * altogether is not helpful, so we just terminate the wait with a
+	 * suitable warning.
+	 */
+	if (QueryCancelPending)
+	{
+		QueryCancelPending = false;
+		ereport(WARNING,
+				(errmsg("canceling wait for synchronous replication due to user request"),
+				 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+		SyncRepCancelWait();
+		return true;
+	}
+
+	/*
+	 * If the postmaster dies, we'll probably never get an
+	 * acknowledgement, because all the wal sender processes will exit. So
+	 * just bail out.
+	 */
+	if (!PostmasterIsAlive())
+	{
+		ProcDiePending = true;
+		whereToSendOutput = DestNone;
+		SyncRepCancelWait();
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * ===========================================================
  * Synchronous Replication functions executed by any process
@@ -1107,7 +1407,7 @@ SyncRepQueueIsOrderedByLSN(int mode)
  */
 
 bool
-check_synchronous_standby_names(char **newval, void **extra, GucSource source)
+check_standby_names(char **newval, void **extra, GucSource source)
 {
 	if (*newval != NULL && (*newval)[0] != '\0')
 	{
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 8a249e22b9f..9f1113470c7 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -57,6 +57,7 @@
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/ipc.h"
@@ -139,9 +140,10 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, int64 replyTo);
 static void XLogWalRcvSendHSFeedback(bool immed);
-static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime,
+								  TimestampTz *causalReadsLease);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -466,7 +468,7 @@ WalReceiverMain(void)
 					}
 
 					/* Let the master know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					XLogWalRcvSendReply(false, false, -1);
 
 					/*
 					 * If we've written some records, flush them to disk and
@@ -511,7 +513,7 @@ WalReceiverMain(void)
 						 */
 						walrcv->force_reply = false;
 						pg_memory_barrier();
-						XLogWalRcvSendReply(true, false);
+						XLogWalRcvSendReply(true, false, -1);
 					}
 				}
 				if (rc & WL_POSTMASTER_DEATH)
@@ -569,7 +571,7 @@ WalReceiverMain(void)
 						}
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
+					XLogWalRcvSendReply(requestReply, requestReply, -1);
 					XLogWalRcvSendHSFeedback(false);
 				}
 			}
@@ -874,6 +876,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 	XLogRecPtr	walEnd;
 	TimestampTz sendTime;
 	bool		replyRequested;
+	TimestampTz causalReadsLease;
+	int64		messageNumber;
 
 	resetStringInfo(&incoming_message);
 
@@ -893,7 +897,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				dataStart = pq_getmsgint64(&incoming_message);
 				walEnd = pq_getmsgint64(&incoming_message);
 				sendTime = pq_getmsgint64(&incoming_message);
-				ProcessWalSndrMessage(walEnd, sendTime);
+				ProcessWalSndrMessage(walEnd, sendTime, NULL);
 
 				buf += hdrlen;
 				len -= hdrlen;
@@ -903,7 +907,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 		case 'k':				/* Keepalive */
 			{
 				/* copy message to StringInfo */
-				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
+				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64) +
+					sizeof(char) + sizeof(int64);
 				if (len != hdrlen)
 					ereport(ERROR,
 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -911,15 +916,17 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
 
 				/* read the fields */
+				messageNumber = pq_getmsgint64(&incoming_message);
 				walEnd = pq_getmsgint64(&incoming_message);
 				sendTime = pq_getmsgint64(&incoming_message);
 				replyRequested = pq_getmsgbyte(&incoming_message);
+				causalReadsLease = pq_getmsgint64(&incoming_message);
 
-				ProcessWalSndrMessage(walEnd, sendTime);
+				ProcessWalSndrMessage(walEnd, sendTime, &causalReadsLease);
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
-					XLogWalRcvSendReply(true, false);
+					XLogWalRcvSendReply(true, false, messageNumber);
 				break;
 			}
 		default:
@@ -1082,7 +1089,7 @@ XLogWalRcvFlush(bool dying)
 		/* Also let the master know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
+			XLogWalRcvSendReply(false, false, -1);
 			XLogWalRcvSendHSFeedback(false);
 		}
 	}
@@ -1100,9 +1107,12 @@ XLogWalRcvFlush(bool dying)
  * If 'requestReply' is true, requests the server to reply immediately upon
  * receiving this message. This is used for heartbearts, when approaching
  * wal_receiver_timeout.
+ *
+ * If this is a reply to a specific message from the upstream server, then
+ * 'replyTo' should include the message number, otherwise -1.
  */
 static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, int64 replyTo)
 {
 	static XLogRecPtr writePtr = 0;
 	static XLogRecPtr flushPtr = 0;
@@ -1149,6 +1159,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	pq_sendint64(&reply_message, applyPtr);
 	pq_sendint64(&reply_message, GetCurrentTimestamp());
 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
+	pq_sendint64(&reply_message, replyTo);
 
 	/* Send it */
 	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
@@ -1281,10 +1292,13 @@ XLogWalRcvSendHSFeedback(bool immed)
  * Update shared memory status upon receiving a message from primary.
  *
  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
- * message, reported by primary.
+ * message, reported by primary.  'causalReadsLease' is a pointer to
+ * the time the primary promises that this standby can safely claim to be
+ * causally consistent, to 0 if it cannot, or a NULL pointer for no change.
  */
 static void
-ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
+ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime,
+					  TimestampTz *causalReadsLease)
 {
 	WalRcvData *walrcv = WalRcv;
 
@@ -1297,6 +1311,8 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 	walrcv->latestWalEnd = walEnd;
 	walrcv->lastMsgSendTime = sendTime;
 	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
+	if (causalReadsLease != NULL)
+		walrcv->causalReadsLease = *causalReadsLease;
 	SpinLockRelease(&walrcv->mutex);
 
 	if (log_min_messages <= DEBUG2)
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 8ed7254b5c6..7d557234c47 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -27,6 +27,7 @@
 #include "replication/walreceiver.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/guc.h"
 #include "utils/timestamp.h"
 
 WalRcvData *WalRcv = NULL;
@@ -373,3 +374,21 @@ GetReplicationTransferLatency(void)
 
 	return ms;
 }
+
+/*
+ * Used by snapmgr to check if this standby has a valid lease, granting it the
+ * right to consider itself available for causal reads.
+ */
+bool
+WalRcvCausalReadsAvailable(void)
+{
+	WalRcvData *walrcv = WalRcv;
+	TimestampTz now = GetCurrentTimestamp();
+	bool result;
+
+	SpinLockAcquire(&walrcv->mutex);
+	result = walrcv->causalReadsLease != 0 && now <= walrcv->causalReadsLease;
+	SpinLockRelease(&walrcv->mutex);
+
+	return result;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f845180873e..a7d6ec5233d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -167,9 +167,23 @@ static StringInfoData tmpbuf;
  */
 static TimestampTz last_reply_timestamp = 0;
 
+static TimestampTz last_keepalive_timestamp = 0;
+
 /* Have we sent a heartbeat message asking for reply, since last reply? */
 static bool waiting_for_ping_response = false;
 
+/* At what point in the WAL can we progress from JOINING state? */
+static XLogRecPtr causal_reads_joining_until = 0;
+
+/* The last causal reads lease sent to the standby. */
+static TimestampTz causal_reads_last_lease = 0;
+
+/* The last causal reads lease revocation message's number. */
+static int64 causal_reads_revoke_msgno = 0;
+
+/* Is this WALSender listed in causal_reads_standby_names? */
+static bool am_potential_causal_reads_standby = false;
+
 /*
  * While streaming WAL in Copy mode, streamingDoneSending is set to true
  * after we have sent CopyDone. We should not send any more CopyData messages
@@ -239,7 +253,7 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
-static void WalSndKeepalive(bool requestReply);
+static int64 WalSndKeepalive(bool requestReply);
 static void WalSndKeepaliveIfNecessary(TimestampTz now);
 static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
@@ -281,6 +295,60 @@ InitWalSender(void)
 }
 
 /*
+ * If we are exiting unexpectedly, we may need to communicate with concurrent
+ * causal_reads commits to maintain the causal consistency guarantee.
+ */
+static void
+PrepareUncleanExit(void)
+{
+	if (MyWalSnd->causalReadsState == WALSNDCRSTATE_AVAILABLE)
+	{
+		/*
+		 * We've lost contact with the standby, but it may still be alive.  We
+		 * can't let any committing causal_reads transactions return control
+		 * until we've stalled for long enough for a zombie standby to start
+		 * raising errors because its lease has expired.  Because our WalSnd
+		 * slot is going away, we need to use the shared
+		 * WalSndCtl->revokingUntil variable.
+		 */
+		elog(LOG,
+			 "contact lost with standby \"%s\", revoking causal reads lease by stalling",
+			 application_name);
+
+		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+		WalSndCtl->revokingUntil = Max(WalSndCtl->revokingUntil,
+									   causal_reads_last_lease);
+		LWLockRelease(SyncRepLock);
+
+		SpinLockAcquire(&MyWalSnd->mutex);
+		MyWalSnd->causalReadsState = WALSNDCRSTATE_UNAVAILABLE;
+		SpinLockRelease(&MyWalSnd->mutex);
+	}
+}
+
+/*
+ * We are shutting down because we received a goodbye message from the
+ * walreceiver.
+ */
+static void
+PrepareCleanExit(void)
+{
+	if (MyWalSnd->causalReadsState == WALSNDCRSTATE_AVAILABLE)
+	{
+		/*
+		 * The standby is shutting down, so it won't be running any more
+		 * transactions.  It is therefore safe to stop waiting for it without
+		 * any kind of lease revocation protocol.
+		 */
+		elog(LOG, "standby \"%s\" is leaving causal reads set", application_name);
+
+		SpinLockAcquire(&MyWalSnd->mutex);
+		MyWalSnd->causalReadsState = WALSNDCRSTATE_UNAVAILABLE;
+		SpinLockRelease(&MyWalSnd->mutex);
+	}
+}
+
+/*
  * Clean up after an error.
  *
  * WAL sender processes don't use transactions like regular backends do.
@@ -308,7 +376,10 @@ WalSndErrorCleanup(void)
 	replication_active = false;
 
 	if (got_STOPPING || got_SIGUSR2)
+	{
+		PrepareUncleanExit();
 		proc_exit(0);
+	}
 
 	/* Revert back to startup state */
 	WalSndSetState(WALSNDSTATE_STARTUP);
@@ -320,6 +391,8 @@ WalSndErrorCleanup(void)
 static void
 WalSndShutdown(void)
 {
+	PrepareUncleanExit();
+
 	/*
 	 * Reset whereToSendOutput to prevent ereport from attempting to send any
 	 * more messages to the standby.
@@ -1578,6 +1651,7 @@ ProcessRepliesIfAny(void)
 		if (r < 0)
 		{
 			/* unexpected error or EOF */
+			PrepareUncleanExit();
 			ereport(COMMERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
 					 errmsg("unexpected EOF on standby connection")));
@@ -1594,6 +1668,7 @@ ProcessRepliesIfAny(void)
 		resetStringInfo(&reply_message);
 		if (pq_getmessage(&reply_message, 0))
 		{
+			PrepareUncleanExit();
 			ereport(COMMERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
 					 errmsg("unexpected EOF on standby connection")));
@@ -1643,6 +1718,7 @@ ProcessRepliesIfAny(void)
 				 * 'X' means that the standby is closing down the socket.
 				 */
 			case 'X':
+				PrepareCleanExit();
 				proc_exit(0);
 
 			default:
@@ -1740,9 +1816,11 @@ ProcessStandbyReplyMessage(void)
 				flushLag,
 				applyLag;
 	bool		clearLagTimes;
+	int64		replyTo;
 	TimestampTz now;
 
 	static bool fullyAppliedLastTime = false;
+	static TimestampTz fullyAppliedSince = 0;
 
 	/* the caller already consumed the msgtype byte */
 	writePtr = pq_getmsgint64(&reply_message);
@@ -1750,6 +1828,7 @@ ProcessStandbyReplyMessage(void)
 	applyPtr = pq_getmsgint64(&reply_message);
 	(void) pq_getmsgint64(&reply_message);	/* sendTime; not used ATM */
 	replyRequested = pq_getmsgbyte(&reply_message);
+	replyTo = pq_getmsgint64(&reply_message);
 
 	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
 		 (uint32) (writePtr >> 32), (uint32) writePtr,
@@ -1764,17 +1843,17 @@ ProcessStandbyReplyMessage(void)
 	applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
 
 	/*
-	 * If the standby reports that it has fully replayed the WAL in two
-	 * consecutive reply messages, then the second such message must result
-	 * from wal_receiver_status_interval expiring on the standby.  This is a
-	 * convenient time to forget the lag times measured when it last
-	 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
-	 * until more WAL traffic arrives.
+	 * If the standby reports that it has fully replayed the WAL for at least
+	 * 10 seconds, then let's clear the lag times that were measured when it
+	 * last wrote/flushed/applied a WAL record.  This way we avoid displaying
+	 * stale lag data until more WAL traffic arrives.
 	 */
 	clearLagTimes = false;
 	if (applyPtr == sentPtr)
 	{
-		if (fullyAppliedLastTime)
+		if (!fullyAppliedLastTime)
+			fullyAppliedSince = now;
+		else if (now - fullyAppliedSince >= 10000000) /* 10 seconds */
 			clearLagTimes = true;
 		fullyAppliedLastTime = true;
 	}
@@ -1790,8 +1869,53 @@ ProcessStandbyReplyMessage(void)
 	 * standby.
 	 */
 	{
+		int			next_cr_state = -1;
 		WalSnd	   *walsnd = MyWalSnd;
 
+		/* Handle causal reads state machine. */
+		if (am_potential_causal_reads_standby && !am_cascading_walsender)
+		{
+			bool replay_lag_acceptable;
+
+			/* Check if the lag is acceptable (includes -1 for caught up). */
+			if (applyLag < causal_reads_max_replay_lag * 1000)
+				replay_lag_acceptable = true;
+			else
+				replay_lag_acceptable = false;
+
+			/* Figure out next if the state needs to change. */
+			switch (walsnd->causalReadsState)
+			{
+			case WALSNDCRSTATE_UNAVAILABLE:
+				/* Can we join? */
+				if (replay_lag_acceptable)
+					next_cr_state = WALSNDCRSTATE_JOINING;
+				break;
+			case WALSNDCRSTATE_JOINING:
+				/* Are we still applying fast enough? */
+				if (replay_lag_acceptable)
+				{
+					/* Have we reached the join point yet? */
+					if (applyPtr >= causal_reads_joining_until)
+						next_cr_state = WALSNDCRSTATE_AVAILABLE;
+				}
+				else
+					next_cr_state = WALSNDCRSTATE_UNAVAILABLE;
+				break;
+			case WALSNDCRSTATE_AVAILABLE:
+				/* Are we still applying fast enough? */
+				if (!replay_lag_acceptable)
+					next_cr_state = WALSNDCRSTATE_REVOKING;
+				break;
+			case WALSNDCRSTATE_REVOKING:
+				/* Has the revocation been acknowledged or timed out? */
+				if (replyTo == causal_reads_revoke_msgno ||
+					now >= walsnd->revokingUntil)
+					next_cr_state = WALSNDCRSTATE_UNAVAILABLE;
+				break;
+			}
+		}
+
 		SpinLockAcquire(&walsnd->mutex);
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
@@ -1802,11 +1926,53 @@ ProcessStandbyReplyMessage(void)
 			walsnd->flushLag = flushLag;
 		if (applyLag != -1 || clearLagTimes)
 			walsnd->applyLag = applyLag;
+		if (next_cr_state != -1)
+			walsnd->causalReadsState = next_cr_state;
+		if (next_cr_state == WALSNDCRSTATE_REVOKING)
+			walsnd->revokingUntil = causal_reads_last_lease;
 		SpinLockRelease(&walsnd->mutex);
+
+		/* Post shmem-update actions for causal read state transitions. */
+		switch (next_cr_state)
+		{
+		case WALSNDCRSTATE_JOINING:
+			/*
+			 * Now that we've started waiting for this standby, we need to
+			 * make sure that everything flushed before now has been applied
+			 * before we move to available and issue a lease.
+			 */
+			causal_reads_joining_until = GetFlushRecPtr();
+			ereport(LOG,
+					(errmsg("standby \"%s\" joining causal reads set...",
+							application_name)));
+			break;
+		case WALSNDCRSTATE_AVAILABLE:
+			/* Issue a new lease to the standby. */
+			WalSndKeepalive(false);
+			ereport(LOG,
+					(errmsg("standby \"%s\" is available for causal reads",
+							application_name)));
+			break;
+		case WALSNDCRSTATE_REVOKING:
+			/* Revoke the standby's lease, and note the message number. */
+			causal_reads_revoke_msgno = WalSndKeepalive(true);
+			ereport(LOG,
+					(errmsg("revoking causal reads lease for standby \"%s\"...",
+							application_name)));
+			break;
+		case WALSNDCRSTATE_UNAVAILABLE:
+			ereport(LOG,
+					(errmsg("standby \"%s\" is no longer available for causal reads",
+							application_name)));
+			break;
+		default:
+			/* No change. */
+			break;
+		}
 	}
 
 	if (!am_cascading_walsender)
-		SyncRepReleaseWaiters();
+		SyncRepReleaseWaiters(MyWalSnd->causalReadsState >= WALSNDCRSTATE_JOINING);
 
 	/*
 	 * Advance our local xmin horizon when the client confirmed a flush.
@@ -1996,33 +2162,51 @@ ProcessStandbyHSFeedbackMessage(void)
  * If wal_sender_timeout is enabled we want to wake up in time to send
  * keepalives and to abort the connection if wal_sender_timeout has been
  * reached.
+ *
+ * But if causal_reads_timeout is enabled, we override that and send
+ * keepalives at a constant rate to replace expiring leases.
  */
 static long
 WalSndComputeSleeptime(TimestampTz now)
 {
 	long		sleeptime = 10000;	/* 10 s */
 
-	if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
+	if ((wal_sender_timeout > 0 && last_reply_timestamp > 0) ||
+		am_potential_causal_reads_standby)
 	{
 		TimestampTz wakeup_time;
 		long		sec_to_timeout;
 		int			microsec_to_timeout;
 
-		/*
-		 * At the latest stop sleeping once wal_sender_timeout has been
-		 * reached.
-		 */
-		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-												  wal_sender_timeout);
-
-		/*
-		 * If no ping has been sent yet, wakeup when it's time to do so.
-		 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
-		 * the timeout passed without a response.
-		 */
-		if (!waiting_for_ping_response)
+		if (am_potential_causal_reads_standby)
+		{
+			/*
+			 * We need to keep replacing leases before they expire.  We'll do
+			 * that halfway through the lease time according to our clock, to
+			 * allow for the standby's clock to be ahead of the primary's by
+			 * 25% of causal_reads_lease_time.
+			 */
+			wakeup_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp,
+													  causal_reads_lease_time / 2);
+		}
+		else
+		{
+			/*
+			 * At the latest stop sleeping once wal_sender_timeout has been
+			 * reached.
+			 */
 			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-													  wal_sender_timeout / 2);
+													  wal_sender_timeout);
+
+			/*
+			 * If no ping has been sent yet, wakeup when it's time to do so.
+			 * WalSndKeepaliveIfNecessary() wants to send a keepalive once
+			 * half of the timeout passed without a response.
+			 */
+			if (!waiting_for_ping_response)
+				wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+														  wal_sender_timeout / 2);
+		}
 
 		/* Compute relative time until wakeup. */
 		TimestampDifference(now, wakeup_time,
@@ -2038,20 +2222,33 @@ WalSndComputeSleeptime(TimestampTz now)
 /*
  * Check whether there have been responses by the client within
  * wal_sender_timeout and shutdown if not.
+ *
+ * If causal_reads_timeout is configured we override that, so that
+ * unresponsive standbys are detected sooner.
  */
 static void
 WalSndCheckTimeOut(TimestampTz now)
 {
 	TimestampTz timeout;
+	int allowed_time;
 
 	/* don't bail out if we're doing something that doesn't require timeouts */
 	if (last_reply_timestamp <= 0)
 		return;
 
-	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
-										  wal_sender_timeout);
+	/*
+	 * If a causal_reads support is configured, we use causal_reads_lease_time
+	 * instead of wal_sender_timeout, to limit the time before an unresponsive
+	 * causal reads standby is dropped.
+	 */
+	if (am_potential_causal_reads_standby)
+		allowed_time = causal_reads_lease_time;
+	else
+		allowed_time = wal_sender_timeout;
 
-	if (wal_sender_timeout > 0 && now >= timeout)
+	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+										  allowed_time);
+	if (allowed_time > 0 && now >= timeout)
 	{
 		/*
 		 * Since typically expiration of replication timeout means
@@ -2079,6 +2276,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 	/* Report to pgstat that this process is running */
 	pgstat_report_activity(STATE_RUNNING, NULL);
 
+	/* Check if we are managing potential causal_reads standby. */
+	am_potential_causal_reads_standby = CausalReadsPotentialStandby();
+
 	/*
 	 * Loop until we reach the end of this timeline or the client requests to
 	 * stop streaming.
@@ -2243,6 +2443,7 @@ InitWalSenderSlot(void)
 			walsnd->flushLag = -1;
 			walsnd->applyLag = -1;
 			walsnd->state = WALSNDSTATE_STARTUP;
+			walsnd->causalReadsState = WALSNDCRSTATE_UNAVAILABLE;
 			walsnd->latch = &MyProc->procLatch;
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
@@ -3125,6 +3326,27 @@ WalSndGetStateString(WalSndState state)
 	return "UNKNOWN";
 }
 
+/*
+ * Return a string constant representing the causal reads state. This is used
+ * in system views, and should *not* be translated.
+ */
+static const char *
+WalSndGetCausalReadsStateString(WalSndCausalReadsState causal_reads_state)
+{
+	switch (causal_reads_state)
+	{
+	case WALSNDCRSTATE_UNAVAILABLE:
+		return "unavailable";
+	case WALSNDCRSTATE_JOINING:
+		return "joining";
+	case WALSNDCRSTATE_AVAILABLE:
+		return "available";
+	case WALSNDCRSTATE_REVOKING:
+		return "revoking";
+	}
+	return "UNKNOWN";
+}
+
 static Interval *
 offset_to_interval(TimeOffset offset)
 {
@@ -3144,7 +3366,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	11
+#define PG_STAT_GET_WAL_SENDERS_COLS	12
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -3197,6 +3419,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		TimeOffset	applyLag;
 		int			priority;
 		WalSndState state;
+		WalSndCausalReadsState causalReadsState;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -3206,6 +3429,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		SpinLockAcquire(&walsnd->mutex);
 		sentPtr = walsnd->sentPtr;
 		state = walsnd->state;
+		causalReadsState = walsnd->causalReadsState;
 		write = walsnd->write;
 		flush = walsnd->flush;
 		apply = walsnd->apply;
@@ -3288,6 +3512,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
 				values[10] = CStringGetTextDatum("potential");
+
+			values[11] =
+				CStringGetTextDatum(WalSndGetCausalReadsStateString(causalReadsState));
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -3303,21 +3530,69 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
   * This function is used to send a keepalive message to standby.
   * If requestReply is set, sets a flag in the message requesting the standby
   * to send a message back to us, for heartbeat purposes.
+  * Return the serial number of the message that was sent.
   */
-static void
+static int64
 WalSndKeepalive(bool requestReply)
 {
+	TimestampTz causal_reads_lease;
+	TimestampTz now;
+
+	static int64 message_number = 0;
+
 	elog(DEBUG2, "sending replication keepalive");
 
+	/* Grant a causal reads lease if appropriate. */
+	now = GetCurrentTimestamp();
+	if (MyWalSnd->causalReadsState != WALSNDCRSTATE_AVAILABLE)
+	{
+		/* No lease granted, and any earlier lease is revoked. */
+		causal_reads_lease = 0;
+	}
+	else
+	{
+		/*
+		 * Since this timestamp is being sent to the standby where it will be
+		 * compared against a time generated by the standby's system clock, we
+		 * must consider clock skew.  We use 25% of the lease time as max
+		 * clock skew, and we subtract that from the time we send with the
+		 * following reasoning:
+		 *
+		 * 1.  If the standby's clock is slow (ie behind the primary's) by up
+		 * to that much, then by subtracting this amount will make sure the
+		 * lease doesn't survive past that time according to the primary's
+		 * clock.
+		 *
+		 * 2.  If the standby's clock is fast (ie ahead of the primary's) by
+		 * up to that much, then by subtracting this amount there won't be any
+		 * gaps between leases, since leases are reissued every time 50% of
+		 * the lease time elapses (see WalSndKeepaliveIfNecessary and
+		 * WalSndComputeSleepTime).
+		 */
+		int max_clock_skew = causal_reads_lease_time / 4;
+
+		/* Compute and remember the expiry time of the lease we're granting. */
+		causal_reads_last_lease =
+		TimestampTzPlusMilliseconds(now, causal_reads_lease_time);
+		/* Adjust the version we send for clock skew. */
+		causal_reads_lease =
+			TimestampTzPlusMilliseconds(causal_reads_last_lease,
+										-max_clock_skew);
+	}
+
 	/* construct the message... */
 	resetStringInfo(&output_message);
 	pq_sendbyte(&output_message, 'k');
+	pq_sendint64(&output_message, ++message_number);
 	pq_sendint64(&output_message, sentPtr);
-	pq_sendint64(&output_message, GetCurrentTimestamp());
+	pq_sendint64(&output_message, now);
 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
+	pq_sendint64(&output_message, causal_reads_lease);
 
 	/* ... and send it wrapped in CopyData */
 	pq_putmessage_noblock('d', output_message.data, output_message.len);
+
+	return message_number;
 }
 
 /*
@@ -3332,23 +3607,35 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	 * Don't send keepalive messages if timeouts are globally disabled or
 	 * we're doing something not partaking in timeouts.
 	 */
-	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
-		return;
-
-	if (waiting_for_ping_response)
-		return;
+	if (!am_potential_causal_reads_standby)
+	{
+		if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
+			return;
+		if (waiting_for_ping_response)
+			return;
+	}
 
 	/*
 	 * If half of wal_sender_timeout has lapsed without receiving any reply
 	 * from the standby, send a keep-alive message to the standby requesting
 	 * an immediate reply.
+	 *
+	 * If causal_reads_max_replay_lag has been configured, use
+	 * causal_reads_lease_time to control keepalive intervals rather than
+	 * wal_sender_timeout, so that we can keep replacing leases at the right
+	 * frequency.
 	 */
-	ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-											wal_sender_timeout / 2);
+	if (am_potential_causal_reads_standby)
+		ping_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp,
+												causal_reads_lease_time / 2);
+	else
+		ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+												wal_sender_timeout / 2);
 	if (now >= ping_time)
 	{
 		WalSndKeepalive(true);
 		waiting_for_ping_response = true;
+		last_keepalive_timestamp = now;
 
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt
index 4f354717628..89e49e2c42e 100644
--- a/src/backend/utils/errcodes.txt
+++ b/src/backend/utils/errcodes.txt
@@ -307,6 +307,7 @@ Section: Class 40 - Transaction Rollback
 40001    E    ERRCODE_T_R_SERIALIZATION_FAILURE                              serialization_failure
 40003    E    ERRCODE_T_R_STATEMENT_COMPLETION_UNKNOWN                       statement_completion_unknown
 40P01    E    ERRCODE_T_R_DEADLOCK_DETECTED                                  deadlock_detected
+40P02    E    ERRCODE_T_R_CAUSAL_READS_NOT_AVAILABLE                         causal_reads_not_available
 
 Section: Class 42 - Syntax Error or Access Rule Violation
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 82e54c084b8..d746e6eb0bf 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1647,6 +1647,16 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
+		{"causal_reads", PGC_USERSET, REPLICATION_STANDBY,
+		 gettext_noop("Enables causal reads."),
+		 NULL
+		},
+		&causal_reads,
+		false,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"syslog_sequence_numbers", PGC_SIGHUP, LOGGING_WHERE,
 			gettext_noop("Add sequence number to syslog messages to avoid duplicate suppression."),
 			NULL
@@ -2885,6 +2895,28 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"causal_reads_max_replay_lag", PGC_SIGHUP, REPLICATION_MASTER,
+			gettext_noop("Sets the maximum allowed replay lag before causal reads standbys are no longer available."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&causal_reads_max_replay_lag,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"causal_reads_lease_time", PGC_SIGHUP, REPLICATION_MASTER,
+			gettext_noop("Sets the duration of read leases used to implement causal reads."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&causal_reads_lease_time,
+		5000, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
@@ -3563,7 +3595,18 @@ static struct config_string ConfigureNamesString[] =
 		},
 		&SyncRepStandbyNames,
 		"",
-		check_synchronous_standby_names, assign_synchronous_standby_names, NULL
+		check_standby_names, assign_synchronous_standby_names, NULL
+	},
+
+	{
+		{"causal_reads_standby_names", PGC_SIGHUP, REPLICATION_MASTER,
+			gettext_noop("List of names of potential causal reads standbys."),
+			NULL,
+			GUC_LIST_INPUT
+		},
+		&causal_reads_standby_names,
+		"*",
+		check_standby_names, NULL, NULL
 	},
 
 	{
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 2b1ebb797ec..91cdc65f218 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -250,6 +250,18 @@
 				# from standby(s); '*' = all
 #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
 
+#causal_reads_max_replay_lag = 0s	# maximum replication delay to tolerate from
+					# standbys before dropping them from the set of
+					# available causal reads peers; 0 to disable
+					# causal  reads
+
+#causal_reads_lease_time = 5s		# how long individual leases granted to causal
+					# reads standbys should last; should be 4 times
+					# the max possible clock skew
+
+#causal_reads_standy_names = '*'	# standby servers that can potentially become
+					# available for causal reads; '*' = all
+
 # - Standby Servers -
 
 # These settings are ignored on a master server.
@@ -279,6 +291,14 @@
 #max_logical_replication_workers = 4	# taken from max_worker_processes
 #max_sync_workers_per_subscription = 2	# taken from max_logical_replication_workers
 
+# - All Servers -
+
+#causal_reads = off			# "on" in any pair of consecutive
+					# transactions guarantees that the second
+					# can see the first (even if the second
+					# is run on a standby), or will raise an
+					# error to report that the standby is
+					# unavailable for causal reads
 
 #------------------------------------------------------------------------------
 # QUERY TUNING
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 6369be78a31..9ed1aa66042 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -54,6 +54,8 @@
 #include "catalog/catalog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
+#include "replication/syncrep.h"
+#include "replication/walreceiver.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -332,6 +334,16 @@ GetTransactionSnapshot(void)
 				 "cannot take query snapshot during a parallel operation");
 
 		/*
+		 * In causal_reads mode on a standby, check if we have definitely
+		 * applied WAL for any COMMIT that returned successfully on the
+		 * primary.
+		 */
+		if (causal_reads && RecoveryInProgress() && !WalRcvCausalReadsAvailable())
+			ereport(ERROR,
+					(errcode(ERRCODE_T_R_CAUSAL_READS_NOT_AVAILABLE),
+					 errmsg("standby is not available for causal reads")));
+
+		/*
 		 * In transaction-snapshot mode, the first snapshot must live until
 		 * end of xact regardless of what the caller does with it, so we must
 		 * make a copy of it rather than returning CurrentSnapshotData
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 6811a55e764..02eaf97247f 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -117,7 +117,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
 	static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
 	static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
 
-	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
+	char		replybuf[1 + 8 + 8 + 8 + 8 + 1 + 8];
 	int			len = 0;
 
 	/*
@@ -150,6 +150,8 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
 	len += 8;
 	replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 	len += 1;
+	fe_sendint64(-1, &replybuf[len]);	/* replyTo */
+	len += 8;
 
 	startpos = output_written_lsn;
 	last_written_lsn = output_written_lsn;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 15932c60b5a..501ecc849d1 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -325,7 +325,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 static bool
 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
 {
-	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
+	char		replybuf[1 + 8 + 8 + 8 + 8 + 1 + 8];
 	int			len = 0;
 
 	replybuf[len] = 'r';
@@ -343,6 +343,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque
 	len += 8;
 	replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 	len += 1;
+	fe_sendint64(-1, &replybuf[len]);	/* replyTo */
+	len += 8;
 
 	if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
 	{
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 1191b4ab1bd..bd00f374f53 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2832,7 +2832,7 @@ DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f
 DESCR("statistics: information about currently active backends");
 DATA(insert OID = 3318 (  pg_stat_get_progress_info			  PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
 DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,causal_reads_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 3317 (  pg_stat_get_wal_receiver	PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
 DESCR("statistics: information about WAL receiver");
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6bffe63ad6b..8f383c128ae 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -801,6 +801,7 @@ typedef enum
 	WAIT_EVENT_BGWORKER_SHUTDOWN = PG_WAIT_IPC,
 	WAIT_EVENT_BGWORKER_STARTUP,
 	WAIT_EVENT_BTREE_PAGE,
+	WAIT_EVENT_CAUSAL_READS_APPLY,
 	WAIT_EVENT_EXECUTE_GATHER,
 	WAIT_EVENT_MQ_INTERNAL,
 	WAIT_EVENT_MQ_PUT_MESSAGE,
@@ -824,6 +825,7 @@ typedef enum
 typedef enum
 {
 	WAIT_EVENT_BASE_BACKUP_THROTTLE = PG_WAIT_TIMEOUT,
+	WAIT_EVENT_CAUSAL_READS_REVOKE,
 	WAIT_EVENT_PG_SLEEP,
 	WAIT_EVENT_RECOVERY_APPLY_DELAY
 } WaitEventTimeout;
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index ceafe2cbea1..3d8c254ad46 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -15,6 +15,7 @@
 
 #include "access/xlogdefs.h"
 #include "utils/guc.h"
+#include "utils/timestamp.h"
 
 #define SyncRepRequested() \
 	(max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
@@ -24,8 +25,9 @@
 #define SYNC_REP_WAIT_WRITE		0
 #define SYNC_REP_WAIT_FLUSH		1
 #define SYNC_REP_WAIT_APPLY		2
+#define SYNC_REP_WAIT_CAUSAL_READS	3
 
-#define NUM_SYNC_REP_WAIT_MODE	3
+#define NUM_SYNC_REP_WAIT_MODE	4
 
 /* syncRepState */
 #define SYNC_REP_NOT_WAITING		0
@@ -36,6 +38,12 @@
 #define SYNC_REP_PRIORITY		0
 #define SYNC_REP_QUORUM		1
 
+/* GUC variables */
+extern int causal_reads_max_replay_lag;
+extern int causal_reads_lease_time;
+extern bool causal_reads;
+extern char *causal_reads_standby_names;
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -71,7 +79,7 @@ extern void SyncRepCleanupAtProcExit(void);
 
 /* called by wal sender */
 extern void SyncRepInitConfig(void);
-extern void SyncRepReleaseWaiters(void);
+extern void SyncRepReleaseWaiters(bool walsender_cr_available_or_joining);
 
 /* called by wal sender and user backend */
 extern List *SyncRepGetSyncStandbys(bool *am_sync);
@@ -79,8 +87,15 @@ extern List *SyncRepGetSyncStandbys(bool *am_sync);
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
 
+/* called by user backend (xact.c) */
+extern void CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN);
+
+/* called by wal sender */
+extern void CausalReadsRevokeLease(TimestampTz lease_expiry_time);
+extern bool CausalReadsPotentialStandby(void);
+
 /* GUC infrastructure */
-extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+extern bool check_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_standby_names(const char *newval, void *extra);
 extern void assign_synchronous_commit(int newval, void *extra);
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index c8652dbd489..6eb188c88c1 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -83,6 +83,13 @@ typedef struct
 	TimeLineID	receivedTLI;
 
 	/*
+	 * causalReadsLease is the time until which the primary has authorized
+	 * this standby to consider itself available for causal_reads mode, or 0
+	 * for not authorized.
+	 */
+	TimestampTz causalReadsLease;
+
+	/*
 	 * latestChunkStart is the starting byte position of the current "batch"
 	 * of received WAL.  It's actually the same as the previous value of
 	 * receivedUpto before the last flush to disk.  Startup process can use
@@ -298,4 +305,6 @@ extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
 extern void WalRcvForceReply(void);
 
+extern bool WalRcvCausalReadsAvailable(void);
+
 #endif							/* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 0aa80d5c3e2..b5b4d392f8c 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -28,6 +28,14 @@ typedef enum WalSndState
 	WALSNDSTATE_STOPPING
 } WalSndState;
 
+typedef enum WalSndCausalReadsState
+{
+	WALSNDCRSTATE_UNAVAILABLE = 0,
+	WALSNDCRSTATE_JOINING,
+	WALSNDCRSTATE_AVAILABLE,
+	WALSNDCRSTATE_REVOKING
+} WalSndCausalReadsState;
+
 /*
  * Each walsender has a WalSnd struct in shared memory.
  */
@@ -53,6 +61,10 @@ typedef struct WalSnd
 	TimeOffset	flushLag;
 	TimeOffset	applyLag;
 
+	/* Causal reads state for this walsender. */
+	WalSndCausalReadsState causalReadsState;
+	TimestampTz revokingUntil;
+
 	/* Protects shared variables shown above. */
 	slock_t		mutex;
 
@@ -94,6 +106,14 @@ typedef struct
 	 */
 	bool		sync_standbys_defined;
 
+	/*
+	 * Until when must commits in causal_reads stall?  This is used to wait
+	 * for causal reads leases to expire when a walsender exists uncleanly,
+	 * and we must stall causal reads commits until we're sure that the remote
+	 * server's lease has expired.
+	 */
+	TimestampTz	revokingUntil;
+
 	WalSnd		walsnds[FLEXIBLE_ARRAY_MEMBER];
 } WalSndCtlData;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2e42b9ec05f..1f5801bc450 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1859,9 +1859,10 @@ pg_stat_replication| SELECT s.pid,
     w.flush_lag,
     w.replay_lag,
     w.sync_priority,
-    w.sync_state
+    w.sync_state,
+    w.causal_reads_state
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, causal_reads_state) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,
