diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 59cb053..07cd629 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -44,7 +44,9 @@
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
 #include "commands/defrem.h"
+#include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
+#include "nodes/plannodes.h"
 #include "optimizer/clauses.h"
 #include "optimizer/var.h"
 #include "parser/parsetree.h"
@@ -60,7 +62,8 @@
 typedef struct foreign_glob_cxt
 {
 	PlannerInfo *root;			/* global planner state */
-	RelOptInfo *foreignrel;		/* the foreign relation we are planning for */
+	RelOptInfo *outerrel;		/* the foreign relation, or outer child */
+	RelOptInfo *innerrel;		/* inner child, only set for join */
 } foreign_glob_cxt;
 
 /*
@@ -86,9 +89,12 @@ typedef struct foreign_loc_cxt
 typedef struct deparse_expr_cxt
 {
 	PlannerInfo *root;			/* global planner state */
-	RelOptInfo *foreignrel;		/* the foreign relation we are planning for */
+	RelOptInfo *outerrel;		/* the foreign relation, or outer child */
+	RelOptInfo *innerrel;		/* inner child, only set for join */
 	StringInfo	buf;			/* output buffer to append to */
 	List	  **params_list;	/* exprs that will become remote Params */
+	ForeignScan *outerplan;		/* outer child's ForeignScan node */
+	ForeignScan *innerplan;		/* inner child's ForeignScan node */
 } deparse_expr_cxt;
 
 /*
@@ -160,7 +166,7 @@ classifyConditions(PlannerInfo *root,
 	{
 		RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
 
-		if (is_foreign_expr(root, baserel, ri->clause))
+		if (is_foreign_expr(root, baserel, NULL, ri->clause))
 			*remote_conds = lappend(*remote_conds, ri);
 		else
 			*local_conds = lappend(*local_conds, ri);
@@ -172,7 +178,8 @@ classifyConditions(PlannerInfo *root,
  */
 bool
 is_foreign_expr(PlannerInfo *root,
-				RelOptInfo *baserel,
+				RelOptInfo *outerrel,
+				RelOptInfo *innerrel,
 				Expr *expr)
 {
 	foreign_glob_cxt glob_cxt;
@@ -183,7 +190,8 @@ is_foreign_expr(PlannerInfo *root,
 	 * remotely.
 	 */
 	glob_cxt.root = root;
-	glob_cxt.foreignrel = baserel;
+	glob_cxt.outerrel = outerrel;
+	glob_cxt.innerrel = innerrel;
 	loc_cxt.collation = InvalidOid;
 	loc_cxt.state = FDW_COLLATE_NONE;
 	if (!foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt))
@@ -250,7 +258,7 @@ foreign_expr_walker(Node *node,
 				 * Param's collation, ie it's not safe for it to have a
 				 * non-default collation.
 				 */
-				if (var->varno == glob_cxt->foreignrel->relid &&
+				if (var->varno == glob_cxt->outerrel->relid &&
 					var->varlevelsup == 0)
 				{
 					/* Var belongs to foreign table */
@@ -743,18 +751,22 @@ deparseTargetList(StringInfo buf,
 		if (attr->attisdropped)
 			continue;
 
+		if (!first)
+			appendStringInfoString(buf, ", ");
+		first = false;
+
 		if (have_wholerow ||
 			bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
 						  attrs_used))
 		{
-			if (!first)
-				appendStringInfoString(buf, ", ");
-			first = false;
 
 			deparseColumnRef(buf, rtindex, i, root);
 
-			*retrieved_attrs = lappend_int(*retrieved_attrs, i);
 		}
+		else
+			appendStringInfoString(buf, "NULL");
+
+		*retrieved_attrs = lappend_int(*retrieved_attrs, i);
 	}
 
 	/*
@@ -794,12 +806,15 @@ deparseTargetList(StringInfo buf,
  * so Params and other-relation Vars should be replaced by dummy values.
  */
 void
-appendWhereClause(StringInfo buf,
-				  PlannerInfo *root,
-				  RelOptInfo *baserel,
-				  List *exprs,
-				  bool is_first,
-				  List **params)
+appendConditions(StringInfo buf,
+				 PlannerInfo *root,
+				 RelOptInfo *outerrel,
+				 RelOptInfo *innerrel,
+				 ForeignScan *outerplan,
+				 ForeignScan *innerplan,
+				 List *exprs,
+				 const char *prefix,
+				 List **params)
 {
 	deparse_expr_cxt context;
 	int			nestlevel;
@@ -810,9 +825,12 @@ appendWhereClause(StringInfo buf,
 
 	/* Set up context struct for recursion */
 	context.root = root;
-	context.foreignrel = baserel;
+	context.outerrel = outerrel;
+	context.innerrel = innerrel;
 	context.buf = buf;
 	context.params_list = params;
+	context.outerplan = outerplan;
+	context.innerplan = innerplan;
 
 	/* Make sure any constants in the exprs are printed portably */
 	nestlevel = set_transmission_modes();
@@ -822,22 +840,149 @@ appendWhereClause(StringInfo buf,
 		RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
 
 		/* Connect expressions with "AND" and parenthesize each condition. */
-		if (is_first)
-			appendStringInfoString(buf, " WHERE ");
-		else
-			appendStringInfoString(buf, " AND ");
+		if (prefix)
+			appendStringInfo(buf, "%s", prefix);
 
 		appendStringInfoChar(buf, '(');
 		deparseExpr(ri->clause, &context);
 		appendStringInfoChar(buf, ')');
 
-		is_first = false;
+		prefix= " AND ";
 	}
 
 	reset_transmission_modes(nestlevel);
 }
 
 /*
+ * Deparse given Var into buf.
+ */
+static TargetEntry *
+deparseJoinVar(StringInfo buf, Var *var, Path *path_o, Path *path_i, List *tl_o, List *tl_i)
+{
+	List	   *targetlist;
+	const char *side;
+	ListCell   *lc2;
+	TargetEntry *tle = NULL;
+	int			j;
+
+	/* Find var from outer/inner subtree */
+	if (bms_is_member(var->varno, path_o->parent->relids))
+	{
+		targetlist = tl_o;
+		side = "l";
+	}
+	else if (bms_is_member(var->varno, path_i->parent->relids))
+	{
+		targetlist = tl_i;
+		side = "r";
+	}
+
+	j = 0;
+	foreach(lc2, targetlist)
+	{
+		TargetEntry *childtle = (TargetEntry *) lfirst(lc2);
+
+		if (equal(childtle->expr, var))
+		{
+			tle = copyObject(childtle);
+			break;
+		}
+		j++;
+	}
+	Assert(tle);
+
+	appendStringInfo(buf, "%s.a_%d", side, j);
+
+	return tle;
+}
+
+/*
+ * Construct a SELECT statement which contains join clause.
+ *
+ * We also create an TargetEntry List of the columns being retrieved, which is
+ * returned to *fdw_ps_tlist.
+ *
+ * path_o, tl_o, sql_o are respectively path, targetlist, and remote query
+ * statement of the outer child relation.  postfix _i means those for the inner
+ * child relation.  jointype and restrictlist are information of join method.
+ * fdw_ps_tlist is output parameter to pass target list of the pseudo scan to
+ * caller.
+ */
+void
+deparseJoinSql(StringInfo sql,
+			   PlannerInfo *root,
+			   RelOptInfo *baserel,
+			   Path *path_o,
+			   Path *path_i,
+			   ForeignScan *plan_o,
+			   ForeignScan *plan_i,
+			   const char *sql_o,
+			   const char *sql_i,
+			   JoinType jointype,
+			   List *restrictlist,
+			   List **fdw_ps_tlist)
+{
+	StringInfoData selbuf;		/* buffer for SELECT clause */
+	StringInfoData abuf_o;		/* buffer for column alias list of outer */
+	StringInfoData abuf_i;		/* buffer for column alias list of inner */
+	int			i;
+	ListCell   *lc;
+	const char *jointype_str;
+
+	jointype_str = jointype == JOIN_INNER ? "INNER" :
+				   jointype == JOIN_LEFT ? "LEFT" :
+				   jointype == JOIN_RIGHT ? "RIGHT" :
+				   jointype == JOIN_FULL ? "FULL" : "";
+
+	/* print SELECT clause of the join scan */
+	/* XXX: should extend deparseTargetList()? */
+	initStringInfo(&selbuf);
+	i = 0;
+	foreach(lc, baserel->reltargetlist)
+	{
+		Var		   *var = (Var *) lfirst(lc);
+		TargetEntry *tle;
+
+		if (i > 0)
+			appendStringInfoString(&selbuf, ", ");
+		deparseJoinVar(&selbuf, var, path_o, path_i,
+					   plan_o->scan.plan.targetlist,
+					   plan_i->scan.plan.targetlist);
+
+		tle = makeTargetEntry((Expr *) copyObject(var),
+							  i + 1, pstrdup(""), false);
+		if (fdw_ps_tlist)
+			*fdw_ps_tlist = lappend(*fdw_ps_tlist, copyObject(tle));
+
+		i++;
+	}
+
+	/* Deparse column alias portion of subquery in FROM clause. */
+	initStringInfo(&abuf_o);
+	initStringInfo(&abuf_i);
+	for (i = 0; i < list_length(plan_o->scan.plan.targetlist); i++)
+	{
+		if (i > 0)
+			appendStringInfoString(&abuf_o, ", ");
+		appendStringInfo(&abuf_o, "a_%d", i);
+	}
+	for (i = 0; i < list_length(plan_i->scan.plan.targetlist); i++)
+	{
+		if (i > 0)
+			appendStringInfoString(&abuf_i, ", ");
+		appendStringInfo(&abuf_i, "a_%d", i);
+	}
+
+	/* Construct SELECT statement */
+	appendStringInfo(sql, "SELECT %s FROM", selbuf.data);
+	appendStringInfo(sql, " (%s) l (%s) %s JOIN (%s) r (%s) ",
+					 sql_o, abuf_o.data, jointype_str, sql_i, abuf_i.data);
+	/* Append ON clause */
+	appendConditions(sql, root, path_o->parent, path_i->parent, plan_o, plan_i,
+					 restrictlist, " ON ", NULL);
+}
+
+/*
  * deparse remote INSERT statement
  *
  * The statement text is appended to buf, and we also create an integer List
@@ -1261,6 +1406,8 @@ deparseExpr(Expr *node, deparse_expr_cxt *context)
 /*
  * Deparse given Var node into context->buf.
  *
+ * If context has valid innerrel, this is invoked for a join conditions.
+ *
  * If the Var belongs to the foreign relation, just print its remote name.
  * Otherwise, it's effectively a Param (and will in fact be a Param at
  * run time).  Handle it the same way we handle plain Params --- see
@@ -1271,39 +1418,50 @@ deparseVar(Var *node, deparse_expr_cxt *context)
 {
 	StringInfo	buf = context->buf;
 
-	if (node->varno == context->foreignrel->relid &&
-		node->varlevelsup == 0)
+	if (context->innerrel != NULL)
 	{
-		/* Var belongs to foreign table */
-		deparseColumnRef(buf, node->varno, node->varattno, context->root);
+		deparseJoinVar(context->buf, node,
+					   context->outerrel->cheapest_total_path,
+					   context->innerrel->cheapest_total_path,
+					   context->outerplan->scan.plan.targetlist,
+					   context->innerplan->scan.plan.targetlist);
 	}
 	else
 	{
-		/* Treat like a Param */
-		if (context->params_list)
+		if (node->varno == context->outerrel->relid &&
+			node->varlevelsup == 0)
 		{
-			int			pindex = 0;
-			ListCell   *lc;
-
-			/* find its index in params_list */
-			foreach(lc, *context->params_list)
+			/* Var belongs to foreign table */
+			deparseColumnRef(buf, node->varno, node->varattno, context->root);
+		}
+		else
+		{
+			/* Treat like a Param */
+			if (context->params_list)
 			{
-				pindex++;
-				if (equal(node, (Node *) lfirst(lc)))
-					break;
+				int			pindex = 0;
+				ListCell   *lc;
+
+				/* find its index in params_list */
+				foreach(lc, *context->params_list)
+				{
+					pindex++;
+					if (equal(node, (Node *) lfirst(lc)))
+						break;
+				}
+				if (lc == NULL)
+				{
+					/* not in list, so add it */
+					pindex++;
+					*context->params_list = lappend(*context->params_list, node);
+				}
+
+				printRemoteParam(pindex, node->vartype, node->vartypmod, context);
 			}
-			if (lc == NULL)
+			else
 			{
-				/* not in list, so add it */
-				pindex++;
-				*context->params_list = lappend(*context->params_list, node);
+				printRemotePlaceholder(node->vartype, node->vartypmod, context);
 			}
-
-			printRemoteParam(pindex, node->vartype, node->vartypmod, context);
-		}
-		else
-		{
-			printRemotePlaceholder(node->vartype, node->vartypmod, context);
 		}
 	}
 }
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 583cce7..92d9b1f 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -656,16 +656,16 @@ SELECT * FROM ft2 WHERE c1 = ANY (ARRAY(SELECT c1 FROM ft1 WHERE c1 < 5));
 -- simple join
 PREPARE st1(int, int) AS SELECT t1.c3, t2.c3 FROM ft1 t1, ft2 t2 WHERE t1.c1 = $1 AND t2.c1 = $2;
 EXPLAIN (VERBOSE, COSTS false) EXECUTE st1(1, 2);
-                             QUERY PLAN                             
---------------------------------------------------------------------
+                                                  QUERY PLAN                                                  
+--------------------------------------------------------------------------------------------------------------
  Nested Loop
    Output: t1.c3, t2.c3
    ->  Foreign Scan on public.ft1 t1
          Output: t1.c3
-         Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" = 1))
+         Remote SQL: SELECT NULL, NULL, c3, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE (("C 1" = 1))
    ->  Foreign Scan on public.ft2 t2
          Output: t2.c3
-         Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" = 2))
+         Remote SQL: SELECT NULL, NULL, c3, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE (("C 1" = 2))
 (8 rows)
 
 EXECUTE st1(1, 1);
@@ -683,8 +683,8 @@ EXECUTE st1(101, 101);
 -- subquery using stable function (can't be sent to remote)
 PREPARE st2(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND date(c4) = '1970-01-17'::date) ORDER BY c1;
 EXPLAIN (VERBOSE, COSTS false) EXECUTE st2(10, 20);
-                                                QUERY PLAN                                                
-----------------------------------------------------------------------------------------------------------
+                                                       QUERY PLAN                                                        
+-------------------------------------------------------------------------------------------------------------------------
  Sort
    Output: t1.c1, t1.c2, t1.c3, t1.c4, t1.c5, t1.c6, t1.c7, t1.c8
    Sort Key: t1.c1
@@ -699,7 +699,7 @@ EXPLAIN (VERBOSE, COSTS false) EXECUTE st2(10, 20);
                ->  Foreign Scan on public.ft2 t2
                      Output: t2.c3
                      Filter: (date(t2.c4) = '01-17-1970'::date)
-                     Remote SQL: SELECT c3, c4 FROM "S 1"."T 1" WHERE (("C 1" > 10))
+                     Remote SQL: SELECT NULL, NULL, c3, c4, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE (("C 1" > 10))
 (15 rows)
 
 EXECUTE st2(10, 20);
@@ -717,8 +717,8 @@ EXECUTE st2(101, 121);
 -- subquery using immutable function (can be sent to remote)
 PREPARE st3(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND date(c5) = '1970-01-17'::date) ORDER BY c1;
 EXPLAIN (VERBOSE, COSTS false) EXECUTE st3(10, 20);
-                                                      QUERY PLAN                                                       
------------------------------------------------------------------------------------------------------------------------
+                                                                           QUERY PLAN                                                                            
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------
  Sort
    Output: t1.c1, t1.c2, t1.c3, t1.c4, t1.c5, t1.c6, t1.c7, t1.c8
    Sort Key: t1.c1
@@ -732,7 +732,7 @@ EXPLAIN (VERBOSE, COSTS false) EXECUTE st3(10, 20);
                Output: t2.c3
                ->  Foreign Scan on public.ft2 t2
                      Output: t2.c3
-                     Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" > 10)) AND ((date(c5) = '1970-01-17'::date))
+                     Remote SQL: SELECT NULL, NULL, c3, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE (("C 1" > 10)) AND ((date(c5) = '1970-01-17'::date))
 (14 rows)
 
 EXECUTE st3(10, 20);
@@ -1085,7 +1085,7 @@ INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20;
                Output: ((ft2_1.c1 + 1000)), ((ft2_1.c2 + 100)), ((ft2_1.c3 || ft2_1.c3))
                ->  Foreign Scan on public.ft2 ft2_1
                      Output: (ft2_1.c1 + 1000), (ft2_1.c2 + 100), (ft2_1.c3 || ft2_1.c3)
-                     Remote SQL: SELECT "C 1", c2, c3 FROM "S 1"."T 1"
+                     Remote SQL: SELECT "C 1", c2, c3, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1"
 (9 rows)
 
 INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20;
@@ -1219,7 +1219,7 @@ UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
          Hash Cond: (ft2.c2 = ft1.c1)
          ->  Foreign Scan on public.ft2
                Output: ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c8, ft2.ctid
-               Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c8, ctid FROM "S 1"."T 1" FOR UPDATE
+               Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, NULL, c8, ctid FROM "S 1"."T 1" FOR UPDATE
          ->  Hash
                Output: ft1.*, ft1.c1
                ->  Foreign Scan on public.ft1
@@ -1231,14 +1231,14 @@ UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
   FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
 EXPLAIN (verbose, costs off)
   DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
-                                       QUERY PLAN                                       
-----------------------------------------------------------------------------------------
+                                                               QUERY PLAN                                                               
+----------------------------------------------------------------------------------------------------------------------------------------
  Delete on public.ft2
    Output: c1, c4
-   Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1 RETURNING "C 1", c4
+   Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1 RETURNING "C 1", NULL, NULL, c4, NULL, NULL, NULL, NULL
    ->  Foreign Scan on public.ft2
          Output: ctid
-         Remote SQL: SELECT ctid FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5)) FOR UPDATE
+         Remote SQL: SELECT NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, ctid FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5)) FOR UPDATE
 (6 rows)
 
 DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
