diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
new file mode 100644
index 0b9e3e4..408e14d
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
*************** SELECT t1.c1 FROM ft1 t1 WHERE NOT EXIST
*** 1803,1841 ****
  
  -- CROSS JOIN, not pushed down
  EXPLAIN (VERBOSE, COSTS OFF)
! SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
!                              QUERY PLAN                              
! ---------------------------------------------------------------------
   Limit
!    Output: t1.c1, t2.c1
     ->  Sort
!          Output: t1.c1, t2.c1
!          Sort Key: t1.c1, t2.c1
           ->  Nested Loop
!                Output: t1.c1, t2.c1
                 ->  Foreign Scan on public.ft1 t1
!                      Output: t1.c1
!                      Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
                 ->  Materialize
!                      Output: t2.c1
                       ->  Foreign Scan on public.ft2 t2
!                            Output: t2.c1
!                            Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
  (15 rows)
  
! SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
!  c1 | c1  
! ----+-----
!   1 | 101
!   1 | 102
!   1 | 103
!   1 | 104
!   1 | 105
!   1 | 106
!   1 | 107
!   1 | 108
!   1 | 109
!   1 | 110
  (10 rows)
  
  -- different server, not pushed down. No result expected.
--- 1803,1841 ----
  
  -- CROSS JOIN, not pushed down
  EXPLAIN (VERBOSE, COSTS OFF)
! SELECT t1.c2, t2.c2 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c2, t2.c2 OFFSET 100 LIMIT 10;
!                             QUERY PLAN                            
! ------------------------------------------------------------------
   Limit
!    Output: t1.c2, t2.c2
     ->  Sort
!          Output: t1.c2, t2.c2
!          Sort Key: t1.c2, t2.c2
           ->  Nested Loop
!                Output: t1.c2, t2.c2
                 ->  Foreign Scan on public.ft1 t1
!                      Output: t1.c2
!                      Remote SQL: SELECT c2 FROM "S 1"."T 1"
                 ->  Materialize
!                      Output: t2.c2
                       ->  Foreign Scan on public.ft2 t2
!                            Output: t2.c2
!                            Remote SQL: SELECT c2 FROM "S 1"."T 1"
  (15 rows)
  
! SELECT t1.c2, t2.c2 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c2, t2.c2 OFFSET 100 LIMIT 10;
!  c2 | c2 
! ----+----
!   0 |  0
!   0 |  0
!   0 |  0
!   0 |  0
!   0 |  0
!   0 |  0
!   0 |  0
!   0 |  0
!   0 |  0
!   0 |  0
  (10 rows)
  
  -- different server, not pushed down. No result expected.
*************** select c2/2, sum(c2) * (c2/2) from ft1 g
*** 2377,2394 ****
  -- Aggregates in subquery are pushed down.
  explain (verbose, costs off)
  select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x;
!                                                 QUERY PLAN                                                
! ----------------------------------------------------------------------------------------------------------
   Aggregate
     Output: count(ft1.c2), sum(ft1.c2)
!    ->  Sort
           Output: ft1.c2, (sum(ft1.c1)), (sqrt((ft1.c1)::double precision))
           Sort Key: ft1.c2, (sum(ft1.c1))
!          ->  Foreign Scan
!                Output: ft1.c2, (sum(ft1.c1)), (sqrt((ft1.c1)::double precision))
!                Relations: Aggregate on (public.ft1)
!                Remote SQL: SELECT c2, sum("C 1"), sqrt("C 1") FROM "S 1"."T 1" GROUP BY c2, (sqrt("C 1"))
! (9 rows)
  
  select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x;
   count | sum  
--- 2377,2397 ----
  -- Aggregates in subquery are pushed down.
  explain (verbose, costs off)
  select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x;
!                                                         QUERY PLAN                                                        
! --------------------------------------------------------------------------------------------------------------------------
   Aggregate
     Output: count(ft1.c2), sum(ft1.c2)
!    ->  Incremental Sort
           Output: ft1.c2, (sum(ft1.c1)), (sqrt((ft1.c1)::double precision))
           Sort Key: ft1.c2, (sum(ft1.c1))
!          Presorted Key: ft1.c2
!          ->  GroupAggregate
!                Output: ft1.c2, sum(ft1.c1), (sqrt((ft1.c1)::double precision))
!                Group Key: ft1.c2, sqrt((ft1.c1)::double precision)
!                ->  Foreign Scan on public.ft1
!                      Output: ft1.c2, sqrt((ft1.c1)::double precision), ft1.c1
!                      Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" ORDER BY c2 ASC NULLS LAST, sqrt("C 1") ASC NULLS LAST
! (12 rows)
  
  select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x;
   count | sum  
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
new file mode 100644
index 56b01d0..a9f7111
*** a/contrib/postgres_fdw/sql/postgres_fdw.sql
--- b/contrib/postgres_fdw/sql/postgres_fdw.sql
*************** SELECT t1.c1 FROM ft1 t1 WHERE NOT EXIST
*** 462,469 ****
  SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
  -- CROSS JOIN, not pushed down
  EXPLAIN (VERBOSE, COSTS OFF)
! SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
! SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
  -- different server, not pushed down. No result expected.
  EXPLAIN (VERBOSE, COSTS OFF)
  SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
--- 462,469 ----
  SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
  -- CROSS JOIN, not pushed down
  EXPLAIN (VERBOSE, COSTS OFF)
! SELECT t1.c2, t2.c2 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c2, t2.c2 OFFSET 100 LIMIT 10;
! SELECT t1.c2, t2.c2 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c2, t2.c2 OFFSET 100 LIMIT 10;
  -- different server, not pushed down. No result expected.
  EXPLAIN (VERBOSE, COSTS OFF)
  SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