@@ -1360,7 +1360,7 @@ DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;
          Hash Cond: (ft2.c2 = ft1.c1)
          ->  Foreign Scan on public.ft2
                Output: ft2.ctid, ft2.c2
-               Remote SQL: SELECT c2, ctid FROM "S 1"."T 1" FOR UPDATE
+               Remote SQL: SELECT NULL, c2, NULL, NULL, NULL, NULL, NULL, NULL, ctid FROM "S 1"."T 1" FOR UPDATE
          ->  Hash
                Output: ft1.*, ft1.c1
                ->  Foreign Scan on public.ft1
@@ -2594,12 +2594,12 @@ select c2, count(*) from "S 1"."T 1" where c2 < 500 group by 1 order by 1;
 -- Consistent check constraints provide consistent results
 ALTER FOREIGN TABLE ft1 ADD CONSTRAINT ft1_c2positive CHECK (c2 >= 0);
 EXPLAIN (VERBOSE, COSTS false) SELECT count(*) FROM ft1 WHERE c2 < 0;
-                            QUERY PLAN                             
--------------------------------------------------------------------
+                                                 QUERY PLAN                                                  
+-------------------------------------------------------------------------------------------------------------
  Aggregate
    Output: count(*)
    ->  Foreign Scan on public.ft1
-         Remote SQL: SELECT NULL FROM "S 1"."T 1" WHERE ((c2 < 0))
+         Remote SQL: SELECT NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE ((c2 < 0))
 (4 rows)
 
 SELECT count(*) FROM ft1 WHERE c2 < 0;
@@ -2638,12 +2638,12 @@ ALTER FOREIGN TABLE ft1 DROP CONSTRAINT ft1_c2positive;
 -- But inconsistent check constraints provide inconsistent results
 ALTER FOREIGN TABLE ft1 ADD CONSTRAINT ft1_c2negative CHECK (c2 < 0);
 EXPLAIN (VERBOSE, COSTS false) SELECT count(*) FROM ft1 WHERE c2 >= 0;
-                             QUERY PLAN                             
---------------------------------------------------------------------
+                                                  QUERY PLAN                                                  
+--------------------------------------------------------------------------------------------------------------
  Aggregate
    Output: count(*)
    ->  Foreign Scan on public.ft1
-         Remote SQL: SELECT NULL FROM "S 1"."T 1" WHERE ((c2 >= 0))
+         Remote SQL: SELECT NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE ((c2 >= 0))
 (4 rows)
 
 SELECT count(*) FROM ft1 WHERE c2 >= 0;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 63f0577..16df515 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -48,7 +48,8 @@ PG_MODULE_MAGIC;
 
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
- * foreign table.  This information is collected by postgresGetForeignRelSize.
+ * foreign table or foreign join.  This information is collected by
+ * postgresGetForeignRelSize, or calculated from join source relations.
  */
 typedef struct PgFdwRelationInfo
 {
@@ -78,10 +79,30 @@ typedef struct PgFdwRelationInfo
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;			/* only set in use_remote_estimate mode */
+	Oid			checkAsUser;
 } PgFdwRelationInfo;
 
 /*
- * Indexes of FDW-private information stored in fdw_private lists.
+ * Indexes of FDW-private information stored in fdw_private of ForeignPath.
+ * We use fdw_private of a ForeighPath when the path represents a join which
+ * can be pushed down to remote side.
+ *
+ * 1) Outer child path node
+ * 2) Inner child path node
+ * 3) Join type number(as an Integer node)
+ * 4) RestrictInfo list of join conditions
+ */
+enum FdwPathPrivateIndex
+{
+	FdwPathPrivateOuterPath,
+	FdwPathPrivateInnerPath,
+	FdwPathPrivateJoinType,
+	FdwPathPrivateRestrictList,
+};
+
+/*
+ * Indexes of FDW-private information stored in fdw_private of ForeignScan of
+ * a simple foreign table scan for a SELECT statement.
  *
  * We store various information in ForeignScan.fdw_private to pass it from
  * planner to executor.  Currently we store:
@@ -98,7 +119,11 @@ enum FdwScanPrivateIndex
 	/* SQL statement to execute remotely (as a String node) */
 	FdwScanPrivateSelectSql,
 	/* Integer list of attribute numbers retrieved by the SELECT */