new file mode 100644
index 95afc2c..049d470
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
*************** ANY <replaceable class="parameter">num_s
*** 3524,3529 ****
--- 3524,3543 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-enable-incsort" xreflabel="enable_incsort">
+       <term><varname>enable_incsort</varname> (<type>boolean</type>)
+       <indexterm>
+        <primary><varname>enable_incsort</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Enables or disables the query planner's use of incremental sort
+         steps. The default is <literal>on</>.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       <varlistentry id="guc-enable-indexscan" xreflabel="enable_indexscan">
        <term><varname>enable_indexscan</varname> (<type>boolean</type>)
        <indexterm>
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
new file mode 100644
index c9e0a3e..5020f5c
*** a/src/backend/commands/explain.c
--- b/src/backend/commands/explain.c
*************** static void show_grouping_set_keys(PlanS
*** 92,98 ****
  static void show_group_keys(GroupState *gstate, List *ancestors,
  				ExplainState *es);
  static void show_sort_group_keys(PlanState *planstate, const char *qlabel,
! 					 int nkeys, AttrNumber *keycols,
  					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
  					 List *ancestors, ExplainState *es);
  static void show_sortorder_options(StringInfo buf, Node *sortexpr,
--- 92,98 ----
  static void show_group_keys(GroupState *gstate, List *ancestors,
  				ExplainState *es);
  static void show_sort_group_keys(PlanState *planstate, const char *qlabel,
! 					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
  					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
  					 List *ancestors, ExplainState *es);
  static void show_sortorder_options(StringInfo buf, Node *sortexpr,
*************** ExplainNode(PlanState *planstate, List *
*** 974,979 ****
--- 974,982 ----
  		case T_Sort:
  			pname = sname = "Sort";
  			break;
+ 		case T_IncSort:
+ 			pname = sname = "Incremental Sort";
+ 			break;
  		case T_Group:
  			pname = sname = "Group";
  			break;
*************** ExplainNode(PlanState *planstate, List *
*** 1504,1509 ****
--- 1507,1513 ----
  										   planstate, es);
  			break;
  		case T_Sort:
+ 		case T_IncSort:
  			show_sort_keys(castNode(SortState, planstate), ancestors, es);
  			show_sort_info(castNode(SortState, planstate), es);
  			break;
*************** static void
*** 1832,1840 ****
  show_sort_keys(SortState *sortstate, List *ancestors, ExplainState *es)
  {
  	Sort	   *plan = (Sort *) sortstate->ss.ps.plan;
  
  	show_sort_group_keys((PlanState *) sortstate, "Sort Key",
! 						 plan->numCols, plan->sortColIdx,
  						 plan->sortOperators, plan->collations,
  						 plan->nullsFirst,
  						 ancestors, es);
--- 1836,1850 ----
  show_sort_keys(SortState *sortstate, List *ancestors, ExplainState *es)
  {
  	Sort	   *plan = (Sort *) sortstate->ss.ps.plan;
+ 	int			skipCols;
+ 
+ 	if (IsA(plan, IncSort))
+ 		skipCols = ((IncSort *) plan)->skipCols;
+ 	else
+ 		skipCols = 0;
  
  	show_sort_group_keys((PlanState *) sortstate, "Sort Key",
! 						 plan->numCols, skipCols, plan->sortColIdx,
  						 plan->sortOperators, plan->collations,
  						 plan->nullsFirst,
  						 ancestors, es);
*************** show_merge_append_keys(MergeAppendState 
*** 1850,1856 ****
  	MergeAppend *plan = (MergeAppend *) mstate->ps.plan;
  
  	show_sort_group_keys((PlanState *) mstate, "Sort Key",
! 						 plan->numCols, plan->sortColIdx,
  						 plan->sortOperators, plan->collations,
  						 plan->nullsFirst,
  						 ancestors, es);
--- 1860,1866 ----
  	MergeAppend *plan = (MergeAppend *) mstate->ps.plan;
  
  	show_sort_group_keys((PlanState *) mstate, "Sort Key",
! 						 plan->numCols, 0, plan->sortColIdx,
  						 plan->sortOperators, plan->collations,
  						 plan->nullsFirst,
  						 ancestors, es);
*************** show_agg_keys(AggState *astate, List *an
*** 1874,1880 ****
  			show_grouping_sets(outerPlanState(astate), plan, ancestors, es);
  		else
  			show_sort_group_keys(outerPlanState(astate), "Group Key",
! 								 plan->numCols, plan->grpColIdx,
  								 NULL, NULL, NULL,
  								 ancestors, es);
  
--- 1884,1890 ----
  			show_grouping_sets(outerPlanState(astate), plan, ancestors, es);
  		else
  			show_sort_group_keys(outerPlanState(astate), "Group Key",
! 								 plan->numCols, 0, plan->grpColIdx,
  								 NULL, NULL, NULL,
  								 ancestors, es);
  
*************** show_grouping_set_keys(PlanState *planst
*** 1930,1936 ****
  	if (sortnode)
  	{
  		show_sort_group_keys(planstate, "Sort Key",
! 							 sortnode->numCols, sortnode->sortColIdx,
  							 sortnode->sortOperators, sortnode->collations,
  							 sortnode->nullsFirst,
  							 ancestors, es);
--- 1940,1946 ----
  	if (sortnode)
  	{
  		show_sort_group_keys(planstate, "Sort Key",
! 							 sortnode->numCols, 0, sortnode->sortColIdx,
  							 sortnode->sortOperators, sortnode->collations,
  							 sortnode->nullsFirst,
  							 ancestors, es);
*************** show_group_keys(GroupState *gstate, List
*** 1987,1993 ****
  	/* The key columns refer to the tlist of the child plan */
  	ancestors = lcons(gstate, ancestors);
  	show_sort_group_keys(outerPlanState(gstate), "Group Key",
! 						 plan->numCols, plan->grpColIdx,
  						 NULL, NULL, NULL,
  						 ancestors, es);
  	ancestors = list_delete_first(ancestors);
--- 1997,2003 ----
  	/* The key columns refer to the tlist of the child plan */
  	ancestors = lcons(gstate, ancestors);
  	show_sort_group_keys(outerPlanState(gstate), "Group Key",
! 						 plan->numCols, 0, plan->grpColIdx,
  						 NULL, NULL, NULL,
  						 ancestors, es);
  	ancestors = list_delete_first(ancestors);
*************** show_group_keys(GroupState *gstate, List
*** 2000,2012 ****
   */
  static void
  show_sort_group_keys(PlanState *planstate, const char *qlabel,
! 					 int nkeys, AttrNumber *keycols,
  					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
  					 List *ancestors, ExplainState *es)
  {
  	Plan	   *plan = planstate->plan;
  	List	   *context;
  	List	   *result = NIL;
  	StringInfoData sortkeybuf;
  	bool		useprefix;
  	int			keyno;
--- 2010,2023 ----
   */
  static void
  show_sort_group_keys(PlanState *planstate, const char *qlabel,
! 					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
  					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
  					 List *ancestors, ExplainState *es)
  {
  	Plan	   *plan = planstate->plan;
  	List	   *context;
  	List	   *result = NIL;
+ 	List	   *resultPresorted = NIL;
  	StringInfoData sortkeybuf;
  	bool		useprefix;
  	int			keyno;
*************** show_sort_group_keys(PlanState *planstat
*** 2046,2054 ****
--- 2057,2069 ----
  								   nullsFirst[keyno]);
  		/* Emit one property-list item per sort key */
  		result = lappend(result, pstrdup(sortkeybuf.data));
+ 		if (keyno < nPresortedKeys)
+ 			resultPresorted = lappend(resultPresorted, exprstr);
  	}
  
  	ExplainPropertyList(qlabel, result, es);
+ 	if (nPresortedKeys > 0)
+ 		ExplainPropertyList("Presorted Key", resultPresorted, es);
  }
  
  /*
*************** show_sort_info(SortState *sortstate, Exp
*** 2195,2206 ****
--- 2210,2230 ----
  			appendStringInfoSpaces(es->str, es->indent * 2);
  			appendStringInfo(es->str, "Sort Method: %s  %s: %ldkB\n",
  							 sortMethod, spaceType, spaceUsed);
+ 			if (sortstate->skipKeys)
+ 			{
+ 				appendStringInfoSpaces(es->str, es->indent * 2);
+ 				appendStringInfo(es->str, "Sort groups: %ld\n",
+ 								 sortstate->groupsCount);
+ 			}
  		}
  		else
  		{
  			ExplainPropertyText("Sort Method", sortMethod, es);
  			ExplainPropertyLong("Sort Space Used", spaceUsed, es);
  			ExplainPropertyText("Sort Space Type", spaceType, es);
+ 			if (sortstate->skipKeys)
+ 				ExplainPropertyLong("Sort groups: %ld",
+ 									sortstate->groupsCount, es);
  		}
  	}
  }
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
new file mode 100644
index d380207..c6c3ab7
*** a/src/backend/executor/execAmi.c
--- b/src/backend/executor/execAmi.c
*************** ExecReScan(PlanState *node)
*** 235,240 ****
--- 235,241 ----
  			break;
  
  		case T_SortState:
+ 		case T_IncSortState:
  			ExecReScanSort((SortState *) node);
  			break;
  
*************** ExecSupportsBackwardScan(Plan *node)
*** 509,516 ****
--- 510,521 ----
  		case T_CteScan:
  		case T_Material:
  		case T_Sort:
+ 			/* these don't evaluate tlist */
  			return true;
  
+ 		case T_IncSort:
+ 			return false;
+ 
  		case T_LockRows:
  		case T_Limit:
  			return ExecSupportsBackwardScan(outerPlan(node));
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
new file mode 100644
index 0dd95c6..3cc4b77
*** a/src/backend/executor/execProcnode.c
--- b/src/backend/executor/execProcnode.c
*************** ExecInitNode(Plan *node, EState *estate,
*** 291,296 ****
--- 291,297 ----
  			break;
  
  		case T_Sort:
+ 		case T_IncSort:
  			result = (PlanState *) ExecInitSort((Sort *) node,
  												estate, eflags);
  			break;
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
new file mode 100644
index aa08152..aa4d8e2
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
*************** initialize_phase(AggState *aggstate, int
*** 559,564 ****
--- 559,565 ----
  												  sortnode->collations,
  												  sortnode->nullsFirst,
  												  work_mem,
+ 												  false,
  												  false);
  	}
  
*************** initialize_aggregate(AggState *aggstate,
*** 637,643 ****
  									 pertrans->sortOperators,
  									 pertrans->sortCollations,
  									 pertrans->sortNullsFirst,
! 									 work_mem, false);
  	}
  
  	/*
--- 638,644 ----
  									 pertrans->sortOperators,
  									 pertrans->sortCollations,
  									 pertrans->sortNullsFirst,
! 									 work_mem, false, false);
  	}
  
  	/*
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
new file mode 100644
index 591a31a..28272be
*** a/src/backend/executor/nodeSort.c
--- b/src/backend/executor/nodeSort.c
***************
*** 15,25 ****
--- 15,123 ----
  
  #include "postgres.h"
  
+ #include "access/htup_details.h"
  #include "executor/execdebug.h"
  #include "executor/nodeSort.h"
  #include "miscadmin.h"
+ #include "utils/lsyscache.h"
  #include "utils/tuplesort.h"
  
+ /*
+  * Check if first "skipCols" sort values are equal.
+  */
+ static bool
+ cmpSortSkipCols(SortState *node, TupleTableSlot *a, TupleTableSlot *b)
+ {
+ 	int n, i;
+ 
+ 	Assert(IsA(node->ss.ps.plan, IncSort));
+ 
+ 	n = ((IncSort *) node->ss.ps.plan)->skipCols;
+ 
+ 	for (i = 0; i < n; i++)
+ 	{
+ 		Datum datumA, datumB, result;
+ 		bool isnullA, isnullB;
+ 		AttrNumber attno = node->skipKeys[i].attno;
+ 		SkipKeyData *key;
+ 
+ 		datumA = slot_getattr(a, attno, &isnullA);
+ 		datumB = slot_getattr(b, attno, &isnullB);
+ 
+ 		/* Special case for NULL-vs-NULL, else use standard comparison */
+ 		if (isnullA || isnullB)
+ 		{
+ 			if (isnullA == isnullB)
+ 				continue;
+ 			else
+ 				return false;
+ 		}
+ 
+ 		key = &node->skipKeys[i];
+ 
+ 		key->fcinfo.arg[0] = datumA;
+ 		key->fcinfo.arg[1] = datumB;
+ 
+ 		/* just for paranoia's sake, we reset isnull each time */
+ 		key->fcinfo.isnull = false;
+ 
+ 		result = FunctionCallInvoke(&key->fcinfo);
+ 
+ 		/* Check for null result, since caller is clearly not expecting one */
+ 		if (key->fcinfo.isnull)
+ 			elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
+ 
+ 		if (!DatumGetBool(result))
+ 			return false;
+ 	}
+ 	return true;
+ }
+ 
+ /*
+  * Prepare information for skipKeys comparison.
+  */
+ static void
+ prepareSkipCols(SortState *node)
+ {
+ 	IncSort	   *plannode;
+ 	int			skipCols,
+ 				i;
+ 
+ 	plannode = (IncSort *) node->ss.ps.plan;
+ 	Assert(IsA(plannode, IncSort));
+ 	skipCols = plannode->skipCols;
+ 
+ 	node->skipKeys = (SkipKeyData *)palloc(skipCols * sizeof(SkipKeyData));
+ 
+ 	for (i = 0; i < skipCols; i++)
+ 	{
+ 		Oid equalityOp, equalityFunc;
+ 		SkipKeyData *key;
+ 
+ 		key = &node->skipKeys[i];
+ 		key->attno = plannode->sort.sortColIdx[i];
+ 
+ 		equalityOp = get_equality_op_for_ordering_op(
+ 										plannode->sort.sortOperators[i], NULL);
+ 		if (!OidIsValid(equalityOp))
+ 			elog(ERROR, "missing equality operator for ordering operator %u",
+ 					plannode->sort.sortOperators[i]);
+ 
+ 		equalityFunc = get_opcode(equalityOp);
+ 		if (!OidIsValid(equalityFunc))
+ 			elog(ERROR, "missing function for operator %u", equalityOp);
+ 
+ 		/* Lookup the comparison function */
+ 		fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
+ 
+ 		/* We can initialize the callinfo just once and re-use it */
+ 		InitFunctionCallInfoData(key->fcinfo, &key->flinfo, 2,
+ 								plannode->sort.collations[i], NULL, NULL);
+ 		key->fcinfo.argnull[0] = false;
+ 		key->fcinfo.argnull[1] = false;
+ 	}
+ }
+ 
  
  /* ----------------------------------------------------------------
   *		ExecSort
*************** ExecSort(SortState *node)
*** 42,47 ****
--- 140,155 ----
  	ScanDirection dir;
  	Tuplesortstate *tuplesortstate;
  	TupleTableSlot *slot;
+ 	Sort	   *plannode = (Sort *) node->ss.ps.plan;
+ 	PlanState  *outerNode;
+ 	int			skipCols;
+ 	TupleDesc	tupDesc;
+ 	int64		nTuples = 0;
+ 
+ 	if (IsA(plannode, IncSort))
+ 		skipCols = ((IncSort *) plannode)->skipCols;
+ 	else
+ 		skipCols = 0;
  
  	/*
  	 * get state info from node
*************** ExecSort(SortState *node)
*** 54,87 ****
  	tuplesortstate = (Tuplesortstate *) node->tuplesortstate;
  
  	/*
  	 * If first time through, read all tuples from outer plan and pass them to
  	 * tuplesort.c. Subsequent calls just fetch tuples from tuplesort.
  	 */
  
! 	if (!node->sort_Done)
! 	{
! 		Sort	   *plannode = (Sort *) node->ss.ps.plan;
! 		PlanState  *outerNode;
! 		TupleDesc	tupDesc;
! 
! 		SO1_printf("ExecSort: %s\n",
! 				   "sorting subplan");
  
! 		/*
! 		 * Want to scan subplan in the forward direction while creating the
! 		 * sorted data.
! 		 */
! 		estate->es_direction = ForwardScanDirection;
  
! 		/*
! 		 * Initialize tuplesort module.
! 		 */
! 		SO1_printf("ExecSort: %s\n",
! 				   "calling tuplesort_begin");
  
! 		outerNode = outerPlanState(node);
! 		tupDesc = ExecGetResultType(outerNode);
  
  		tuplesortstate = tuplesort_begin_heap(tupDesc,
  											  plannode->numCols,
  											  plannode->sortColIdx,
--- 162,204 ----
  	tuplesortstate = (Tuplesortstate *) node->tuplesortstate;
  
  	/*
+ 	 * Return next tuple from sorted set if any.
+ 	 */
+ 	if (node->sort_Done)
+ 	{
+ 		slot = node->ss.ps.ps_ResultTupleSlot;
+ 		if (tuplesort_gettupleslot(tuplesortstate,
+ 									  ScanDirectionIsForward(dir),
+ 									  slot, NULL) || node->finished)
+ 			return slot;
+ 	}
+ 
+ 	/*
  	 * If first time through, read all tuples from outer plan and pass them to
  	 * tuplesort.c. Subsequent calls just fetch tuples from tuplesort.
  	 */
  
! 	SO1_printf("ExecSort: %s\n",
! 			   "sorting subplan");
  
! 	/*
! 	 * Want to scan subplan in the forward direction while creating the
! 	 * sorted data.
! 	 */
! 	estate->es_direction = ForwardScanDirection;
  
! 	/*
! 	 * Initialize tuplesort module.
! 	 */
! 	SO1_printf("ExecSort: %s\n",
! 			   "calling tuplesort_begin");
  
! 	outerNode = outerPlanState(node);
! 	tupDesc = ExecGetResultType(outerNode);
  
+ 	if (skipCols == 0)
+ 	{
+ 		/* Regular case: no skip cols */
  		tuplesortstate = tuplesort_begin_heap(tupDesc,
  											  plannode->numCols,
  											  plannode->sortColIdx,
*************** ExecSort(SortState *node)
*** 89,132 ****
  											  plannode->collations,
  											  plannode->nullsFirst,
  											  work_mem,
! 											  node->randomAccess);
! 		if (node->bounded)
! 			tuplesort_set_bound(tuplesortstate, node->bound);
  		node->tuplesortstate = (void *) tuplesortstate;
  
! 		/*
! 		 * Scan the subplan and feed all the tuples to tuplesort.
! 		 */
  
! 		for (;;)
  		{
! 			slot = ExecProcNode(outerNode);
  
  			if (TupIsNull(slot))
  				break;
! 
  			tuplesort_puttupleslot(tuplesortstate, slot);
  		}
  
! 		/*
! 		 * Complete the sort.
! 		 */
! 		tuplesort_performsort(tuplesortstate);
  
! 		/*
! 		 * restore to user specified direction
! 		 */
! 		estate->es_direction = dir;
  
! 		/*
! 		 * finally set the sorted flag to true
! 		 */
! 		node->sort_Done = true;
! 		node->bounded_Done = node->bounded;
! 		node->bound_Done = node->bound;
! 		SO1_printf("ExecSort: %s\n", "sorting done");
  	}
  
  	SO1_printf("ExecSort: %s\n",
  			   "retrieving tuple from tuplesort");
  
--- 206,355 ----
  											  plannode->collations,
  											  plannode->nullsFirst,
  											  work_mem,
! 											  node->randomAccess,
! 											  false);
  		node->tuplesortstate = (void *) tuplesortstate;
  
! 		if (node->bounded)
! 			tuplesort_set_bound(tuplesortstate, node->bound);
! 	}
! 	else
! 	{
! 		/* Incremental sort case */
! 		if (node->tuplesortstate == NULL)
! 		{
! 			/*
! 			 * We are going to process the first group of presorted data.
! 			 * Initialize support structures for cmpSortSkipCols - already
! 			 * sorted columns.
! 			 */
! 			prepareSkipCols(node);
  
! 			/*
! 			 * Only pass on remaining columns that are unsorted.  Skip
! 			 * abbreviated keys usage for incremental sort.  We unlikely will
! 			 * have huge groups with incremental sort.  Therefore usage of
! 			 * abbreviated keys would be likely a waste of time.
! 			 */
! 			tuplesortstate = tuplesort_begin_heap(
! 										tupDesc,
! 										plannode->numCols - skipCols,
! 										&(plannode->sortColIdx[skipCols]),
! 										&(plannode->sortOperators[skipCols]),
! 										&(plannode->collations[skipCols]),
! 										&(plannode->nullsFirst[skipCols]),
! 										work_mem,
! 										false,
! 										true);
! 			node->tuplesortstate = (void *) tuplesortstate;
! 			node->groupsCount++;
! 		}
! 		else
  		{
! 			/* Next group of presorted data */
! 			tuplesort_reset((Tuplesortstate *) node->tuplesortstate);
! 			node->groupsCount++;
! 		}
  
+ 		/* Calculate remaining bound for bounded sort */
+ 		if (node->bounded)
+ 			tuplesort_set_bound(tuplesortstate, node->bound - node->bound_Done);
+ 	}
+ 
+ 	/*
+ 	 * Put next group of tuples where skipCols sort values are equal to
+ 	 * tuplesort.
+ 	 */
+ 	for (;;)
+ 	{
+ 		slot = ExecProcNode(outerNode);
+ 
+ 		if (skipCols == 0)
+ 		{
+ 			/* Regular sort case: put all tuples to the tuplesort */
  			if (TupIsNull(slot))
+ 			{
+ 				node->finished = true;
  				break;
! 			}
  			tuplesort_puttupleslot(tuplesortstate, slot);
+ 			nTuples++;
  		}
+ 		else
+ 		{
+ 			/* Incremental sort case: put group of presorted data to the tuplesort */
+ 			if (node->prevSlot->tts_isempty)
+ 			{
+ 				/* First tuple */
+ 				if (TupIsNull(slot))
+ 				{
+ 					node->finished = true;
+ 					break;
+ 				}
+ 				else
+ 				{
+ 					ExecCopySlot(node->prevSlot, slot);
+ 				}
+ 			}
+ 			else
+ 			{
+ 				/* Put previous tuple into tuplesort */
+ 				tuplesort_puttupleslot(tuplesortstate, node->prevSlot);
+ 				nTuples++;
  
! 				if (TupIsNull(slot))
! 				{
! 					node->finished = true;
! 					break;
! 				}
! 				else
! 				{
! 					bool	cmp;
! 					cmp = cmpSortSkipCols(node, node->prevSlot, slot);
  
! 					/* Replace previous tuple with current one */
! 					ExecCopySlot(node->prevSlot, slot);
  
! 					/*
! 					 * When skipCols are not equal then group of presorted data
! 					 * is finished
! 					 */
! 					if (!cmp)
! 						break;
! 				}
! 			}
! 		}
  	}
  
+ 	/*
+ 	 * Complete the sort.
+ 	 */
+ 	tuplesort_performsort(tuplesortstate);
+ 
+ 	/*
+ 	 * restore to user specified direction
+ 	 */
+ 	estate->es_direction = dir;
+ 
+ 	/*
+ 	 * finally set the sorted flag to true
+ 	 */
+ 	node->sort_Done = true;
+ 	node->bounded_Done = node->bounded;
+ 
+ 	/*
+ 	 * Adjust bound_Done with number of tuples we've actually sorted.
+ 	 */
+ 	if (node->bounded)
+ 	{
+ 		if (node->finished)
+ 			node->bound_Done = node->bound;
+ 		else
+ 			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
+ 	}
+ 
+ 	SO1_printf("ExecSort: %s\n", "sorting done");
+ 
  	SO1_printf("ExecSort: %s\n",
  			   "retrieving tuple from tuplesort");
  
*************** ExecInitSort(Sort *node, EState *estate,
*** 157,162 ****
--- 380,394 ----
  			   "initializing sort node");
  
  	/*
+ 	 * skipCols can't be used with either EXEC_FLAG_REWIND, EXEC_FLAG_BACKWARD
+ 	 * or EXEC_FLAG_MARK, because we hold only current bucket in
+ 	 * tuplesortstate.
+ 	 */
+ 	Assert(IsA(node, Sort) || (eflags & (EXEC_FLAG_REWIND |
+ 										 EXEC_FLAG_BACKWARD |
+ 										 EXEC_FLAG_MARK)) == 0);
+ 
+ 	/*
  	 * create state structure
  	 */
  	sortstate = makeNode(SortState);
*************** ExecInitSort(Sort *node, EState *estate,
*** 174,180 ****
--- 406,417 ----
  
  	sortstate->bounded = false;
  	sortstate->sort_Done = false;
+ 	sortstate->finished = false;
  	sortstate->tuplesortstate = NULL;
+ 	sortstate->prevSlot = NULL;
+ 	sortstate->bound_Done = 0;
+ 	sortstate->groupsCount = 0;
+ 	sortstate->skipKeys = NULL;
  
  	/*
  	 * Miscellaneous initialization
*************** ExecInitSort(Sort *node, EState *estate,
*** 209,214 ****
--- 446,455 ----
  	ExecAssignScanTypeFromOuterPlan(&sortstate->ss);
  	sortstate->ss.ps.ps_ProjInfo = NULL;
  
+ 	/* make standalone slot to store previous tuple from outer node */
+ 	sortstate->prevSlot = MakeSingleTupleTableSlot(
+ 								ExecGetResultType(outerPlanState(sortstate)));
+ 
  	SO1_printf("ExecInitSort: %s\n",
  			   "sort node initialized");
  
*************** ExecEndSort(SortState *node)
*** 231,236 ****
--- 472,479 ----
  	ExecClearTuple(node->ss.ss_ScanTupleSlot);
  	/* must drop pointer to sort result tuple */
  	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+ 	/* must drop stanalone tuple slot from outer node */
+ 	ExecDropSingleTupleTableSlot(node->prevSlot);
  
  	/*
  	 * Release tuplesort resources
*************** ExecReScanSort(SortState *node)
*** 318,323 ****
--- 561,567 ----
  		node->sort_Done = false;
  		tuplesort_end((Tuplesortstate *) node->tuplesortstate);
  		node->tuplesortstate = NULL;
+ 		node->bound_Done = 0;
  
  		/*
  		 * if chgParam of subnode is not null then plan will be re-scanned by
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
new file mode 100644
index 05d8538..8c47f44
*** a/src/backend/nodes/copyfuncs.c
--- b/src/backend/nodes/copyfuncs.c
*************** _copyMaterial(const Material *from)
*** 837,842 ****
--- 837,860 ----
  
  
  /*
+  * CopySortFields
+  *
+  *		This function copies the fields of the Sort node.  It is used by
+  *		all the copy functions for classes which inherit from Sort.
+  */
+ static void
+ CopySortFields(const Sort *from, Sort *newnode)
+ {
+ 	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+ 
+ 	COPY_SCALAR_FIELD(numCols);
+ 	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
+ 	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
+ 	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
+ 	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+ }
+ 
+ /*
   * _copySort
   */
  static Sort *
*************** _copySort(const Sort *from)
*** 847,859 ****
  	/*
  	 * copy node superclass fields
  	 */
! 	CopyPlanFields((const Plan *) from, (Plan *) newnode);
  
! 	COPY_SCALAR_FIELD(numCols);
! 	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
! 	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
! 	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
! 	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
  
  	return newnode;
  }
--- 865,893 ----
  	/*
  	 * copy node superclass fields
  	 */
! 	CopySortFields(from, newnode);
  
! 	return newnode;
! }
! 
! 
! /*
!  * _copyIncSort
!  */
! static IncSort *
! _copyIncSort(const IncSort *from)
! {
! 	IncSort	   *newnode = makeNode(IncSort);
! 
! 	/*
! 	 * copy node superclass fields
! 	 */
! 	CopySortFields((const Sort *) from, (Sort *) newnode);
! 
! 	/*
! 	 * copy remainder of node
! 	 */
! 	COPY_SCALAR_FIELD(skipCols);
  
  	return newnode;
  }
*************** copyObject(const void *from)
*** 4583,4588 ****
--- 4617,4625 ----
  		case T_Sort:
  			retval = _copySort(from);
  			break;
+ 		case T_IncSort:
+ 			retval = _copyIncSort(from);
+ 			break;
  		case T_Group:
  			retval = _copyGroup(from);
  			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
new file mode 100644
index b3802b4..7522cc3
*** a/src/backend/nodes/outfuncs.c
--- b/src/backend/nodes/outfuncs.c
*************** _outMaterial(StringInfo str, const Mater
*** 781,792 ****
  }
  
  static void
! _outSort(StringInfo str, const Sort *node)
  {
  	int			i;
  
- 	WRITE_NODE_TYPE("SORT");
- 
  	_outPlanInfo(str, (const Plan *) node);
  
  	WRITE_INT_FIELD(numCols);
--- 781,790 ----
  }
  
  static void
! _outSortInfo(StringInfo str, const Sort *node)
  {
  	int			i;
  
  	_outPlanInfo(str, (const Plan *) node);
  
  	WRITE_INT_FIELD(numCols);
*************** _outSort(StringInfo str, const Sort *nod
*** 809,814 ****
--- 807,830 ----
  }
  
  static void
+ _outSort(StringInfo str, const Sort *node)
+ {
+ 	WRITE_NODE_TYPE("SORT");
+ 
+ 	_outSortInfo(str, node);
+ }
+ 
+ static void
+ _outIncSort(StringInfo str, const IncSort *node)
+ {
+ 	WRITE_NODE_TYPE("INCSORT");
+ 
+ 	_outSortInfo(str, (const Sort *) node);
+ 
+ 	WRITE_INT_FIELD(skipCols);
+ }
+ 
+ static void
  _outUnique(StringInfo str, const Unique *node)
  {
  	int			i;
*************** outNode(StringInfo str, const void *obj)
*** 3482,3487 ****
--- 3498,3506 ----
  			case T_Sort:
  				_outSort(str, obj);
  				break;
+ 			case T_IncSort:
+ 				_outIncSort(str, obj);
+ 				break;
  			case T_Unique:
  				_outUnique(str, obj);
  				break;
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
new file mode 100644
index d2f69fe..0dcb86e
*** a/src/backend/nodes/readfuncs.c
--- b/src/backend/nodes/readfuncs.c
*************** _readMaterial(void)
*** 1978,1989 ****
  }
  
  /*
!  * _readSort
   */
! static Sort *
! _readSort(void)
  {
! 	READ_LOCALS(Sort);
  
  	ReadCommonPlan(&local_node->plan);
  
--- 1978,1990 ----
  }
  
  /*
!  * ReadCommonSort
!  *	Assign the basic stuff of all nodes that inherit from Sort
   */
! static void
! ReadCommonSort(Sort *local_node)
  {
! 	READ_TEMP_LOCALS();
  
  	ReadCommonPlan(&local_node->plan);
  
*************** _readSort(void)
*** 1992,1997 ****
--- 1993,2024 ----
  	READ_OID_ARRAY(sortOperators, local_node->numCols);
  	READ_OID_ARRAY(collations, local_node->numCols);
  	READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+ }
+ 
+ /*
+  * _readSort
+  */
+ static Sort *
+ _readSort(void)
+ {
+ 	READ_LOCALS_NO_FIELDS(Sort);
+ 
+ 	ReadCommonSort(local_node);
+ 
+ 	READ_DONE();
+ }
+ 
+ /*
+  * _readIncSort
+  */
+ static IncSort *
+ _readIncSort(void)
+ {
+ 	READ_LOCALS(IncSort);
+ 
+ 	ReadCommonSort(&local_node->sort);
+ 
+ 	READ_INT_FIELD(skipCols);
  
  	READ_DONE();
  }
*************** parseNodeString(void)
*** 2520,2525 ****
--- 2547,2554 ----
  		return_value = _readMaterial();
  	else if (MATCH("SORT", 4))
  		return_value = _readSort();
+ 	else if (MATCH("INCSORT", 7))
+ 		return_value = _readIncSort();
  	else if (MATCH("GROUP", 5))
  		return_value = _readGroup();
  	else if (MATCH("AGG", 3))
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
new file mode 100644
index eeacf81..22f81aa
*** a/src/backend/optimizer/path/allpaths.c
--- b/src/backend/optimizer/path/allpaths.c
*************** print_path(PlannerInfo *root, Path *path
*** 3097,3102 ****
--- 3097,3106 ----
  			ptype = "Sort";
  			subpath = ((SortPath *) path)->subpath;
  			break;
+ 		case T_IncSortPath:
+ 			ptype = "IncSort";
+ 			subpath = ((SortPath *) path)->subpath;
+ 			break;
  		case T_GroupPath:
  			ptype = "Group";
  			subpath = ((GroupPath *) path)->subpath;
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
new file mode 100644
index d01630f..b98abc7
*** a/src/backend/optimizer/path/costsize.c
--- b/src/backend/optimizer/path/costsize.c
*************** bool		enable_indexonlyscan = true;
*** 121,126 ****
--- 121,127 ----
  bool		enable_bitmapscan = true;
  bool		enable_tidscan = true;
  bool		enable_sort = true;
+ bool		enable_incsort = true;
  bool		enable_hashagg = true;
  bool		enable_nestloop = true;
  bool		enable_material = true;
*************** cost_recursive_union(Path *runion, Path 
*** 1419,1424 ****
--- 1420,1432 ----
   *	  Determines and returns the cost of sorting a relation, including
   *	  the cost of reading the input data.
   *
+  * Sort could be either full sort of relation or incremental sort when we already
+  * have data presorted by some of required pathkeys.  In the second case
+  * we estimate number of groups which source data is divided to by presorted
+  * pathkeys.  And then estimate cost of sorting each individual group assuming
+  * data is divided into group uniformly.  Also, if LIMIT is specified then
+  * we have to pull from source and sort only some of total groups.
+  *
   * If the total volume of data to sort is less than sort_mem, we will do
   * an in-memory sort, which requires no I/O and about t*log2(t) tuple
   * comparisons for t tuples.
*************** cost_recursive_union(Path *runion, Path 
*** 1445,1451 ****
   * work that has to be done to prepare the inputs to the comparison operators.
   *
   * 'pathkeys' is a list of sort keys
!  * 'input_cost' is the total cost for reading the input data
   * 'tuples' is the number of tuples in the relation
   * 'width' is the average tuple width in bytes
   * 'comparison_cost' is the extra cost per comparison, if any
--- 1453,1460 ----
   * work that has to be done to prepare the inputs to the comparison operators.
   *
   * 'pathkeys' is a list of sort keys
!  * 'input_startup_cost' is the startup cost for reading the input data
!  * 'input_total_cost' is the total cost for reading the input data
   * 'tuples' is the number of tuples in the relation
   * 'width' is the average tuple width in bytes
   * 'comparison_cost' is the extra cost per comparison, if any
*************** cost_recursive_union(Path *runion, Path 
*** 1461,1479 ****
   */
  void
  cost_sort(Path *path, PlannerInfo *root,
! 		  List *pathkeys, Cost input_cost, double tuples, int width,
! 		  Cost comparison_cost, int sort_mem,
  		  double limit_tuples)
  {
! 	Cost		startup_cost = input_cost;
! 	Cost		run_cost = 0;
  	double		input_bytes = relation_byte_size(tuples, width);
  	double		output_bytes;
  	double		output_tuples;
  	long		sort_mem_bytes = sort_mem * 1024L;
  
  	if (!enable_sort)
  		startup_cost += disable_cost;
  
  	path->rows = tuples;
  
--- 1470,1497 ----
   */
  void
  cost_sort(Path *path, PlannerInfo *root,
! 		  List *pathkeys, int presorted_keys,
! 		  Cost input_startup_cost, Cost input_total_cost,
! 		  double tuples, int width, Cost comparison_cost, int sort_mem,
  		  double limit_tuples)
  {
! 	Cost		startup_cost = input_startup_cost;
! 	Cost		run_cost = 0,
! 				rest_cost,
! 				group_cost,
! 				input_run_cost = input_total_cost - input_startup_cost;
  	double		input_bytes = relation_byte_size(tuples, width);
  	double		output_bytes;
  	double		output_tuples;
+ 	double		num_groups,
+ 				group_input_bytes,
+ 				group_tuples;
  	long		sort_mem_bytes = sort_mem * 1024L;
  
  	if (!enable_sort)
  		startup_cost += disable_cost;
+ 	if (!enable_incsort)
+ 		presorted_keys = false;
  
  	path->rows = tuples;
  
*************** cost_sort(Path *path, PlannerInfo *root,
*** 1499,1511 ****
  		output_bytes = input_bytes;
  	}
  
! 	if (output_bytes > sort_mem_bytes)
  	{
  		/*
  		 * We'll have to use a disk-based sort of all the tuples
  		 */
! 		double		npages = ceil(input_bytes / BLCKSZ);
! 		double		nruns = input_bytes / sort_mem_bytes;
  		double		mergeorder = tuplesort_merge_order(sort_mem_bytes);
  		double		log_runs;
  		double		npageaccesses;
--- 1517,1566 ----
  		output_bytes = input_bytes;
  	}
  
! 	/*
! 	 * Estimate number of groups which dataset is divided by presorted keys.
! 	 */
! 	if (presorted_keys > 0)
! 	{
! 		List	   *presortedExprs = NIL;
! 		ListCell   *l;
! 		int			i = 0;
! 
! 		/* Extract presorted keys as list of expressions */
! 		foreach(l, pathkeys)
! 		{
! 			PathKey *key = (PathKey *)lfirst(l);
! 			EquivalenceMember *member = (EquivalenceMember *)
! 								lfirst(list_head(key->pk_eclass->ec_members));
! 
! 			presortedExprs = lappend(presortedExprs, member->em_expr);
! 
! 			i++;
! 			if (i >= presorted_keys)
! 				break;
! 		}
! 
! 		/* Estimate number of groups with equal presorted keys */
! 		num_groups = estimate_num_groups(root, presortedExprs, tuples, NULL);
! 	}
! 	else
! 	{
! 		num_groups = 1.0;
! 	}
! 
! 	/*
! 	 * Estimate average cost of sorting of one group where presorted keys are
! 	 * equal.
! 	 */
! 	group_input_bytes = input_bytes / num_groups;
! 	group_tuples = tuples / num_groups;
! 	if (output_bytes > sort_mem_bytes && group_input_bytes > sort_mem_bytes)
  	{
  		/*
  		 * We'll have to use a disk-based sort of all the tuples
  		 */
! 		double		npages = ceil(group_input_bytes / BLCKSZ);
! 		double		nruns = group_input_bytes / sort_mem_bytes;
  		double		mergeorder = tuplesort_merge_order(sort_mem_bytes);
  		double		log_runs;
  		double		npageaccesses;
*************** cost_sort(Path *path, PlannerInfo *root,
*** 1515,1521 ****
  		 *
  		 * Assume about N log2 N comparisons
  		 */
! 		startup_cost += comparison_cost * tuples * LOG2(tuples);
  
  		/* Disk costs */
  
--- 1570,1576 ----
  		 *
  		 * Assume about N log2 N comparisons
  		 */
! 		group_cost = comparison_cost * group_tuples * LOG2(group_tuples);
  
  		/* Disk costs */
  
*************** cost_sort(Path *path, PlannerInfo *root,
*** 1526,1535 ****
  			log_runs = 1.0;
  		npageaccesses = 2.0 * npages * log_runs;
  		/* Assume 3/4ths of accesses are sequential, 1/4th are not */
! 		startup_cost += npageaccesses *
  			(seq_page_cost * 0.75 + random_page_cost * 0.25);
  	}
! 	else if (tuples > 2 * output_tuples || input_bytes > sort_mem_bytes)
  	{
  		/*
  		 * We'll use a bounded heap-sort keeping just K tuples in memory, for
--- 1581,1590 ----
  			log_runs = 1.0;
  		npageaccesses = 2.0 * npages * log_runs;
  		/* Assume 3/4ths of accesses are sequential, 1/4th are not */
! 		group_cost += npageaccesses *
  			(seq_page_cost * 0.75 + random_page_cost * 0.25);
  	}
! 	else if (group_tuples > 2 * output_tuples || group_input_bytes > sort_mem_bytes)
  	{
  		/*
  		 * We'll use a bounded heap-sort keeping just K tuples in memory, for
*************** cost_sort(Path *path, PlannerInfo *root,
*** 1537,1550 ****
  		 * factor is a bit higher than for quicksort.  Tweak it so that the
  		 * cost curve is continuous at the crossover point.
  		 */
! 		startup_cost += comparison_cost * tuples * LOG2(2.0 * output_tuples);
  	}
  	else
  	{
  		/* We'll use plain quicksort on all the input tuples */
! 		startup_cost += comparison_cost * tuples * LOG2(tuples);
  	}
  
  	/*
  	 * Also charge a small amount (arbitrarily set equal to operator cost) per
  	 * extracted tuple.  We don't charge cpu_tuple_cost because a Sort node
--- 1592,1617 ----
  		 * factor is a bit higher than for quicksort.  Tweak it so that the
  		 * cost curve is continuous at the crossover point.
  		 */
! 		group_cost = comparison_cost * group_tuples * LOG2(2.0 * output_tuples);
  	}
  	else
  	{
  		/* We'll use plain quicksort on all the input tuples */
! 		group_cost = comparison_cost * group_tuples * LOG2(group_tuples);
  	}
  
+ 	/* Add per group cost of fetching tuples from input */
+ 	group_cost += input_run_cost / num_groups;
+ 
+ 	/*
+ 	 * We've to sort first group to start output from node. Sorting rest of
+ 	 * groups are required to return all the other tuples.
+ 	 */
+ 	startup_cost += group_cost;
+ 	rest_cost = (num_groups * (output_tuples / tuples) - 1.0) * group_cost;
+ 	if (rest_cost > 0.0)
+ 		run_cost += rest_cost;
+ 
  	/*
  	 * Also charge a small amount (arbitrarily set equal to operator cost) per
  	 * extracted tuple.  We don't charge cpu_tuple_cost because a Sort node
*************** initial_cost_mergejoin(PlannerInfo *root
*** 2300,2305 ****
--- 2367,2374 ----
  		cost_sort(&sort_path,
  				  root,
  				  outersortkeys,
+ 				  pathkeys_common(outer_path->pathkeys, outersortkeys),
+ 				  outer_path->startup_cost,
  				  outer_path->total_cost,
  				  outer_path_rows,
  				  outer_path->pathtarget->width,
*************** initial_cost_mergejoin(PlannerInfo *root
*** 2326,2331 ****
--- 2395,2402 ----
  		cost_sort(&sort_path,
  				  root,
  				  innersortkeys,
+ 				  pathkeys_common(inner_path->pathkeys, innersortkeys),
+ 				  inner_path->startup_cost,
  				  inner_path->total_cost,
  				  inner_path_rows,
  				  inner_path->pathtarget->width,
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
new file mode 100644
index 1065b31..653e4e9
*** a/src/backend/optimizer/path/pathkeys.c
--- b/src/backend/optimizer/path/pathkeys.c
***************
*** 22,31 ****
--- 22,33 ----
  #include "nodes/nodeFuncs.h"
  #include "nodes/plannodes.h"
  #include "optimizer/clauses.h"
+ #include "optimizer/cost.h"
  #include "optimizer/pathnode.h"
  #include "optimizer/paths.h"
  #include "optimizer/tlist.h"
  #include "utils/lsyscache.h"
+ #include "utils/selfuncs.h"
  
  
  static bool pathkey_is_redundant(PathKey *new_pathkey, List *pathkeys);
*************** compare_pathkeys(List *keys1, List *keys
*** 308,313 ****
--- 310,342 ----
  	return PATHKEYS_EQUAL;
  }
  
+ 
+ /*
+  * pathkeys_common
+  *    Returns length of longest common prefix of keys1 and keys2.
+  */
+ int
+ pathkeys_common(List *keys1, List *keys2)
+ {
+ 	int n;
+ 	ListCell   *key1,
+ 			   *key2;
+ 	n = 0;
+ 
+ 	forboth(key1, keys1, key2, keys2)
+ 	{
+ 		PathKey    *pathkey1 = (PathKey *) lfirst(key1);
+ 		PathKey    *pathkey2 = (PathKey *) lfirst(key2);
+ 
+ 		if (pathkey1 != pathkey2)
+ 			return n;
+ 		n++;
+ 	}
+ 
+ 	return n;
+ }
+ 
+ 
  /*
   * pathkeys_contained_in
   *	  Common special case of compare_pathkeys: we just want to know
*************** get_cheapest_path_for_pathkeys(List *pat
*** 368,375 ****
  /*
   * get_cheapest_fractional_path_for_pathkeys
   *	  Find the cheapest path (for retrieving a specified fraction of all
!  *	  the tuples) that satisfies the given pathkeys and parameterization.
!  *	  Return NULL if no such path.
   *
   * See compare_fractional_path_costs() for the interpretation of the fraction
   * parameter.
--- 397,408 ----
  /*
   * get_cheapest_fractional_path_for_pathkeys
   *	  Find the cheapest path (for retrieving a specified fraction of all
!  *	  the tuples) that satisfies given parameterization and at least partially
!  *	  satisfies the given pathkeys.  Return NULL if no path found.
!  *	  If pathkeys are satisfied only partially then we would have to do
!  *	  incremental sort in order to satisfy pathkeys completely.  Since
!  *	  incremental sort consumes data by presorted groups, we would have to
!  *	  consume more data than in the case of fully presorted path.
   *
   * See compare_fractional_path_costs() for the interpretation of the fraction
   * parameter.
*************** right_merge_direction(PlannerInfo *root,
*** 1461,1486 ****
   *		Count the number of pathkeys that are useful for meeting the
   *		query's requested output ordering.
   *
!  * Unlike merge pathkeys, this is an all-or-nothing affair: it does us
!  * no good to order by just the first key(s) of the requested ordering.
!  * So the result is always either 0 or list_length(root->query_pathkeys).
   */
! static int
! pathkeys_useful_for_ordering(PlannerInfo *root, List *pathkeys)
  {
! 	if (root->query_pathkeys == NIL)
  		return 0;				/* no special ordering requested */
  
  	if (pathkeys == NIL)
  		return 0;				/* unordered path */
  
! 	if (pathkeys_contained_in(root->query_pathkeys, pathkeys))
  	{
! 		/* It's useful ... or at least the first N keys are */
! 		return list_length(root->query_pathkeys);
  	}
- 
- 	return 0;					/* path ordering not useful */
  }
  
  /*
--- 1494,1535 ----
   *		Count the number of pathkeys that are useful for meeting the
   *		query's requested output ordering.
   *
!  * Returns number of pathkeys that maches given argument. Others can be
!  * satisfied by incremental sort.
   */
! int
! pathkeys_useful_for_ordering(List *query_pathkeys, List *pathkeys)
  {
! 	int	n_common_pathkeys;
! 
! 	if (query_pathkeys == NIL)
  		return 0;				/* no special ordering requested */
  
  	if (pathkeys == NIL)
  		return 0;				/* unordered path */
  
! 	n_common_pathkeys = pathkeys_common(query_pathkeys, pathkeys);
! 
! 	if (enable_incsort)
  	{
! 		/*
! 		 * Return the number of path keys in common, or 0 if there are none. Any
! 		 * first common pathkeys could be useful for ordering because we can use
! 		 * incremental sort.
! 		 */
! 		return n_common_pathkeys;
! 	}
! 	else
! 	{
! 		/* 
! 		 * When incremental sort is disabled, pathkeys are useful only when they
! 		 * do contain all the query pathkeys.
! 		 */
! 		if (n_common_pathkeys == list_length(query_pathkeys))
! 			return n_common_pathkeys;
! 		else
! 			return 0;
  	}
  }
  
  /*
*************** truncate_useless_pathkeys(PlannerInfo *r
*** 1496,1502 ****
  	int			nuseful2;
  
  	nuseful = pathkeys_useful_for_merging(root, rel, pathkeys);
! 	nuseful2 = pathkeys_useful_for_ordering(root, pathkeys);
  	if (nuseful2 > nuseful)
  		nuseful = nuseful2;
  
--- 1545,1551 ----
  	int			nuseful2;
  
  	nuseful = pathkeys_useful_for_merging(root, rel, pathkeys);
! 	nuseful2 = pathkeys_useful_for_ordering(root->query_pathkeys, pathkeys);
  	if (nuseful2 > nuseful)
  		nuseful = nuseful2;
  
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
new file mode 100644
index 997bdcf..f103b04
*** a/src/backend/optimizer/plan/createplan.c
--- b/src/backend/optimizer/plan/createplan.c
*************** static MergeJoin *make_mergejoin(List *t
*** 227,233 ****
  			   bool *mergenullsfirst,
  			   Plan *lefttree, Plan *righttree,
  			   JoinType jointype);
! static Sort *make_sort(Plan *lefttree, int numCols,
  		  AttrNumber *sortColIdx, Oid *sortOperators,
  		  Oid *collations, bool *nullsFirst);
  static Plan *prepare_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
--- 227,233 ----
  			   bool *mergenullsfirst,
  			   Plan *lefttree, Plan *righttree,
  			   JoinType jointype);
! static Sort *make_sort(Plan *lefttree, int numCols, int skipCols,
  		  AttrNumber *sortColIdx, Oid *sortOperators,
  		  Oid *collations, bool *nullsFirst);
  static Plan *prepare_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
*************** static Plan *prepare_sort_from_pathkeys(
*** 242,251 ****
  static EquivalenceMember *find_ec_member_for_tle(EquivalenceClass *ec,
  					   TargetEntry *tle,
  					   Relids relids);
! static Sort *make_sort_from_pathkeys(Plan *lefttree, List *pathkeys);
  static Sort *make_sort_from_groupcols(List *groupcls,
  						 AttrNumber *grpColIdx,
! 						 Plan *lefttree);
  static Material *make_material(Plan *lefttree);
  static WindowAgg *make_windowagg(List *tlist, Index winref,
  			   int partNumCols, AttrNumber *partColIdx, Oid *partOperators,
--- 242,253 ----
  static EquivalenceMember *find_ec_member_for_tle(EquivalenceClass *ec,
  					   TargetEntry *tle,
  					   Relids relids);
! static Sort *make_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
! 						 int skipCols);
  static Sort *make_sort_from_groupcols(List *groupcls,
  						 AttrNumber *grpColIdx,
! 						 Plan *lefttree,
! 						 int skipCols);
  static Material *make_material(Plan *lefttree);
  static WindowAgg *make_windowagg(List *tlist, Index winref,
  			   int partNumCols, AttrNumber *partColIdx, Oid *partOperators,
*************** create_plan_recurse(PlannerInfo *root, P
*** 423,428 ****
--- 425,431 ----
  											   (GatherPath *) best_path);
  			break;
  		case T_Sort:
+ 		case T_IncSort:
  			plan = (Plan *) create_sort_plan(root,
  											 (SortPath *) best_path,
  											 flags);
*************** create_merge_append_plan(PlannerInfo *ro
*** 1068,1073 ****
--- 1071,1077 ----
  		Oid		   *sortOperators;
  		Oid		   *collations;
  		bool	   *nullsFirst;
+ 		int			n_common_pathkeys;
  
  		/* Build the child plan */
  		/* Must insist that all children return the same tlist */
*************** create_merge_append_plan(PlannerInfo *ro
*** 1102,1110 ****
  					  numsortkeys * sizeof(bool)) == 0);
  
  		/* Now, insert a Sort node if subplan isn't sufficiently ordered */
! 		if (!pathkeys_contained_in(pathkeys, subpath->pathkeys))
  		{
  			Sort	   *sort = make_sort(subplan, numsortkeys,
  										 sortColIdx, sortOperators,
  										 collations, nullsFirst);
  
--- 1106,1116 ----
  					  numsortkeys * sizeof(bool)) == 0);
  
  		/* Now, insert a Sort node if subplan isn't sufficiently ordered */
! 		n_common_pathkeys = pathkeys_common(pathkeys, subpath->pathkeys);
! 		if (n_common_pathkeys < list_length(pathkeys))
  		{
  			Sort	   *sort = make_sort(subplan, numsortkeys,
+ 										 n_common_pathkeys,
  										 sortColIdx, sortOperators,
  										 collations, nullsFirst);
  
*************** create_sort_plan(PlannerInfo *root, Sort
*** 1535,1540 ****
--- 1541,1547 ----
  {
  	Sort	   *plan;
  	Plan	   *subplan;
+ 	int			n_common_pathkeys;
  
  	/*
  	 * We don't want any excess columns in the sorted tuples, so request a
*************** create_sort_plan(PlannerInfo *root, Sort
*** 1544,1550 ****
  	subplan = create_plan_recurse(root, best_path->subpath,
  								  flags | CP_SMALL_TLIST);
  
! 	plan = make_sort_from_pathkeys(subplan, best_path->path.pathkeys);
  
  	copy_generic_path_info(&plan->plan, (Path *) best_path);
  
--- 1551,1561 ----
  	subplan = create_plan_recurse(root, best_path->subpath,
  								  flags | CP_SMALL_TLIST);
  
! 	n_common_pathkeys = pathkeys_common(best_path->path.pathkeys,
! 										best_path->subpath->pathkeys);
! 
! 	plan = make_sort_from_pathkeys(subplan, best_path->path.pathkeys,
! 								   n_common_pathkeys);
  
  	copy_generic_path_info(&plan->plan, (Path *) best_path);
  
*************** create_groupingsets_plan(PlannerInfo *ro
*** 1790,1796 ****
  			sort_plan = (Plan *)
  				make_sort_from_groupcols(groupClause,
  										 new_grpColIdx,
! 										 subplan);
  
  			agg_plan = (Plan *) make_agg(NIL,
  										 NIL,
--- 1801,1808 ----
  			sort_plan = (Plan *)
  				make_sort_from_groupcols(groupClause,
  										 new_grpColIdx,
! 										 subplan,
! 										 0);
  
  			agg_plan = (Plan *) make_agg(NIL,
  										 NIL,
*************** create_mergejoin_plan(PlannerInfo *root,
*** 3624,3631 ****
  	 */
  	if (best_path->outersortkeys)
  	{
! 		Sort	   *sort = make_sort_from_pathkeys(outer_plan,
! 												   best_path->outersortkeys);
  
  		label_sort_with_costsize(root, sort, -1.0);
  		outer_plan = (Plan *) sort;
--- 3636,3649 ----
  	 */
  	if (best_path->outersortkeys)
  	{
! 		Sort	   *sort;
! 		int			n_common_pathkeys;
! 
! 		n_common_pathkeys = pathkeys_common(best_path->outersortkeys,
! 									best_path->jpath.outerjoinpath->pathkeys);
! 
! 		sort = make_sort_from_pathkeys(outer_plan, best_path->outersortkeys,
! 									   n_common_pathkeys);
  
  		label_sort_with_costsize(root, sort, -1.0);
  		outer_plan = (Plan *) sort;
*************** create_mergejoin_plan(PlannerInfo *root,
*** 3636,3643 ****
  
  	if (best_path->innersortkeys)
  	{
! 		Sort	   *sort = make_sort_from_pathkeys(inner_plan,
! 												   best_path->innersortkeys);
  
  		label_sort_with_costsize(root, sort, -1.0);
  		inner_plan = (Plan *) sort;
--- 3654,3667 ----
  
  	if (best_path->innersortkeys)
  	{
! 		Sort	   *sort;
! 		int			n_common_pathkeys;
! 
! 		n_common_pathkeys = pathkeys_common(best_path->innersortkeys,
! 									best_path->jpath.innerjoinpath->pathkeys);
! 
! 		sort = make_sort_from_pathkeys(inner_plan, best_path->innersortkeys,
! 									   n_common_pathkeys);
  
  		label_sort_with_costsize(root, sort, -1.0);
  		inner_plan = (Plan *) sort;
*************** label_sort_with_costsize(PlannerInfo *ro
*** 4692,4698 ****
  	Plan	   *lefttree = plan->plan.lefttree;
  	Path		sort_path;		/* dummy for result of cost_sort */
  
! 	cost_sort(&sort_path, root, NIL,
  			  lefttree->total_cost,
  			  lefttree->plan_rows,
  			  lefttree->plan_width,
--- 4716,4723 ----
  	Plan	   *lefttree = plan->plan.lefttree;
  	Path		sort_path;		/* dummy for result of cost_sort */
  
! 	cost_sort(&sort_path, root, NIL, 0,
! 			  lefttree->startup_cost,
  			  lefttree->total_cost,
  			  lefttree->plan_rows,
  			  lefttree->plan_width,
*************** make_mergejoin(List *tlist,
*** 5214,5226 ****
   * nullsFirst arrays already.
   */
  static Sort *
! make_sort(Plan *lefttree, int numCols,
  		  AttrNumber *sortColIdx, Oid *sortOperators,
  		  Oid *collations, bool *nullsFirst)
  {
! 	Sort	   *node = makeNode(Sort);
! 	Plan	   *plan = &node->plan;
  
  	plan->targetlist = lefttree->targetlist;
  	plan->qual = NIL;
  	plan->lefttree = lefttree;
--- 5239,5269 ----
   * nullsFirst arrays already.
   */
  static Sort *
! make_sort(Plan *lefttree, int numCols, int skipCols,
  		  AttrNumber *sortColIdx, Oid *sortOperators,
  		  Oid *collations, bool *nullsFirst)
  {
! 	Sort	   *node;
! 	Plan	   *plan;
  
+ 	/* Always use regular sort node when enable_incsort = false */
+ 	if (!enable_incsort)
+ 		skipCols = 0;
+ 
+ 	if (skipCols == 0)
+ 	{
+ 		node = makeNode(Sort);
+ 	}
+ 	else
+ 	{
+ 		IncSort    *incSort;
+ 
+ 		incSort = makeNode(IncSort);
+ 		node = &incSort->sort;
+ 		incSort->skipCols = skipCols;
+ 	}
+ 
+ 	plan = &node->plan;
  	plan->targetlist = lefttree->targetlist;
  	plan->qual = NIL;
  	plan->lefttree = lefttree;
*************** find_ec_member_for_tle(EquivalenceClass 
*** 5552,5558 ****
   *	  'pathkeys' is the list of pathkeys by which the result is to be sorted
   */
  static Sort *
! make_sort_from_pathkeys(Plan *lefttree, List *pathkeys)
  {
  	int			numsortkeys;
  	AttrNumber *sortColIdx;
--- 5595,5601 ----
   *	  'pathkeys' is the list of pathkeys by which the result is to be sorted
   */
  static Sort *
! make_sort_from_pathkeys(Plan *lefttree, List *pathkeys, int skipCols)
  {
  	int			numsortkeys;
  	AttrNumber *sortColIdx;
*************** make_sort_from_pathkeys(Plan *lefttree, 
*** 5572,5578 ****
  										  &nullsFirst);
  
  	/* Now build the Sort node */
! 	return make_sort(lefttree, numsortkeys,
  					 sortColIdx, sortOperators,
  					 collations, nullsFirst);
  }
--- 5615,5621 ----
  										  &nullsFirst);
  
  	/* Now build the Sort node */
! 	return make_sort(lefttree, numsortkeys, skipCols,
  					 sortColIdx, sortOperators,
  					 collations, nullsFirst);
  }
*************** make_sort_from_sortclauses(List *sortcls
*** 5615,5621 ****
  		numsortkeys++;
  	}
  
! 	return make_sort(lefttree, numsortkeys,
  					 sortColIdx, sortOperators,
  					 collations, nullsFirst);
  }
--- 5658,5664 ----
  		numsortkeys++;
  	}
  
! 	return make_sort(lefttree, numsortkeys, 0,
  					 sortColIdx, sortOperators,
  					 collations, nullsFirst);
  }
*************** make_sort_from_sortclauses(List *sortcls
*** 5636,5642 ****
  static Sort *
  make_sort_from_groupcols(List *groupcls,
  						 AttrNumber *grpColIdx,
! 						 Plan *lefttree)
  {
  	List	   *sub_tlist = lefttree->targetlist;
  	ListCell   *l;
--- 5679,5686 ----
  static Sort *
  make_sort_from_groupcols(List *groupcls,
  						 AttrNumber *grpColIdx,
! 						 Plan *lefttree,
! 						 int skipCols)
  {
  	List	   *sub_tlist = lefttree->targetlist;
  	ListCell   *l;
*************** make_sort_from_groupcols(List *groupcls,
*** 5669,5675 ****
  		numsortkeys++;
  	}
  
! 	return make_sort(lefttree, numsortkeys,
  					 sortColIdx, sortOperators,
  					 collations, nullsFirst);
  }
--- 5713,5719 ----
  		numsortkeys++;
  	}
  
! 	return make_sort(lefttree, numsortkeys, skipCols,
  					 sortColIdx, sortOperators,
  					 collations, nullsFirst);
  }
*************** is_projection_capable_plan(Plan *plan)
*** 6317,6322 ****
--- 6361,6367 ----
  		case T_Hash:
  		case T_Material:
  		case T_Sort:
+ 		case T_IncSort:
  		case T_Unique:
  		case T_SetOp:
  		case T_LockRows:
diff --git a/src/backend/optimizer/plan/planagg.c b/src/backend/optimizer/plan/planagg.c
new file mode 100644
index c3fbf3c..5fe1235
*** a/src/backend/optimizer/plan/planagg.c
--- b/src/backend/optimizer/plan/planagg.c
***************
*** 44,49 ****
--- 44,50 ----
  #include "parser/parse_clause.h"
  #include "rewrite/rewriteManip.h"
  #include "utils/lsyscache.h"
+ #include "utils/selfuncs.h"
  #include "utils/syscache.h"
  
  
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
new file mode 100644
index 3d33d46..557f885
*** a/src/backend/optimizer/plan/planner.c
--- b/src/backend/optimizer/plan/planner.c
*************** create_grouping_paths(PlannerInfo *root,
*** 3497,3510 ****
  			foreach(lc, input_rel->partial_pathlist)
  			{
  				Path	   *path = (Path *) lfirst(lc);
! 				bool		is_sorted;
  
! 				is_sorted = pathkeys_contained_in(root->group_pathkeys,
! 												  path->pathkeys);
! 				if (path == cheapest_partial_path || is_sorted)
  				{
  					/* Sort the cheapest partial path, if it isn't already */
! 					if (!is_sorted)
  						path = (Path *) create_sort_path(root,
  														 grouped_rel,
  														 path,
--- 3497,3510 ----
  			foreach(lc, input_rel->partial_pathlist)
  			{
  				Path	   *path = (Path *) lfirst(lc);
! 				int			n_useful_pathkeys;
  
! 				n_useful_pathkeys = pathkeys_useful_for_ordering(
! 										root->group_pathkeys, path->pathkeys);
! 				if (path == cheapest_partial_path || n_useful_pathkeys > 0)
  				{
  					/* Sort the cheapest partial path, if it isn't already */
! 					if (n_useful_pathkeys < list_length(root->group_pathkeys))
  						path = (Path *) create_sort_path(root,
  														 grouped_rel,
  														 path,
*************** create_grouping_paths(PlannerInfo *root,
*** 3577,3590 ****
  		foreach(lc, input_rel->pathlist)
  		{
  			Path	   *path = (Path *) lfirst(lc);
! 			bool		is_sorted;
  
! 			is_sorted = pathkeys_contained_in(root->group_pathkeys,
! 											  path->pathkeys);
! 			if (path == cheapest_path || is_sorted)
  			{
  				/* Sort the cheapest-total path if it isn't already sorted */
! 				if (!is_sorted)
  					path = (Path *) create_sort_path(root,
  													 grouped_rel,
  													 path,
--- 3577,3590 ----
  		foreach(lc, input_rel->pathlist)
  		{
  			Path	   *path = (Path *) lfirst(lc);
! 			int			n_useful_pathkeys;
  
! 			n_useful_pathkeys = pathkeys_useful_for_ordering(
! 										root->group_pathkeys, path->pathkeys);
! 			if (path == cheapest_path || n_useful_pathkeys > 0)
  			{
  				/* Sort the cheapest-total path if it isn't already sorted */
! 				if (n_useful_pathkeys < list_length(root->group_pathkeys))
  					path = (Path *) create_sort_path(root,
  													 grouped_rel,
  													 path,
*************** create_ordered_paths(PlannerInfo *root,
*** 4240,4252 ****
  	foreach(lc, input_rel->pathlist)
  	{
  		Path	   *path = (Path *) lfirst(lc);
! 		bool		is_sorted;
  
! 		is_sorted = pathkeys_contained_in(root->sort_pathkeys,
! 										  path->pathkeys);
! 		if (path == cheapest_input_path || is_sorted)
  		{
! 			if (!is_sorted)
  			{
  				/* An explicit sort here can take advantage of LIMIT */
  				path = (Path *) create_sort_path(root,
--- 4240,4252 ----
  	foreach(lc, input_rel->pathlist)
  	{
  		Path	   *path = (Path *) lfirst(lc);
! 		int			n_useful_pathkeys;
  
! 		n_useful_pathkeys = pathkeys_useful_for_ordering(root->sort_pathkeys,
! 														 path->pathkeys);
! 		if (path == cheapest_input_path || n_useful_pathkeys > 0)
  		{
! 			if (n_useful_pathkeys < list_length(root->sort_pathkeys))
  			{
  				/* An explicit sort here can take advantage of LIMIT */
  				path = (Path *) create_sort_path(root,
*************** plan_cluster_use_sort(Oid tableOid, Oid 
*** 5325,5332 ****
  
  	/* Estimate the cost of seq scan + sort */
  	seqScanPath = create_seqscan_path(root, rel, NULL, 0);
! 	cost_sort(&seqScanAndSortPath, root, NIL,
! 			  seqScanPath->total_cost, rel->tuples, rel->reltarget->width,
  			  comparisonCost, maintenance_work_mem, -1.0);
  
  	/* Estimate the cost of index scan */
--- 5325,5333 ----
  
  	/* Estimate the cost of seq scan + sort */
  	seqScanPath = create_seqscan_path(root, rel, NULL, 0);
! 	cost_sort(&seqScanAndSortPath, root, NIL, 0,
! 			  seqScanPath->startup_cost, seqScanPath->total_cost,
! 			  rel->tuples, rel->reltarget->width,
  			  comparisonCost, maintenance_work_mem, -1.0);
  
  	/* Estimate the cost of index scan */
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
new file mode 100644
index be267b9..7835cc4
*** a/src/backend/optimizer/plan/setrefs.c
--- b/src/backend/optimizer/plan/setrefs.c
*************** set_plan_refs(PlannerInfo *root, Plan *p
*** 610,615 ****
--- 610,616 ----
  		case T_Hash:
  		case T_Material:
  		case T_Sort:
+ 		case T_IncSort:
  		case T_Unique:
  		case T_SetOp:
  
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
new file mode 100644
index 7954c44..4df783e
*** a/src/backend/optimizer/plan/subselect.c
--- b/src/backend/optimizer/plan/subselect.c
*************** finalize_plan(PlannerInfo *root, Plan *p
*** 2693,2698 ****
--- 2693,2699 ----
  		case T_Hash:
  		case T_Material:
  		case T_Sort:
+ 		case T_IncSort:
  		case T_Unique:
  		case T_Gather:
  		case T_SetOp:
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
new file mode 100644
index 06e843d..f3b9717
*** a/src/backend/optimizer/prep/prepunion.c
--- b/src/backend/optimizer/prep/prepunion.c
*************** choose_hashed_setop(PlannerInfo *root, L
*** 963,969 ****
  	sorted_p.startup_cost = input_path->startup_cost;
  	sorted_p.total_cost = input_path->total_cost;
  	/* XXX cost_sort doesn't actually look at pathkeys, so just pass NIL */
! 	cost_sort(&sorted_p, root, NIL, sorted_p.total_cost,
  			  input_path->rows, input_path->pathtarget->width,
  			  0.0, work_mem, -1.0);
  	cost_group(&sorted_p, root, numGroupCols, dNumGroups,
--- 963,970 ----
  	sorted_p.startup_cost = input_path->startup_cost;
  	sorted_p.total_cost = input_path->total_cost;
  	/* XXX cost_sort doesn't actually look at pathkeys, so just pass NIL */
! 	cost_sort(&sorted_p, root, NIL, 0, 
! 			  sorted_p.startup_cost, sorted_p.total_cost,
  			  input_path->rows, input_path->pathtarget->width,
  			  0.0, work_mem, -1.0);
  	cost_group(&sorted_p, root, numGroupCols, dNumGroups,
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
new file mode 100644
index 3248296..2777aca
*** a/src/backend/optimizer/util/pathnode.c
--- b/src/backend/optimizer/util/pathnode.c
*************** compare_path_costs(Path *path1, Path *pa
*** 95,101 ****
  }
  
  /*
!  * compare_path_fractional_costs
   *	  Return -1, 0, or +1 according as path1 is cheaper, the same cost,
   *	  or more expensive than path2 for fetching the specified fraction
   *	  of the total tuples.
--- 95,101 ----
  }
  
  /*
!  * compare_fractional_path_costs
   *	  Return -1, 0, or +1 according as path1 is cheaper, the same cost,
   *	  or more expensive than path2 for fetching the specified fraction
   *	  of the total tuples.
*************** create_merge_append_path(PlannerInfo *ro
*** 1293,1304 ****
  	foreach(l, subpaths)
  	{
  		Path	   *subpath = (Path *) lfirst(l);
  
  		pathnode->path.rows += subpath->rows;
  		pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
  			subpath->parallel_safe;
  
! 		if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
  		{
  			/* Subpath is adequately ordered, we won't need to sort it */
  			input_startup_cost += subpath->startup_cost;
--- 1293,1305 ----
  	foreach(l, subpaths)
  	{
  		Path	   *subpath = (Path *) lfirst(l);
+ 		int			n_common_pathkeys = pathkeys_common(pathkeys, subpath->pathkeys);
  
  		pathnode->path.rows += subpath->rows;
  		pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
  			subpath->parallel_safe;
  
! 		if (n_common_pathkeys == list_length(pathkeys))
  		{
  			/* Subpath is adequately ordered, we won't need to sort it */
  			input_startup_cost += subpath->startup_cost;
*************** create_merge_append_path(PlannerInfo *ro
*** 1312,1317 ****
--- 1313,1320 ----
  			cost_sort(&sort_path,
  					  root,
  					  pathkeys,
+ 					  n_common_pathkeys,
+ 					  subpath->startup_cost,
  					  subpath->total_cost,
  					  subpath->parent->tuples,
  					  subpath->pathtarget->width,
*************** create_unique_path(PlannerInfo *root, Re
*** 1548,1554 ****
  		/*
  		 * Estimate cost for sort+unique implementation
  		 */
! 		cost_sort(&sort_path, root, NIL,
  				  subpath->total_cost,
  				  rel->rows,
  				  subpath->pathtarget->width,
--- 1551,1558 ----
  		/*
  		 * Estimate cost for sort+unique implementation
  		 */
! 		cost_sort(&sort_path, root, NIL, 0,
! 				  subpath->startup_cost,
  				  subpath->total_cost,
  				  rel->rows,
  				  subpath->pathtarget->width,
*************** create_sort_path(PlannerInfo *root,
*** 2399,2407 ****
  				 List *pathkeys,
  				 double limit_tuples)
  {
! 	SortPath   *pathnode = makeNode(SortPath);
  
- 	pathnode->path.pathtype = T_Sort;
  	pathnode->path.parent = rel;
  	/* Sort doesn't project, so use source path's pathtarget */
  	pathnode->path.pathtarget = subpath->pathtarget;
--- 2403,2433 ----
  				 List *pathkeys,
  				 double limit_tuples)
  {
! 	SortPath   *pathnode;
! 	int			n_common_pathkeys;
! 
! 	if (enable_incsort)
! 		n_common_pathkeys = pathkeys_common(subpath->pathkeys, pathkeys);
! 	else
! 		n_common_pathkeys = 0;
! 
! 	if (n_common_pathkeys == 0)
! 	{
! 		pathnode = makeNode(SortPath);
! 		pathnode->path.pathtype = T_Sort;
! 	}
! 	else
! 	{
! 		IncSortPath   *incpathnode;
! 
! 		incpathnode = makeNode(IncSortPath);
! 		pathnode = &incpathnode->spath;
! 		pathnode->path.pathtype = T_IncSort;
! 		incpathnode->skipCols = n_common_pathkeys;
! 	}
! 
! 	Assert(n_common_pathkeys < list_length(pathkeys));
  
  	pathnode->path.parent = rel;
  	/* Sort doesn't project, so use source path's pathtarget */
  	pathnode->path.pathtarget = subpath->pathtarget;
*************** create_sort_path(PlannerInfo *root,
*** 2415,2421 ****
  
  	pathnode->subpath = subpath;
  
! 	cost_sort(&pathnode->path, root, pathkeys,
  			  subpath->total_cost,
  			  subpath->rows,
  			  subpath->pathtarget->width,
--- 2441,2449 ----
  
  	pathnode->subpath = subpath;
  
! 	cost_sort(&pathnode->path, root,
! 			  pathkeys, n_common_pathkeys,
! 			  subpath->startup_cost,
  			  subpath->total_cost,
  			  subpath->rows,
  			  subpath->pathtarget->width,
*************** create_groupingsets_path(PlannerInfo *ro
*** 2687,2693 ****
  				break;
  
  			/* Account for cost of sort, but don't charge input cost again */
! 			cost_sort(&sort_path, root, NIL,
  					  0.0,
  					  subpath->rows,
  					  subpath->pathtarget->width,
--- 2715,2722 ----
  				break;
  
  			/* Account for cost of sort, but don't charge input cost again */
! 			cost_sort(&sort_path, root, NIL, 0,
! 					  0.0,
  					  0.0,
  					  subpath->rows,
  					  subpath->pathtarget->width,
diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c
new file mode 100644
index f9f18f2..9607889
*** a/src/backend/utils/adt/orderedsetaggs.c
--- b/src/backend/utils/adt/orderedsetaggs.c
*************** ordered_set_startup(FunctionCallInfo fci
*** 276,282 ****
  												   qstate->sortOperators,
  												   qstate->sortCollations,
  												   qstate->sortNullsFirsts,
! 												   work_mem, false);
  	else
  		osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
  													qstate->sortOperator,
--- 276,282 ----
  												   qstate->sortOperators,
  												   qstate->sortCollations,
  												   qstate->sortNullsFirsts,
! 												   work_mem, false, false);
  	else
  		osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
  													qstate->sortOperator,
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
new file mode 100644
index d14f0f9..a8fd978
*** a/src/backend/utils/adt/selfuncs.c
--- b/src/backend/utils/adt/selfuncs.c
*************** estimate_num_groups(PlannerInfo *root, L
*** 3521,3526 ****
--- 3521,3562 ----
  }
  
  /*
+  * estimate_pathkeys_groups	- Estimate number of groups which dataset is
+  * 							  divided to by pathkeys.
+  *
+  * Returns an array of group numbers. i'th element of array is number of groups
+  * which first i pathkeys divides dataset into.  Actually is a convenience
+  * wrapper over estimate_num_groups().
+  */
+ double *
+ estimate_pathkeys_groups(List *pathkeys, PlannerInfo *root, double tuples)
+ {
+ 	ListCell   *l;
+ 	List	   *groupExprs = NIL;
+ 	double	   *result;
+ 	int			i;
+ 
+ 	/*
+ 	 * Get number of groups for each prefix of pathkeys.
+ 	 */
+ 	i = 0;
+ 	result = (double *) palloc(sizeof(double) * list_length(pathkeys));
+ 	foreach(l, pathkeys)
+ 	{
+ 		PathKey *key = (PathKey *)lfirst(l);
+ 		EquivalenceMember *member = (EquivalenceMember *)
+ 							linitial(key->pk_eclass->ec_members);
+ 
+ 		groupExprs = lappend(groupExprs, member->em_expr);
+ 
+ 		result[i] = estimate_num_groups(root, groupExprs, tuples, NULL);
+ 		i++;
+ 	}
+ 
+ 	return result;
+ }
+ 
+ /*
   * Estimate hash bucketsize fraction (ie, number of entries in a bucket
   * divided by total tuples in relation) if the specified expression is used
   * as a hash key.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
new file mode 100644
index 5d8fb2e..46a2c16
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
*************** static struct config_bool ConfigureNames
*** 857,862 ****
--- 857,871 ----
  		NULL, NULL, NULL
  	},
  	{
+ 		{"enable_incsort", PGC_USERSET, QUERY_TUNING_METHOD,
+ 			gettext_noop("Enables the planner's use of incremental sort steps."),
+ 			NULL
+ 		},
+ 		&enable_incsort,
+ 		true,
+ 		NULL, NULL, NULL
+ 	},
+ 	{
  		{"enable_hashagg", PGC_USERSET, QUERY_TUNING_METHOD,
  			gettext_noop("Enables the planner's use of hashed aggregation plans."),
  			NULL
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
new file mode 100644
index e1e692d..af93ae4
*** a/src/backend/utils/sort/tuplesort.c
--- b/src/backend/utils/sort/tuplesort.c
*************** struct Tuplesortstate
*** 281,286 ****
--- 281,291 ----
  	int64		allowedMem;		/* total memory allowed, in bytes */
  	int			maxTapes;		/* number of tapes (Knuth's T) */
  	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
+ 	TupSortStatus maxStatus;	/* maximum status reached between sort groups */
+ 	int64		maxMem;			/* maximum amount of memory used between
+ 								   sort groups */
+ 	bool		maxMemOnDisk;	/* is maxMem value for on-disk memory */
+ 	MemoryContext maincontext;
  	MemoryContext sortcontext;	/* memory context holding most sort data */
  	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
  	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
*************** static void writetup_datum(Tuplesortstat
*** 633,638 ****
--- 638,646 ----
  static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
  			  int tapenum, unsigned int len);
  static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
+ static void tuplesort_free(Tuplesortstate *state, bool delete);
+ static void tuplesort_updatemax(Tuplesortstate *state);
+ 
  
  /*
   * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
*************** static Tuplesortstate *
*** 667,685 ****
  tuplesort_begin_common(int workMem, bool randomAccess)
  {
  	Tuplesortstate *state;
  	MemoryContext sortcontext;
  	MemoryContext tuplecontext;
  	MemoryContext oldcontext;
  
  	/*
! 	 * Create a working memory context for this sort operation. All data
! 	 * needed by the sort will live inside this context.
  	 */
! 	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
  										"TupleSort main",
  										ALLOCSET_DEFAULT_SIZES);
  
  	/*
  	 * Caller tuple (e.g. IndexTuple) memory context.
  	 *
  	 * A dedicated child context used exclusively for caller passed tuples
--- 675,704 ----
  tuplesort_begin_common(int workMem, bool randomAccess)
  {
  	Tuplesortstate *state;
+ 	MemoryContext maincontext;
  	MemoryContext sortcontext;
  	MemoryContext tuplecontext;
  	MemoryContext oldcontext;
  
  	/*
! 	 * Memory context surviving tuplesort_reset.  This memory context holds
! 	 * data which is useful to keep while sorting multiple similar batches.
  	 */
! 	maincontext = AllocSetContextCreate(CurrentMemoryContext,
  										"TupleSort main",
  										ALLOCSET_DEFAULT_SIZES);
  
  	/*
+ 	 * Create a working memory context for one sort operation.  The content of
+ 	 * this context is deleted by tuplesort_reset.
+ 	 */
+ 	sortcontext = AllocSetContextCreate(maincontext,
+ 										"TupleSort sort",
+ 										ALLOCSET_DEFAULT_MINSIZE,
+ 										ALLOCSET_DEFAULT_INITSIZE,
+ 										ALLOCSET_DEFAULT_MAXSIZE);
+ 
+ 	/*
  	 * Caller tuple (e.g. IndexTuple) memory context.
  	 *
  	 * A dedicated child context used exclusively for caller passed tuples
*************** tuplesort_begin_common(int workMem, bool
*** 696,702 ****
  	 * Make the Tuplesortstate within the per-sort context.  This way, we
  	 * don't need a separate pfree() operation for it at shutdown.
  	 */
! 	oldcontext = MemoryContextSwitchTo(sortcontext);
  
  	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
  
--- 715,721 ----
  	 * Make the Tuplesortstate within the per-sort context.  This way, we
  	 * don't need a separate pfree() operation for it at shutdown.
  	 */
! 	oldcontext = MemoryContextSwitchTo(maincontext);
  
  	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
  
*************** tuplesort_begin_common(int workMem, bool
*** 714,719 ****
--- 733,739 ----
  	state->availMem = state->allowedMem;
  	state->sortcontext = sortcontext;
  	state->tuplecontext = tuplecontext;
+ 	state->maincontext = maincontext;
  	state->tapeset = NULL;
  
  	state->memtupcount = 0;
*************** tuplesort_begin_heap(TupleDesc tupDesc,
*** 754,766 ****
  					 int nkeys, AttrNumber *attNums,
  					 Oid *sortOperators, Oid *sortCollations,
  					 bool *nullsFirstFlags,
! 					 int workMem, bool randomAccess)
  {
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
  	MemoryContext oldcontext;
  	int			i;
  
! 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
  
  	AssertArg(nkeys > 0);
  
--- 774,787 ----
  					 int nkeys, AttrNumber *attNums,
  					 Oid *sortOperators, Oid *sortCollations,
  					 bool *nullsFirstFlags,
! 					 int workMem, bool randomAccess,
! 					 bool skipAbbrev)
  {
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
  	MemoryContext oldcontext;
  	int			i;
  
! 	oldcontext = MemoryContextSwitchTo(state->maincontext);
  
  	AssertArg(nkeys > 0);
  
*************** tuplesort_begin_heap(TupleDesc tupDesc,
*** 802,808 ****
  		sortKey->ssup_nulls_first = nullsFirstFlags[i];
  		sortKey->ssup_attno = attNums[i];
  		/* Convey if abbreviation optimization is applicable in principle */
! 		sortKey->abbreviate = (i == 0);
  
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
--- 823,829 ----
  		sortKey->ssup_nulls_first = nullsFirstFlags[i];
  		sortKey->ssup_attno = attNums[i];
  		/* Convey if abbreviation optimization is applicable in principle */
! 		sortKey->abbreviate = (i == 0) && !skipAbbrev;
  
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
*************** tuplesort_begin_cluster(TupleDesc tupDes
*** 833,839 ****
  
  	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
  
! 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
  
  #ifdef TRACE_SORT
  	if (trace_sort)
--- 854,860 ----
  
  	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
  
! 	oldcontext = MemoryContextSwitchTo(state->maincontext);
  
  #ifdef TRACE_SORT
  	if (trace_sort)
*************** tuplesort_begin_index_btree(Relation hea
*** 924,930 ****
  	MemoryContext oldcontext;
  	int			i;
  
! 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
  
  #ifdef TRACE_SORT
  	if (trace_sort)
--- 945,951 ----
  	MemoryContext oldcontext;
  	int			i;
  
! 	oldcontext = MemoryContextSwitchTo(state->maincontext);
  
  #ifdef TRACE_SORT
  	if (trace_sort)
*************** tuplesort_begin_index_hash(Relation heap
*** 997,1003 ****
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
  
  #ifdef TRACE_SORT
  	if (trace_sort)
--- 1018,1024 ----
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(state->maincontext);
  
  #ifdef TRACE_SORT
  	if (trace_sort)
*************** tuplesort_begin_datum(Oid datumType, Oid
*** 1034,1040 ****
  	int16		typlen;
  	bool		typbyval;
  
! 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
  
  #ifdef TRACE_SORT
  	if (trace_sort)
--- 1055,1061 ----
  	int16		typlen;
  	bool		typbyval;
  
! 	oldcontext = MemoryContextSwitchTo(state->maincontext);
  
  #ifdef TRACE_SORT
  	if (trace_sort)
*************** tuplesort_set_bound(Tuplesortstate *stat
*** 1145,1160 ****
  }
  
  /*
!  * tuplesort_end
!  *
!  *	Release resources and clean up.
   *
!  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
!  * pointing to garbage.  Be careful not to attempt to use or free such
!  * pointers afterwards!
   */
! void
! tuplesort_end(Tuplesortstate *state)
  {
  	/* context swap probably not needed, but let's be safe */
  	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
--- 1166,1177 ----
  }
  
  /*
!  * tuplesort_free
   *
!  *	Internal routine for freeing resources of tuplesort.
   */
! static void
! tuplesort_free(Tuplesortstate *state, bool delete)
  {
  	/* context swap probably not needed, but let's be safe */
  	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
*************** tuplesort_end(Tuplesortstate *state)
*** 1213,1219 ****
  	 * Free the per-sort memory context, thereby releasing all working memory,
  	 * including the Tuplesortstate struct itself.
  	 */
! 	MemoryContextDelete(state->sortcontext);
  }
  
  /*
--- 1230,1327 ----
  	 * Free the per-sort memory context, thereby releasing all working memory,
  	 * including the Tuplesortstate struct itself.
  	 */
! 	if (delete)
! 	{
! 		MemoryContextDelete(state->maincontext);
! 	}
! 	else
! 	{
! 		MemoryContextResetOnly(state->sortcontext);
! 		MemoryContextResetOnly(state->tuplecontext);
! 	}
! }
! 
! /*
!  * tuplesort_end
!  *
!  *	Release resources and clean up.
!  *
!  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
!  * pointing to garbage.  Be careful not to attempt to use or free such
!  * pointers afterwards!
!  */
! void
! tuplesort_end(Tuplesortstate *state)
! {
! 	tuplesort_free(state, true);
! }
! 
! /*
!  * tuplesort_updatemax 
!  *
!  *	Update maximum resource usage statistics.
!  */
! static void
! tuplesort_updatemax(Tuplesortstate *state)
! {
! 	int64	memUsed;
! 	bool	memUsedOnDisk;
! 
! 	/*
! 	 * Note: it might seem we should provide both memory and disk usage for a
! 	 * disk-based sort.  However, the current code doesn't track memory space
! 	 * accurately once we have begun to return tuples to the caller (since we
! 	 * don't account for pfree's the caller is expected to do), so we cannot
! 	 * rely on availMem in a disk sort.  This does not seem worth the overhead
! 	 * to fix.  Is it worth creating an API for the memory context code to
! 	 * tell us how much is actually used in sortcontext?
! 	 */
! 	if (state->tapeset)
! 	{
! 		memUsedOnDisk = true;
! 		memUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
! 	}
! 	else
! 	{
! 		memUsedOnDisk = false;
! 		memUsed = state->allowedMem - state->availMem;
! 	}
! 
! 	state->maxStatus = Max(state->maxStatus, state->status);
! 	if (memUsed > state->maxMem)
! 	{
! 		state->maxMem = memUsed;
! 		state->maxMemOnDisk = memUsedOnDisk;
! 	}
! }
! 
! /*
!  * tuplesort_reset
!  *
!  *	Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
!  *	meta-information in.  After tuplesort_reset, tuplesort is ready to start
!  *	a new sort.  It allows evade recreation of tuple sort (and save resources)
!  *	when sorting multiple small batches.
!  */
! void
! tuplesort_reset(Tuplesortstate *state)
! {
! 	tuplesort_updatemax(state);
! 	tuplesort_free(state, false);
! 	state->status = TSS_INITIAL;
! 	state->memtupcount = 0;
! 	state->boundUsed = false;
! 	state->tapeset = NULL;
! 	state->currentRun = 0;
! 	state->result_tape = -1;
! 	state->bounded = false;
! 	state->availMem = state->allowedMem;
! 	state->lastReturnedTuple = NULL;
! 	state->slabAllocatorUsed = false;
! 	state->slabMemoryBegin = NULL;
! 	state->slabMemoryEnd = NULL;
! 	state->slabFreeHead = NULL;
! 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
  }
  
  /*
*************** tuplesort_get_stats(Tuplesortstate *stat
*** 3219,3245 ****
  					const char **spaceType,
  					long *spaceUsed)
  {
! 	/*
! 	 * Note: it might seem we should provide both memory and disk usage for a
! 	 * disk-based sort.  However, the current code doesn't track memory space
! 	 * accurately once we have begun to return tuples to the caller (since we
! 	 * don't account for pfree's the caller is expected to do), so we cannot
! 	 * rely on availMem in a disk sort.  This does not seem worth the overhead
! 	 * to fix.  Is it worth creating an API for the memory context code to
! 	 * tell us how much is actually used in sortcontext?
! 	 */
! 	if (state->tapeset)
! 	{
  		*spaceType = "Disk";
- 		*spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
- 	}
  	else
- 	{
  		*spaceType = "Memory";
! 		*spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
! 	}
  
! 	switch (state->status)
  	{
  		case TSS_SORTEDINMEM:
  			if (state->boundUsed)
--- 3327,3341 ----
  					const char **spaceType,
  					long *spaceUsed)
  {
! 	tuplesort_updatemax(state);
! 
! 	if (state->maxMemOnDisk)
  		*spaceType = "Disk";
  	else
  		*spaceType = "Memory";
! 	*spaceUsed = (state->maxMem + 1023) / 1024;
  
! 	switch (state->maxStatus)
  	{
  		case TSS_SORTEDINMEM:
  			if (state->boundUsed)
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
new file mode 100644
index 9f41bab..c95cb42
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
*************** typedef struct MaterialState
*** 1814,1819 ****
--- 1814,1833 ----
  	Tuplestorestate *tuplestorestate;
  } MaterialState;
  
+ 
+ /* ----------------
+  *	 When performing sorting by multiple keys input dataset could be already
+  *	 presorted by some prefix of these keys.  We call them "skip keys".
+  *	 SkipKeyData represents information about one such key.
+  * ----------------
+  */
+ typedef struct SkipKeyData
+ {
+ 	FmgrInfo				flinfo;	/* comparison function info */
+ 	FunctionCallInfoData	fcinfo;	/* comparison function call info */
+ 	OffsetNumber			attno;	/* attribute number in tuple */
+ } SkipKeyData;
+ 
  /* ----------------
   *	 SortState information
   * ----------------
*************** typedef struct SortState
*** 1825,1833 ****
--- 1839,1852 ----
  	bool		bounded;		/* is the result set bounded? */
  	int64		bound;			/* if bounded, how many tuples are needed */
  	bool		sort_Done;		/* sort completed yet? */
+ 	bool		finished;		/* fetching tuples from outer node
+ 								   is finished ? */
  	bool		bounded_Done;	/* value of bounded we did the sort with */
  	int64		bound_Done;		/* value of bound we did the sort with */
  	void	   *tuplesortstate; /* private state of tuplesort.c */
+ 	SkipKeyData *skipKeys;		/* keys, dataset is presorted by */
+ 	int64		groupsCount;	/* number of groups with equal skip keys */
+ 	TupleTableSlot *prevSlot;	/* slot for previous tuple from outer node */
  } SortState;
  
  /* ---------------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
new file mode 100644
index 95dd8ba..7a5dcf5
*** a/src/include/nodes/nodes.h
--- b/src/include/nodes/nodes.h
*************** typedef enum NodeTag
*** 71,76 ****
--- 71,77 ----
  	T_HashJoin,
  	T_Material,
  	T_Sort,
+ 	T_IncSort,
  	T_Group,
  	T_Agg,
  	T_WindowAgg,
*************** typedef enum NodeTag
*** 120,125 ****
--- 121,127 ----
  	T_HashJoinState,
  	T_MaterialState,
  	T_SortState,
+ 	T_IncSortState,
  	T_GroupState,
  	T_AggState,
  	T_WindowAggState,
*************** typedef enum NodeTag
*** 249,254 ****
--- 251,257 ----
  	T_ProjectionPath,
  	T_ProjectSetPath,
  	T_SortPath,
+ 	T_IncSortPath,
  	T_GroupPath,
  	T_UpperUniquePath,
  	T_AggPath,
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
new file mode 100644
index f72f7a8..6b96535
*** a/src/include/nodes/plannodes.h
--- b/src/include/nodes/plannodes.h
*************** typedef struct Sort
*** 699,704 ****
--- 699,715 ----
  	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */
  } Sort;
  
+ 
+ /* ----------------
+  *		incremental sort node
+  * ----------------
+  */
+ typedef struct IncSort
+ {
+ 	Sort		sort;
+ 	int			skipCols;		/* number of presorted columns */
+ } IncSort;
+ 
  /* ---------------
   *	 group node -
   *		Used for queries with GROUP BY (but no aggregates) specified.
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
new file mode 100644
index f7ac6f6..2c56105
*** a/src/include/nodes/relation.h
--- b/src/include/nodes/relation.h
*************** typedef struct SortPath
*** 1331,1336 ****
--- 1331,1346 ----
  } SortPath;
  
  /*
+  * IncSortPath
+  */
+ typedef struct IncSortPath
+ {
+ 	SortPath	spath;
+ 	int			skipCols;
+ } IncSortPath;
+ 
+ 
+ /*
   * GroupPath represents grouping (of presorted input)
   *
   * groupClause represents the columns to be grouped on; the input path
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
new file mode 100644
index 72200fa..c26ef9a
*** a/src/include/optimizer/cost.h
--- b/src/include/optimizer/cost.h
*************** extern bool enable_indexonlyscan;
*** 61,66 ****
--- 61,67 ----
  extern bool enable_bitmapscan;
  extern bool enable_tidscan;
  extern bool enable_sort;
+ extern bool enable_incsort;
  extern bool enable_hashagg;
  extern bool enable_nestloop;
  extern bool enable_material;
*************** extern void cost_ctescan(Path *path, Pla
*** 95,102 ****
  			 RelOptInfo *baserel, ParamPathInfo *param_info);
  extern void cost_recursive_union(Path *runion, Path *nrterm, Path *rterm);
  extern void cost_sort(Path *path, PlannerInfo *root,
! 		  List *pathkeys, Cost input_cost, double tuples, int width,
! 		  Cost comparison_cost, int sort_mem,
  		  double limit_tuples);
  extern void cost_merge_append(Path *path, PlannerInfo *root,
  				  List *pathkeys, int n_streams,
--- 96,104 ----
  			 RelOptInfo *baserel, ParamPathInfo *param_info);
  extern void cost_recursive_union(Path *runion, Path *nrterm, Path *rterm);
  extern void cost_sort(Path *path, PlannerInfo *root,
! 		  List *pathkeys, int presorted_keys,
! 		  Cost input_startup_cost, Cost input_total_cost,
! 		  double tuples, int width, Cost comparison_cost, int sort_mem,
  		  double limit_tuples);
  extern void cost_merge_append(Path *path, PlannerInfo *root,
  				  List *pathkeys, int n_streams,
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
new file mode 100644
index ebda308..3271203
*** a/src/include/optimizer/paths.h
--- b/src/include/optimizer/paths.h
*************** typedef enum
*** 180,185 ****
--- 180,186 ----
  
  extern PathKeysComparison compare_pathkeys(List *keys1, List *keys2);
  extern bool pathkeys_contained_in(List *keys1, List *keys2);
+ extern int pathkeys_common(List *keys1, List *keys2);
  extern Path *get_cheapest_path_for_pathkeys(List *paths, List *pathkeys,
  							   Relids required_outer,
  							   CostSelector cost_criterion);
*************** extern List *select_outer_pathkeys_for_m
*** 216,221 ****
--- 217,223 ----
  extern List *make_inner_pathkeys_for_merge(PlannerInfo *root,
  							  List *mergeclauses,
  							  List *outer_pathkeys);
+ extern int pathkeys_useful_for_ordering(List *query_pathkeys, List *pathkeys);
  extern List *truncate_useless_pathkeys(PlannerInfo *root,
  						  RelOptInfo *rel,
  						  List *pathkeys);
diff --git a/src/include/utils/selfuncs.h b/src/include/utils/selfuncs.h
new file mode 100644
index 9f9d2dc..b8884b6
*** a/src/include/utils/selfuncs.h
--- b/src/include/utils/selfuncs.h
*************** extern void mergejoinscansel(PlannerInfo
*** 204,209 ****
--- 204,212 ----
  extern double estimate_num_groups(PlannerInfo *root, List *groupExprs,
  					double input_rows, List **pgset);
  
+ extern double *estimate_pathkeys_groups(List *pathkeys, PlannerInfo *root,
+ 										double tuples);
+ 
  extern Selectivity estimate_hash_bucketsize(PlannerInfo *root, Node *hashkey,
  						 double nbuckets);
  
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
new file mode 100644
index 5b3f475..616f9f5
*** a/src/include/utils/tuplesort.h
--- b/src/include/utils/tuplesort.h
*************** extern Tuplesortstate *tuplesort_begin_h
*** 62,68 ****
  					 int nkeys, AttrNumber *attNums,
  					 Oid *sortOperators, Oid *sortCollations,
  					 bool *nullsFirstFlags,
! 					 int workMem, bool randomAccess);
  extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
  						Relation indexRel,
  						int workMem, bool randomAccess);
--- 62,69 ----
  					 int nkeys, AttrNumber *attNums,
  					 Oid *sortOperators, Oid *sortCollations,
  					 bool *nullsFirstFlags,
! 					 int workMem, bool randomAccess,
! 					 bool skipAbbrev);
  extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
  						Relation indexRel,
  						int workMem, bool randomAccess);
*************** extern bool tuplesort_skiptuples(Tupleso
*** 104,109 ****
--- 105,112 ----
  
  extern void tuplesort_end(Tuplesortstate *state);
  
+ extern void tuplesort_reset(Tuplesortstate *state);
+ 
  extern void tuplesort_get_stats(Tuplesortstate *state,
  					const char **sortMethod,
  					const char **spaceType,
diff --git a/src/test/isolation/expected/drop-index-concurrently-1.out b/src/test/isolation/expected/drop-index-concurrently-1.out
new file mode 100644
index 75dff56..e11fb61
*** a/src/test/isolation/expected/drop-index-concurrently-1.out
--- b/src/test/isolation/expected/drop-index-concurrently-1.out
*************** Sort           
*** 19,27 ****
  step explains: EXPLAIN (COSTS OFF) EXECUTE getrow_seq;
  QUERY PLAN     
  
! Sort           
    Sort Key: id, data
!   ->  Seq Scan on test_dc
          Filter: ((data)::text = '34'::text)
  step select2: SELECT * FROM test_dc WHERE data=34 ORDER BY id,data;
  id             data           
--- 19,28 ----
  step explains: EXPLAIN (COSTS OFF) EXECUTE getrow_seq;
  QUERY PLAN     
  
! Incremental Sort
    Sort Key: id, data
!   Presorted Key: id
!   ->  Index Scan using test_dc_pkey on test_dc
          Filter: ((data)::text = '34'::text)
  step select2: SELECT * FROM test_dc WHERE data=34 ORDER BY id,data;
  id             data           
diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out
new file mode 100644
index 0ff8062..3ad5eb3
*** a/src/test/regress/expected/aggregates.out
--- b/src/test/regress/expected/aggregates.out
*************** group by t1.a,t1.b,t1.c,t1.d,t2.x,t2.y,t
*** 996,1010 ****
  explain (costs off) select t1.*,t2.x,t2.z
  from t1 inner join t2 on t1.a = t2.x and t1.b = t2.y
  group by t1.a,t1.b,t1.c,t1.d,t2.x,t2.z;
!                       QUERY PLAN                       
! -------------------------------------------------------
!  HashAggregate
     Group Key: t1.a, t1.b, t2.x, t2.z
!    ->  Merge Join
!          Merge Cond: ((t1.a = t2.x) AND (t1.b = t2.y))
!          ->  Index Scan using t1_pkey on t1
!          ->  Index Scan using t2_pkey on t2
! (6 rows)
  
  -- Cannot optimize when PK is deferrable
  explain (costs off) select * from t3 group by a,b,c;
--- 996,1013 ----
  explain (costs off) select t1.*,t2.x,t2.z
  from t1 inner join t2 on t1.a = t2.x and t1.b = t2.y
  group by t1.a,t1.b,t1.c,t1.d,t2.x,t2.z;
!                          QUERY PLAN                          
! -------------------------------------------------------------
!  Group
     Group Key: t1.a, t1.b, t2.x, t2.z
!    ->  Incremental Sort
!          Sort Key: t1.a, t1.b, t2.z
!          Presorted Key: t1.a, t1.b
!          ->  Merge Join
!                Merge Cond: ((t1.a = t2.x) AND (t1.b = t2.y))
!                ->  Index Scan using t1_pkey on t1
!                ->  Index Scan using t2_pkey on t2
! (9 rows)
  
  -- Cannot optimize when PK is deferrable
  explain (costs off) select * from t3 group by a,b,c;
diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out
new file mode 100644
index a8c8b28..11d697e
*** a/src/test/regress/expected/inherit.out
--- b/src/test/regress/expected/inherit.out
*************** NOTICE:  drop cascades to table matest1
*** 1448,1453 ****
--- 1448,1454 ----
  set enable_seqscan = off;
  set enable_indexscan = on;
  set enable_bitmapscan = off;
+ set enable_incsort = off;
  -- Check handling of duplicated, constant, or volatile targetlist items
  explain (costs off)
  SELECT thousand, tenthous FROM tenk1
*************** FROM generate_series(1, 3) g(i);
*** 1588,1596 ****
--- 1589,1633 ----
   {3,7,8,10,13,13,16,18,19,22}
  (3 rows)
  
+ set enable_incsort = on;
+ -- check incremental sort is used when enabled
+ explain (costs off)
+ SELECT thousand, tenthous FROM tenk1
+ UNION ALL
+ SELECT thousand, thousand FROM tenk1
+ ORDER BY thousand, tenthous;
+                                QUERY PLAN                                
+ -------------------------------------------------------------------------
+  Merge Append
+    Sort Key: tenk1.thousand, tenk1.tenthous
+    ->  Index Only Scan using tenk1_thous_tenthous on tenk1
+    ->  Incremental Sort
+          Sort Key: tenk1_1.thousand, tenk1_1.thousand
+          Presorted Key: tenk1_1.thousand
+          ->  Index Only Scan using tenk1_thous_tenthous on tenk1 tenk1_1
+ (7 rows)
+ 
+ explain (costs off)
+ SELECT x, y FROM
+   (SELECT thousand AS x, tenthous AS y FROM tenk1 a
+    UNION ALL
+    SELECT unique2 AS x, unique2 AS y FROM tenk1 b) s
+ ORDER BY x, y;
+                          QUERY PLAN                          
+ -------------------------------------------------------------
+  Merge Append
+    Sort Key: a.thousand, a.tenthous
+    ->  Index Only Scan using tenk1_thous_tenthous on tenk1 a
+    ->  Incremental Sort
+          Sort Key: b.unique2, b.unique2
+          Presorted Key: b.unique2
+          ->  Index Only Scan using tenk1_unique2 on tenk1 b
+ (7 rows)
+ 
  reset enable_seqscan;
  reset enable_indexscan;
  reset enable_bitmapscan;
+ reset enable_incsort;
  --
  -- Check that constraint exclusion works correctly with partitions using
  -- implicit constraints generated from the partition bound information.
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
new file mode 100644
index d48abd7..f6a99d1
*** a/src/test/regress/expected/sysviews.out
--- b/src/test/regress/expected/sysviews.out
*************** select name, setting from pg_settings wh
*** 75,80 ****
--- 75,81 ----
   enable_bitmapscan    | on
   enable_hashagg       | on
   enable_hashjoin      | on
+  enable_incsort       | on
   enable_indexonlyscan | on
   enable_indexscan     | on
   enable_material      | on
*************** select name, setting from pg_settings wh
*** 83,89 ****
   enable_seqscan       | on
   enable_sort          | on
   enable_tidscan       | on
! (11 rows)
  
  -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
  -- more-or-less working.  We can't test their contents in any great detail
--- 84,90 ----
   enable_seqscan       | on
   enable_sort          | on
   enable_tidscan       | on
! (12 rows)
  
  -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
  -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
new file mode 100644
index a8b7eb1..5cf4426
*** a/src/test/regress/sql/inherit.sql
--- b/src/test/regress/sql/inherit.sql
*************** drop table matest0 cascade;
*** 498,503 ****
--- 498,504 ----
  set enable_seqscan = off;
  set enable_indexscan = on;
  set enable_bitmapscan = off;
+ set enable_incsort = off;
  
  -- Check handling of duplicated, constant, or volatile targetlist items
  explain (costs off)
*************** SELECT
*** 559,567 ****
--- 560,585 ----
      ORDER BY f.i LIMIT 10)
  FROM generate_series(1, 3) g(i);
  
+ set enable_incsort = on;
+ 
+ -- check incremental sort is used when enabled
+ explain (costs off)
+ SELECT thousand, tenthous FROM tenk1
+ UNION ALL
+ SELECT thousand, thousand FROM tenk1
+ ORDER BY thousand, tenthous;
+ 
+ explain (costs off)
+ SELECT x, y FROM
+   (SELECT thousand AS x, tenthous AS y FROM tenk1 a
+    UNION ALL
+    SELECT unique2 AS x, unique2 AS y FROM tenk1 b) s
+ ORDER BY x, y;
+ 
  reset enable_seqscan;
  reset enable_indexscan;
  reset enable_bitmapscan;
+ reset enable_incsort;
  
  --
  -- Check that constraint exclusion works correctly with partitions using