-	FdwScanPrivateRetrievedAttrs
+	FdwScanPrivateRetrievedAttrs,
+	/* Integer value of server for the scan */
+	FdwScanPrivateServerOid,
+	/* Integer value of checkAsUser for the scan */
+	FdwScanPrivatecheckAsUser,
 };
 
 /*
@@ -129,6 +154,7 @@ enum FdwModifyPrivateIndex
 typedef struct PgFdwScanState
 {
 	Relation	rel;			/* relcache entry for the foreign table */
+	TupleDesc	tupdesc;		/* tuple descriptor of the scan */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* extracted fdw_private data */
@@ -288,6 +314,15 @@ static bool postgresAnalyzeForeignTable(Relation relation,
 							BlockNumber *totalpages);
 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
 							Oid serverOid);
+static void postgresGetForeignJoinPath(PlannerInfo *root,
+						   RelOptInfo *joinrel,
+						   RelOptInfo *outerrel,
+						   RelOptInfo *innerrel,
+						   JoinType jointype,
+						   SpecialJoinInfo *sjinfo,
+						   SemiAntiJoinFactors *semifactors,
+						   List *restrictlisti,
+						   Relids extra_lateral_rels);
 
 /*
  * Helper functions
@@ -324,6 +359,7 @@ static void analyze_row_processor(PGresult *res, int row,
 static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   int row,
 						   Relation rel,
+						   TupleDesc tupdesc,
 						   AttInMetadata *attinmeta,
 						   List *retrieved_attrs,
 						   MemoryContext temp_context);
@@ -368,6 +404,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	routine->ImportForeignSchema = postgresImportForeignSchema;
 
+	/* Support functions for join push-down */
+	routine->GetForeignJoinPath = postgresGetForeignJoinPath;
+
 	PG_RETURN_POINTER(routine);
 }
 
@@ -385,6 +424,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
 {
 	PgFdwRelationInfo *fpinfo;
 	ListCell   *lc;
+	RangeTblEntry *rte;
 
 	/*
 	 * We use PgFdwRelationInfo to pass various information to subsequent
@@ -428,6 +468,13 @@ postgresGetForeignRelSize(PlannerInfo *root,
 	}
 
 	/*
+	 * Retrieve RTE to obtain checkAsUser.  checkAsUser is used to determine
+	 * the user to use to obtain user mapping.
+	 */
+	rte = planner_rt_fetch(baserel->relid, root);
+	fpinfo->checkAsUser = rte->checkAsUser;
+
+	/*
 	 * If the table or the server is configured to use remote estimates,
 	 * identify which user to do remote access as during planning.  This
 	 * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
@@ -435,7 +482,6 @@ postgresGetForeignRelSize(PlannerInfo *root,
 	 */
 	if (fpinfo->use_remote_estimate)
 	{
-		RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
 		Oid			userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
 
 		fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
@@ -596,7 +642,7 @@ postgresGetForeignPaths(PlannerInfo *root,
 			continue;
 
 		/* See if it is safe to send to remote */
-		if (!is_foreign_expr(root, baserel, rinfo->clause))
+		if (!is_foreign_expr(root, baserel, NULL, rinfo->clause))
 			continue;
 
 		/* Calculate required outer rels for the resulting path */
@@ -672,7 +718,8 @@ postgresGetForeignPaths(PlannerInfo *root,
 					continue;
 
 				/* See if it is safe to send to remote */
-				if (!is_foreign_expr(root, baserel, rinfo->clause))
+				if (!is_foreign_expr(root, baserel, NULL,
+									 rinfo->clause))
 					continue;
 
 				/* Calculate required outer rels for the resulting path */
@@ -752,6 +799,8 @@ postgresGetForeignPlan(PlannerInfo *root,
 	List	   *retrieved_attrs;
 	StringInfoData sql;
 	ListCell   *lc;
+	List	   *fdw_ps_tlist = NIL;
+	ForeignScan *scan;
 
 	/*
 	 * Separate the scan_clauses into those that can be executed remotely and
@@ -769,7 +818,7 @@ postgresGetForeignPlan(PlannerInfo *root,
 	 * This code must match "extract_actual_clauses(scan_clauses, false)"
 	 * except for the additional decision about remote versus local execution.
 	 * Note however that we only strip the RestrictInfo nodes from the
-	 * local_exprs list, since appendWhereClause expects a list of
+	 * local_exprs list, since appendConditions expects a list of
 	 * RestrictInfos.
 	 */
 	foreach(lc, scan_clauses)
@@ -786,7 +835,7 @@ postgresGetForeignPlan(PlannerInfo *root,
 			remote_conds = lappend(remote_conds, rinfo);
 		else if (list_member_ptr(fpinfo->local_conds, rinfo))
 			local_exprs = lappend(local_exprs, rinfo->clause);
-		else if (is_foreign_expr(root, baserel, rinfo->clause))
+		else if (is_foreign_expr(root, baserel, NULL, rinfo->clause))
 			remote_conds = lappend(remote_conds, rinfo);
 		else
 			local_exprs = lappend(local_exprs, rinfo->clause);
@@ -797,64 +846,123 @@ postgresGetForeignPlan(PlannerInfo *root,
 	 * expressions to be sent as parameters.
 	 */
 	initStringInfo(&sql);
-	deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
-					 &retrieved_attrs);
-	if (remote_conds)
-		appendWhereClause(&sql, root, baserel, remote_conds,
-						  true, &params_list);
-
-	/*
-	 * Add FOR UPDATE/SHARE if appropriate.  We apply locking during the
-	 * initial row fetch, rather than later on as is done for local tables.
-	 * The extra roundtrips involved in trying to duplicate the local
-	 * semantics exactly don't seem worthwhile (see also comments for
-	 * RowMarkType).
-	 *
-	 * Note: because we actually run the query as a cursor, this assumes that
-	 * DECLARE CURSOR ... FOR UPDATE is supported, which it isn't before 8.3.
-	 */
-	if (baserel->relid == root->parse->resultRelation &&
-		(root->parse->commandType == CMD_UPDATE ||
-		 root->parse->commandType == CMD_DELETE))
+	if (scan_relid > 0)
 	{
-		/* Relation is UPDATE/DELETE target, so use FOR UPDATE */
-		appendStringInfoString(&sql, " FOR UPDATE");
-	}
-	else
-	{
-		RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid);
+		deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
+						 &retrieved_attrs);
+		if (remote_conds)
+			appendConditions(&sql, root, baserel, NULL, NULL, NULL,
+							 remote_conds, " WHERE ", &params_list);
 
-		if (rc)
+		/*
+		 * Add FOR UPDATE/SHARE if appropriate.  We apply locking during the
+		 * initial row fetch, rather than later on as is done for local tables.
+		 * The extra roundtrips involved in trying to duplicate the local
+		 * semantics exactly don't seem worthwhile (see also comments for
+		 * RowMarkType).
+		 *
+		 * Note: because we actually run the query as a cursor, this assumes
+		 * that DECLARE CURSOR ... FOR UPDATE is supported, which it isn't
+		 * before 8.3.
+		 */
+		if (baserel->relid == root->parse->resultRelation &&
+			(root->parse->commandType == CMD_UPDATE ||
+			 root->parse->commandType == CMD_DELETE))
 		{
-			/*
-			 * Relation is specified as a FOR UPDATE/SHARE target, so handle
-			 * that.
-			 *
-			 * For now, just ignore any [NO] KEY specification, since (a) it's
-			 * not clear what that means for a remote table that we don't have
-			 * complete information about, and (b) it wouldn't work anyway on
-			 * older remote servers.  Likewise, we don't worry about NOWAIT.
-			 */
-			switch (rc->strength)
+			/* Relation is UPDATE/DELETE target, so use FOR UPDATE */
+			appendStringInfoString(&sql, " FOR UPDATE");
+		}
+		else
+		{
+			RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid);
+
+			if (rc)
 			{
-				case LCS_FORKEYSHARE:
-				case LCS_FORSHARE:
-					appendStringInfoString(&sql, " FOR SHARE");
-					break;
-				case LCS_FORNOKEYUPDATE:
-				case LCS_FORUPDATE:
-					appendStringInfoString(&sql, " FOR UPDATE");
-					break;
+				/*
+				 * Relation is specified as a FOR UPDATE/SHARE target, so handle
+				 * that.
+				 *
+				 * For now, just ignore any [NO] KEY specification, since (a)
+				 * it's not clear what that means for a remote table that we
+				 * don't have complete information about, and (b) it wouldn't
+				 * work anyway on older remote servers.  Likewise, we don't
+				 * worry about NOWAIT.
+				 */
+				switch (rc->strength)
+				{
+					case LCS_FORKEYSHARE:
+					case LCS_FORSHARE:
+						appendStringInfoString(&sql, " FOR SHARE");
+						break;
+					case LCS_FORNOKEYUPDATE:
+					case LCS_FORUPDATE:
+						appendStringInfoString(&sql, " FOR UPDATE");
+						break;
+				}
 			}
 		}
 	}
+	else
+	{
+		/* Join case */
+		Path	   *path_o;
+		Path	   *path_i;
+		const char *sql_o;
+		const char *sql_i;
+		ForeignScan *plan_o;
+		ForeignScan *plan_i;
+		JoinType	jointype;
+		List	   *restrictlist;
+		int			i;
+
+		/* 
+		 * Retrieve infomation from fdw_private.
+		 */
+		path_o = list_nth(best_path->fdw_private, FdwPathPrivateOuterPath);
+		path_i = list_nth(best_path->fdw_private, FdwPathPrivateInnerPath);
+		jointype = intVal(list_nth(best_path->fdw_private,
+								   FdwPathPrivateJoinType));
+		restrictlist = list_nth(best_path->fdw_private,
+								FdwPathPrivateRestrictList);
+
+		/*
+		 * Construct remote query from bottom to the top.  ForeignScan plan
+		 * node of underlying scans are node necessary for execute the plan
+		 * tree, but it is handy to construct remote query recursively.
+		 */
+		plan_o = (ForeignScan *) create_plan_recurse(root, path_o);
+		Assert(IsA(plan_o, ForeignScan));
+		sql_o = strVal(list_nth(plan_o->fdw_private, FdwScanPrivateSelectSql));
+
+		plan_i = (ForeignScan *) create_plan_recurse(root, path_i);
+		Assert(IsA(plan_i, ForeignScan));
+		sql_i = strVal(list_nth(plan_i->fdw_private, FdwScanPrivateSelectSql));
+
+		deparseJoinSql(&sql, root, baserel, path_o, path_i, plan_o, plan_i,
+					   sql_o, sql_i, jointype, restrictlist, &fdw_ps_tlist);
+		retrieved_attrs = NIL;
+		for (i = 0; i < list_length(fdw_ps_tlist); i++)
+			retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
+	}
 
 	/*
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwScanPrivateIndex, above.
 	 */
-	fdw_private = list_make2(makeString(sql.data),
-							 retrieved_attrs);
+	fdw_private = list_make2(makeString(sql.data), retrieved_attrs);
+
+	/*
+	 * In pseudo scan case such as join push-down, add OID of server and
+	 * checkAsUser as extra information.
+	 * XXX: passing serverid and checkAsUser might simplify code through
+	 * all cases, simple scans and join push-down.
+	 */ 
+	if (scan_relid == 0)
+	{
+		fdw_private = lappend(fdw_private,
+							  makeInteger(fpinfo->server->serverid));
+		fdw_private = lappend(fdw_private, makeInteger(fpinfo->checkAsUser));
+	}
 
 	/*
 	 * Create the ForeignScan node from target list, local filtering
@@ -864,11 +972,18 @@ postgresGetForeignPlan(PlannerInfo *root,
 	 * field of the finished plan node; we can't keep them in private state
 	 * because then they wouldn't be subject to later planner processing.
 	 */
-	return make_foreignscan(tlist,
+	scan = make_foreignscan(tlist,
 							local_exprs,
 							scan_relid,
 							params_list,
 							fdw_private);
+
+	/*
+	 * set fdw_ps_tlist to handle tuples generated by this scan.
+	 */
+	scan->fdw_ps_tlist = fdw_ps_tlist;
+
+	return scan;
 }
 
 /*
@@ -881,9 +996,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
 	EState	   *estate = node->ss.ps.state;
 	PgFdwScanState *fsstate;
-	RangeTblEntry *rte;
+	Oid			serverid;
 	Oid			userid;
-	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
 	int			numParams;
@@ -903,22 +1017,51 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	node->fdw_state = (void *) fsstate;
 
 	/*
-	 * Identify which user to do the remote access as.  This should match what
-	 * ExecCheckRTEPerms() does.
+	 * Initialize fsstate.
+	 *
+	 * These values should be determined.
+	 * - fsstate->rel, NULL if no actual relation
+	 * - serverid, OID of forign server to use for the scan
+	 * - userid, searching user mapping
 	 */
-	rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
-	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+	if (fsplan->scan.scanrelid > 0)
+	{
+		/* Simple foreign table scan */
+		RangeTblEntry *rte;
+		ForeignTable *table;
 
-	/* Get info about foreign table. */
-	fsstate->rel = node->ss.ss_currentRelation;
-	table = GetForeignTable(RelationGetRelid(fsstate->rel));
-	server = GetForeignServer(table->serverid);
-	user = GetUserMapping(userid, server->serverid);
+		/*
+		 * Identify which user to do the remote access as.  This should match
+		 * what ExecCheckRTEPerms() does.
+		 */
+		rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
+		userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+		/* Get info about foreign table. */
+		fsstate->rel = node->ss.ss_currentRelation;
+		table = GetForeignTable(RelationGetRelid(fsstate->rel));
+		serverid = table->serverid;
+	}
+	else
+	{
+		Oid		checkAsUser;
+
+		/* Join */
+		fsstate->rel = NULL;	/* No actual relation to scan */
+
+		serverid = intVal(list_nth(fsplan->fdw_private,
+								   FdwScanPrivateServerOid));
+		checkAsUser = intVal(list_nth(fsplan->fdw_private,
+									  FdwScanPrivatecheckAsUser));
+		userid = checkAsUser ? checkAsUser : GetUserId();
+	}
 
 	/*
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
+	server = GetForeignServer(serverid);
+	user = GetUserMapping(userid, server->serverid);
 	fsstate->conn = GetConnection(server, user, false);
 
 	/* Assign a unique ID for my cursor */
@@ -929,7 +1072,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
-											   FdwScanPrivateRetrievedAttrs);
+												 FdwScanPrivateRetrievedAttrs);
 
 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -944,7 +1087,11 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 											  ALLOCSET_SMALL_MAXSIZE);
 
 	/* Get info we'll need for input data conversion. */
-	fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
+	if (fsplan->scan.scanrelid > 0)
+		fsstate->tupdesc = RelationGetDescr(fsstate->rel);
+	else
+		fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
+	fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
 
 	/* Prepare for output conversion of parameters used in remote query. */
 	numParams = list_length(fsplan->fdw_exprs);
@@ -1747,11 +1894,13 @@ estimate_path_cost_size(PlannerInfo *root,
 		deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
 						 &retrieved_attrs);
 		if (fpinfo->remote_conds)
-			appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
-							  true, NULL);
+			appendConditions(&sql, root, baserel, NULL, NULL, NULL,
+							 fpinfo->remote_conds, " WHERE ", NULL);
 		if (remote_join_conds)
-			appendWhereClause(&sql, root, baserel, remote_join_conds,
-							  (fpinfo->remote_conds == NIL), NULL);
+			appendConditions(&sql, root, baserel, NULL, NULL, NULL,
+							 remote_join_conds,
+							 fpinfo->remote_conds == NIL ? " WHERE " : " AND ",
+							 NULL);
 
 		/* Get the remote estimate */
 		conn = GetConnection(fpinfo->server, fpinfo->user, false);
@@ -2052,6 +2201,7 @@ fetch_more_data(ForeignScanState *node)
 			fsstate->tuples[i] =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
+										   fsstate->tupdesc,
 										   fsstate->attinmeta,
 										   fsstate->retrieved_attrs,
 										   fsstate->temp_cxt);
@@ -2270,6 +2420,7 @@ store_returning_result(PgFdwModifyState *fmstate,
 
 		newtup = make_tuple_from_result_row(res, 0,
 											fmstate->rel,
+											NULL,
 											fmstate->attinmeta,
 											fmstate->retrieved_attrs,
 											fmstate->temp_cxt);
@@ -2562,6 +2713,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
 
 		astate->rows[pos] = make_tuple_from_result_row(res, row,
 													   astate->rel,
+													   NULL,
 													   astate->attinmeta,
 													 astate->retrieved_attrs,
 													   astate->temp_cxt);
@@ -2835,6 +2987,181 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 }
 
 /*
+ * Construct PgFdwRelationInfo from two join sources
+ */
+static PgFdwRelationInfo *
+merge_fpinfo(PgFdwRelationInfo *fpinfo_o,
+			 PgFdwRelationInfo *fpinfo_i,
+			 JoinType jointype)
+{
+	PgFdwRelationInfo *fpinfo;
+
+	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
+	fpinfo->remote_conds = list_concat(copyObject(fpinfo_o->remote_conds),
+									   copyObject(fpinfo_i->remote_conds));
+	fpinfo->local_conds = list_concat(copyObject(fpinfo_o->local_conds),
+									  copyObject(fpinfo_i->local_conds));
+
+	fpinfo->attrs_used = NULL;		/* Use fdw_ps_tlist */
+	fpinfo->local_conds_cost.startup = fpinfo_o->local_conds_cost.startup +
+									   fpinfo_i->local_conds_cost.startup;
+	fpinfo->local_conds_cost.per_tuple = fpinfo_o->local_conds_cost.per_tuple +
+										 fpinfo_i->local_conds_cost.per_tuple;
+	fpinfo->local_conds_sel = fpinfo_o->local_conds_sel *
+							  fpinfo_i->local_conds_sel;
+	if (jointype == JOIN_INNER)
+		fpinfo->rows = Min(fpinfo_o->rows, fpinfo_i->rows);
+	else
+		fpinfo->rows = Max(fpinfo_o->rows, fpinfo_i->rows);
+	fpinfo->rows = Min(fpinfo_o->rows, fpinfo_i->rows);
+	/* XXX we should consider only columns in fdw_ps_tlist */
+	fpinfo->width = fpinfo_o->width + fpinfo_i->width;
+	/* XXX we should estimate better costs */
+
+	fpinfo->use_remote_estimate = false;	/* Never use in join case */
+	fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
+	fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
+
+	fpinfo->startup_cost = fpinfo->fdw_startup_cost;
+	fpinfo->total_cost =
+		fpinfo->startup_cost + fpinfo->fdw_tuple_cost * fpinfo->rows;
+
+	fpinfo->table = NULL;	/* always NULL in join case */
+	fpinfo->server = fpinfo_o->server;
+	fpinfo->user = fpinfo_o->user ? fpinfo_o->user : fpinfo_i->user;
+	/* checkAsuser must be identical */
+	fpinfo->checkAsUser = fpinfo_o->checkAsUser;
+
+	return fpinfo;
+}
+
+/*
+ * postgresGetForeignJoinPath
+ *		Add possible ForeignPath to joinrel.
+ *
+ * Joins satify conditions below can be pushed down to remote PostgreSQL server.
+ *
+ * 1) Join type is inner or outer
+ * 2) Join conditions consist of remote-safe expressions.
+ * 3) Join source relations don't have any local filter.
+ */
+static void
+postgresGetForeignJoinPath(PlannerInfo *root,
+						   RelOptInfo *joinrel,
+						   RelOptInfo *outerrel,
+						   RelOptInfo *innerrel,
+						   JoinType jointype,
+						   SpecialJoinInfo *sjinfo,
+						   SemiAntiJoinFactors *semifactors,
+						   List *restrictlist,
+						   Relids extra_lateral_rels)
+{
+	ForeignPath	   *joinpath;
+	ForeignPath	   *path_o = (ForeignPath *) outerrel->cheapest_total_path;
+	ForeignPath	   *path_i = (ForeignPath *) innerrel->cheapest_total_path;
+	PgFdwRelationInfo *fpinfo_o;
+	PgFdwRelationInfo *fpinfo_i;
+	PgFdwRelationInfo *fpinfo;
+	double			rows;
+	Cost			startup_cost;
+	Cost			total_cost;
+	ListCell	   *lc;
+	List		   *fdw_private;
+
+	/* Source relations should be ForeignPath. */
+	if (!IsA(path_o, ForeignPath) || !IsA(path_i, ForeignPath))
+		return;
+
+	/*
+	 * Skip considering reversed join combination.
+	 */
+	if (outerrel->relid < innerrel->relid)
+		return;
+
+	/*
+	 * Both relations in the join must belong to same server.
+	 */
+	fpinfo_o = path_o->path.parent->fdw_private;
+	fpinfo_i = path_i->path.parent->fdw_private;
+	if (fpinfo_o->server->serverid != fpinfo_i->server->serverid)
+		return;
+
+	/*
+	 * We support all outer joins in addition to inner join.
+	 */
+	if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
+		jointype != JOIN_RIGHT && jointype != JOIN_FULL)
+		return;
+
+	/*
+	 * Note that CROSS JOIN (cartesian product) is transformed to JOIN_INNER
+	 * with empty restrictlist. Pushing down CROSS JOIN produces more result
+	 * than retrieving each tables separately, so we don't push down such joins.
+	 */
+	if (jointype == JOIN_INNER && restrictlist == NIL)
+		return;
+
+	/*
+	 * Neither source relation can have local conditions.  This can be relaxed
+	 * if the join is an inner join and local conditions don't contain volatile
+	 * function/operator, but as of now we leave it as future enhancement.
+	 */
+	if (fpinfo_o->local_conds != NULL || fpinfo_i->local_conds != NULL)
+		return;
+
+	/*
+	 * Join condition must be safe to push down.
+	 */
+	foreach(lc, restrictlist)
+	{
+		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
+
+		if (!is_foreign_expr(root, joinrel, NULL, rinfo->clause))
+			return;
+	}
+
+	/*
+	 * checkAsUser of source pathes should match.
+	 */
+	if (fpinfo_o->checkAsUser != fpinfo_i->checkAsUser)
+		return;
+
+	/* Here we know that this join can be pushed-down to remote side. */
+
+	/* Construct fpinfo for the join relation */
+	fpinfo = merge_fpinfo(fpinfo_o, fpinfo_i, jointype);
+	joinrel->fdw_private = fpinfo;
+
+	/* TODO determine cost and rows of the join. */
+	rows = fpinfo->rows;
+	startup_cost = fpinfo->startup_cost;
+	total_cost = fpinfo->total_cost;
+
+	fdw_private = list_make4(path_o,
+							 path_i,
+							 makeInteger(jointype),
+							 restrictlist);
+
+	/*
+	 * Create a new join path and add it to the joinrel which represents a join
+	 * between foreign tables.
+	 */
+	joinpath = create_foreignscan_path(root,
+									   joinrel,
+									   rows,
+									   startup_cost,
+									   total_cost,
+									   NIL,		/* no pathkeys */
+									   NULL,	/* no required_outer */
+									   fdw_private);
+
+	/* Add generated path into joinrel by add_path(). */
+	add_path(joinrel, (Path *) joinpath);
+
+	/* TODO consider parameterized paths */
+}
+
+/*
  * Create a tuple from the specified row of the PGresult.
  *
  * rel is the local representation of the foreign table, attinmeta is
@@ -2846,12 +3173,12 @@ static HeapTuple
 make_tuple_from_result_row(PGresult *res,
 						   int row,
 						   Relation rel,
+						   TupleDesc tupdesc,
 						   AttInMetadata *attinmeta,
 						   List *retrieved_attrs,
 						   MemoryContext temp_context)
 {
 	HeapTuple	tuple;
-	TupleDesc	tupdesc = RelationGetDescr(rel);
 	Datum	   *values;
 	bool	   *nulls;
 	ItemPointer ctid = NULL;
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 950c6f7..23a6b24 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -16,6 +16,7 @@
 #include "foreign/foreign.h"
 #include "lib/stringinfo.h"
 #include "nodes/relation.h"
+#include "nodes/plannodes.h"
 #include "utils/relcache.h"
 
 #include "libpq-fe.h"
@@ -45,19 +46,35 @@ extern void classifyConditions(PlannerInfo *root,
 				   List **remote_conds,
 				   List **local_conds);
 extern bool is_foreign_expr(PlannerInfo *root,
-				RelOptInfo *baserel,
+				RelOptInfo *outerrel,
+				RelOptInfo *innerrel,
 				Expr *expr);
 extern void deparseSelectSql(StringInfo buf,
 				 PlannerInfo *root,
 				 RelOptInfo *baserel,
 				 Bitmapset *attrs_used,
 				 List **retrieved_attrs);
-extern void appendWhereClause(StringInfo buf,
+extern void appendConditions(StringInfo buf,
 				  PlannerInfo *root,
-				  RelOptInfo *baserel,
+				  RelOptInfo *outerrel,
+				  RelOptInfo *innerrel,
+				  ForeignScan *outerplan,
+				  ForeignScan *innerplan,
 				  List *exprs,
-				  bool is_first,
+				  const char *prefix,
 				  List **params);
+extern void deparseJoinSql(StringInfo sql,
+			   PlannerInfo *root,
+			   RelOptInfo *baserel,
+			   Path *path_o,
+			   Path *path_i,
+			   ForeignScan *plan_o,
+			   ForeignScan *plan_i,
+			   const char *sql_o,
+			   const char *sql_i,
+			   JoinType jointype,
+			   List *restrictlist,
+			   List **retrieved_attrs);
 extern void deparseInsertSql(StringInfo buf, PlannerInfo *root,
 				 Index rtindex, Relation rel,
 				 List *targetAttrs, List *returningList,
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 9261e7f..7abc8e2 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3071,6 +3071,20 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-foiregnjoin" xreflabel="enable_foiregnjoin">
+      <term><varname>enable_foiregnjoin</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_foiregnjoin</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of foreign-scan plan
+        types for joining foreign tables. The default is <literal>on</>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-hashagg" xreflabel="enable_hashagg">
       <term><varname>enable_hashagg</varname> (<type>boolean</type>)
       <indexterm>
@@ -7687,6 +7701,7 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
          </entry>
          <entry>
           <literal>enable_bitmapscan = off</>,
+          <literal>enable_foreignjoin = off</>,
           <literal>enable_hashjoin = off</>,
           <literal>enable_indexscan = off</>,
           <literal>enable_mergejoin = off</>,
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 78ef229..b1d2683 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -117,6 +117,7 @@ bool		enable_nestloop = true;
 bool		enable_material = true;
 bool		enable_mergejoin = true;
 bool		enable_hashjoin = true;
+bool		enable_foreignjoin = true;
 
 typedef struct
 {
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 1d7b9fa..cf45c55 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -17,6 +17,7 @@
 #include <math.h>
 
 #include "executor/executor.h"
+#include "foreign/fdwapi.h"
 #include "optimizer/cost.h"
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
@@ -264,17 +265,33 @@ add_paths_to_joinrel(PlannerInfo *root,
 							 param_source_rels, extra_lateral_rels);
 
 	/*
-	 * 5. Consider paths added by FDW drivers or custom-scan providers, in
-	 * addition to built-in paths.
-	 *
-	 * XXX - In case of FDW, we may be able to omit invocation if joinrel's
-	 * fdwhandler (set only if both relations are managed by same FDW server).
+	 * 5. Consider paths added by custom-scan providers, in addition to
+	 * built-in paths.
 	 */
 	if (set_join_pathlist_hook)
 		set_join_pathlist_hook(root, joinrel, outerrel, innerrel,
 							   restrictlist, jointype,
 							   sjinfo, &semifactors,
 							   param_source_rels, extra_lateral_rels);
+
+	/*
+	 * 6. Consider paths added by FDWs when both outer and inner relations are
+	 * managed by same foreign-data wrapper.  Matching of foreign server and/or
+	 * checkAsUser should be checked in GetForeignJoinPath by the FDW.
+	 */
+	if (enable_foreignjoin &&
+		joinrel->fdwroutine && joinrel->fdwroutine->GetForeignJoinPath)
+	{
+		joinrel->fdwroutine->GetForeignJoinPath(root,
+												joinrel,
+												outerrel,
+												innerrel,
+												jointype,
+												sjinfo,
+												&semifactors,
+												restrictlist,
+												extra_lateral_rels);
+	}
 }
 
 /*
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 33720e8..9014a72 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3236,6 +3236,9 @@ set_plan_disabling_options(const char *arg, GucContext context, GucSource source
 		case 'h':				/* hashjoin */
 			tmp = "enable_hashjoin";
 			break;
+		case 'f':				/* foreignjoin */
+			tmp = "enable_foreignjoin";
+			break;
 	}
 	if (tmp)
 	{
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index d84dba7..7c2343f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -875,6 +875,15 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"enable_foreignjoin", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of foreign scan plans for foreign joins."),
+			NULL
+		},
+		&enable_foreignjoin,
+		true,
+		NULL, NULL, NULL
+	},
+	{
 		{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
 			gettext_noop("Enables genetic query optimization."),
 			gettext_noop("This algorithm attempts to do planning without "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index f8f9ce1..4fc4455 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -272,6 +272,7 @@
 # - Planner Method Configuration -
 
 #enable_bitmapscan = on
+#enable_foreignjoin = on
 #enable_hashagg = on
 #enable_hashjoin = on
 #enable_indexscan = on
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index b494ff2..d4ab71a 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -82,6 +82,16 @@ typedef void (*EndForeignModify_function) (EState *estate,
 
 typedef int (*IsForeignRelUpdatable_function) (Relation rel);
 
+typedef void (*GetForeignJoinPath_function ) (PlannerInfo *root,
+											  RelOptInfo *joinrel,
+											  RelOptInfo *outerrel,
+											  RelOptInfo *innerrel,
+											  JoinType jointype,
+											  SpecialJoinInfo *sjinfo,
+											  SemiAntiJoinFactors *semifactors,
+											  List *restrictlist,
+											  Relids extra_lateral_rels);
+
 typedef void (*ExplainForeignScan_function) (ForeignScanState *node,
 													struct ExplainState *es);
 
@@ -150,6 +160,10 @@ typedef struct FdwRoutine
 
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	ImportForeignSchema_function ImportForeignSchema;
+
+	/* Support functions for join push-down */
+	GetForeignJoinPath_function GetForeignJoinPath;
+
 } FdwRoutine;
 
 
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 9c2000b..e41c88a 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -61,6 +61,7 @@ extern bool enable_nestloop;
 extern bool enable_material;
 extern bool enable_mergejoin;
 extern bool enable_hashjoin;
+extern bool enable_foreignjoin;
 extern int	constraint_exclusion;
 
 extern double clamp_row_est(double nrows);
diff --git a/src/test/regress/expected/rangefuncs.out b/src/test/regress/expected/rangefuncs.out
index 7991e99..4e02062 100644
--- a/src/test/regress/expected/rangefuncs.out
+++ b/src/test/regress/expected/rangefuncs.out
@@ -2,6 +2,7 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
          name         | setting 
 ----------------------+---------
  enable_bitmapscan    | on
+ enable_foreignjoin   | on
  enable_hashagg       | on
  enable_hashjoin      | on
  enable_indexonlyscan | on
@@ -12,7 +13,7 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
  enable_seqscan       | on
  enable_sort          | on
  enable_tidscan       | on
-(11 rows)
+(12 rows)
 
 CREATE TABLE foo2(fooid int, f2 int);
 INSERT INTO foo2 VALUES(1, 11);
