PHOENIX-944 Support derived tables in FROM clause that need extra steps of 
client-side aggregation or other processing


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f004e135
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f004e135
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f004e135

Branch: refs/heads/4.0
Commit: f004e135c8ae72c6a22ed9879b37dd6dcf86627a
Parents: e8a0355
Author: maryannxue <maryann...@apache.org>
Authored: Wed Oct 22 13:48:35 2014 -0400
Committer: maryannxue <maryann...@apache.org>
Committed: Thu Oct 23 23:25:18 2014 -0400

----------------------------------------------------------------------
 .../apache/phoenix/end2end/DerivedTableIT.java  | 282 +++++++++++++------
 .../org/apache/phoenix/end2end/SubqueryIT.java  |  12 +
 .../apache/phoenix/compile/FromCompiler.java    |  27 +-
 .../apache/phoenix/compile/GroupByCompiler.java |   5 +-
 .../apache/phoenix/compile/JoinCompiler.java    |   2 +-
 .../apache/phoenix/compile/OrderByCompiler.java |   2 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  58 +++-
 .../phoenix/compile/SubqueryRewriter.java       |  10 +-
 .../TrackOrderPreservingExpressionCompiler.java |  27 +-
 .../apache/phoenix/compile/WhereCompiler.java   |  33 +--
 .../GroupedAggregateRegionObserver.java         |   2 +-
 .../coprocessor/HashJoinRegionScanner.java      |   4 +-
 .../phoenix/coprocessor/ScanRegionObserver.java |   3 +-
 .../UngroupedAggregateRegionObserver.java       |   2 +-
 .../phoenix/execute/ClientAggregatePlan.java    | 229 +++++++++++++++
 .../phoenix/execute/ClientProcessingPlan.java   |  82 ++++++
 .../apache/phoenix/execute/ClientScanPlan.java  |  92 ++++++
 .../apache/phoenix/execute/HashJoinPlan.java    |  24 +-
 .../phoenix/execute/TupleProjectionPlan.java    |  49 +---
 .../apache/phoenix/execute/TupleProjector.java  | 276 ++++++++++++++++++
 .../expression/ProjectedColumnExpression.java   |   2 +-
 .../DistinctValueClientAggregator.java          |   7 +-
 .../BaseGroupedAggregatingResultIterator.java   | 105 +++++++
 .../GroupedAggregatingResultIterator.java       |  67 +----
 .../iterate/LookAheadResultIterator.java        |   4 +
 .../org/apache/phoenix/join/TupleProjector.java | 260 -----------------
 .../apache/phoenix/optimize/QueryOptimizer.java |   1 +
 .../apache/phoenix/parse/ParseNodeFactory.java  |   4 +-
 .../apache/phoenix/parse/SelectStatement.java   |   7 +
 .../org/apache/phoenix/schema/ColumnRef.java    |   2 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |   2 +-
 31 files changed, 1183 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
index 8a80764..8ef542a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
@@ -35,19 +35,19 @@ import static 
org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+import java.sql.Array;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -61,33 +61,65 @@ import com.google.common.collect.Lists;
 @RunWith(Parameterized.class)
 public class DerivedTableIT extends BaseClientManagedTimeIT {
     private static final String tenantId = getOrganizationId();
-    private static final String MSG = "Complex nested queries not supported.";
     
     private long ts;
-    private String indexDDL;
+    private String[] indexDDL;
+    private String[] plans;
     
-    public DerivedTableIT(String indexDDL) {
+    public DerivedTableIT(String[] indexDDL, String[] plans) {
         this.indexDDL = indexDDL;
+        this.plans = plans;
     }
     
     @Before
     public void initTable() throws Exception {
          ts = nextTimestamp();
         initATableValues(tenantId, getDefaultSplits(tenantId), null, ts);
-        if (indexDDL != null && indexDDL.length() > 0) {
+        if (indexDDL != null && indexDDL.length > 0) {
             Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
             props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
             Connection conn = DriverManager.getConnection(getUrl(), props);
-            conn.createStatement().execute(indexDDL);
+            for (String ddl : indexDDL) {
+                conn.createStatement().execute(ddl);
+            }
         }
     }
     
     @Parameters(name="{0}")
     public static Collection<Object> data() {
         List<Object> testCases = Lists.newArrayList();
-        testCases.add(new String[] { "CREATE INDEX ATABLE_DERIVED_IDX ON 
aTable (a_byte) INCLUDE ("
-                + "    A_STRING, " + "    B_STRING)" });
-        testCases.add(new String[] { "" });
+        testCases.add(new String[][] {
+                { 
+                "CREATE INDEX ATABLE_DERIVED_IDX ON aTable (a_byte) INCLUDE 
(A_STRING, B_STRING)" 
+                }, {
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE_DERIVED_IDX\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, 
B_STRING]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT SORTED BY [B_STRING]\n" +
+                "CLIENT SORTED BY [A]\n" +
+                "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
+                "CLIENT SORTED BY [A DESC]",
+                
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE_DERIVED_IDX\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, 
B_STRING]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
+                "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]"}});
+        testCases.add(new String[][] {
+                {}, {
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, 
B_STRING]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT SORTED BY [B_STRING]\n" +
+                "CLIENT SORTED BY [A]\n" +
+                "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
+                "CLIENT SORTED BY [A DESC]",
+                
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, 
B_STRING]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
+                "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]"}});
         return testCases;
     }
 
@@ -183,21 +215,21 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             
             // (limit) where
             query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM 
aTable LIMIT 2) AS t WHERE t.b = '" + C_VALUE + "'";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(ROW2,rs.getString(1));
+
+            assertFalse(rs.next());
 
             // (count) where
             query = "SELECT t.c FROM (SELECT count(*) c FROM aTable) AS t 
WHERE t.c > 0";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(9,rs.getInt(1));
+
+            assertFalse(rs.next());
         } finally {
             conn.close();
         }
@@ -227,12 +259,78 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             
             // (groupby) groupby
             query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable 
GROUP BY a_string) AS t GROUP BY t.c";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(1,rs.getInt(1));
+            assertEquals(1,rs.getInt(2));
+            assertTrue (rs.next());
+            assertEquals(4,rs.getInt(1));
+            assertEquals(2,rs.getInt(2));
+
+            assertFalse(rs.next());
+            
+            // (groupby) groupby orderby
+            query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable 
GROUP BY a_string) AS t GROUP BY t.c ORDER BY count(*) DESC";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(4,rs.getInt(1));
+            assertEquals(2,rs.getInt(2));
+            assertTrue (rs.next());
+            assertEquals(1,rs.getInt(1));
+            assertEquals(1,rs.getInt(2));
+
+            assertFalse(rs.next());
+            
+            // (groupby a, b orderby b) groupby a orderby a
+            query = "SELECT t.a, COLLECTDISTINCT(t.b) FROM (SELECT b_string b, 
a_string a FROM aTable GROUP BY a_string, b_string ORDER BY b_string) AS t 
GROUP BY t.a ORDER BY t.a DESC";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(C_VALUE,rs.getString(1));
+            String[] b = new String[1];
+            b[0] = E_VALUE;
+            Array array = conn.createArrayOf("VARCHAR", b);
+            assertEquals(array,rs.getArray(2));
+            assertTrue (rs.next());
+            assertEquals(B_VALUE,rs.getString(1));
+            b = new String[3];
+            b[0] = B_VALUE;
+            b[1] = C_VALUE;
+            b[2] = E_VALUE;
+            array = conn.createArrayOf("VARCHAR", b);
+            assertEquals(array,rs.getArray(2));
+            assertTrue (rs.next());
+            assertEquals(A_VALUE,rs.getString(1));
+            assertEquals(array,rs.getArray(2));
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals(plans[0], QueryUtil.getExplainPlan(rs));
+            
+            // distinct b (groupby b, a) groupby a
+            query = "SELECT DISTINCT COLLECTDISTINCT(t.b) FROM (SELECT 
b_string b, a_string a FROM aTable GROUP BY b_string, a_string) AS t GROUP BY 
t.a";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            b = new String[1];
+            b[0] = E_VALUE;
+            array = conn.createArrayOf("VARCHAR", b);
+            assertEquals(array,rs.getArray(1));
+            assertTrue (rs.next());
+            b = new String[3];
+            b[0] = B_VALUE;
+            b[1] = C_VALUE;
+            b[2] = E_VALUE;
+            array = conn.createArrayOf("VARCHAR", b);
+            assertEquals(array,rs.getArray(1));
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals(plans[1], QueryUtil.getExplainPlan(rs));
         } finally {
             conn.close();
         }
@@ -321,13 +419,15 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             assertFalse(rs.next());
             
             // (limit) orderby
-            query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM 
aTable LIMIT 2) AS t ORDER BY t.b, t.eid";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM 
aTable LIMIT 2) AS t ORDER BY t.b DESC, t.eid";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(ROW2,rs.getString(1));
+            assertTrue (rs.next());
+            assertEquals(ROW1,rs.getString(1));
+
+            assertFalse(rs.next());
         } finally {
             conn.close();
         }
@@ -386,15 +486,16 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             
             // limit ? limit ?            
             query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 
?) AS t LIMIT ?";
-            try {
-                statement = conn.prepareStatement(query);
-                statement.setInt(1, 4);
-                statement.setInt(2, 2);
-                statement.executeQuery();
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            statement.setInt(1, 4);
+            statement.setInt(2, 2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(ROW1,rs.getString(1));
+            assertTrue (rs.next());
+            assertEquals(ROW2,rs.getString(1));
+
+            assertFalse(rs.next());
             
             // (groupby orderby) limit
             query = "SELECT a, s FROM (SELECT a_string a, sum(a_byte) s FROM 
aTable GROUP BY a_string ORDER BY sum(a_byte)) LIMIT 2";
@@ -466,30 +567,51 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             
             // distinct (distinct)
             query = "SELECT DISTINCT t.a FROM (SELECT DISTINCT a_string a, 
b_string b FROM aTable) AS t";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(A_VALUE,rs.getString(1));
+            assertTrue (rs.next());
+            assertEquals(B_VALUE,rs.getString(1));
+            assertTrue (rs.next());
+            assertEquals(C_VALUE,rs.getString(1));
+
+            assertFalse(rs.next());
             
             // distinct (groupby)
             query = "SELECT distinct t.c FROM (SELECT count(*) c FROM aTable 
GROUP BY a_string) AS t";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(1,rs.getInt(1));
+            assertTrue (rs.next());
+            assertEquals(4,rs.getInt(1));
+
+            assertFalse(rs.next());
+            
+            // distinct (groupby) orderby
+            query = "SELECT distinct t.c FROM (SELECT count(*) c FROM aTable 
GROUP BY a_string) AS t ORDER BY t.c DESC";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(4,rs.getInt(1));
+            assertTrue (rs.next());
+            assertEquals(1,rs.getInt(1));
+
+            assertFalse(rs.next());
             
             // distinct (limit)
             query = "SELECT DISTINCT t.a, t.b FROM (SELECT a_string a, 
b_string b FROM aTable LIMIT 2) AS t";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(A_VALUE,rs.getString(1));
+            assertEquals(B_VALUE,rs.getString(2));
+            assertTrue (rs.next());
+            assertEquals(A_VALUE,rs.getString(1));
+            assertEquals(C_VALUE,rs.getString(2));
+
+            assertFalse(rs.next());
         } finally {
             conn.close();
         }
@@ -522,30 +644,30 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             
             // count (distinct)
             query = "SELECT count(*) FROM (SELECT DISTINCT a_string FROM 
aTable) AS t";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(3,rs.getInt(1));
+
+            assertFalse(rs.next());
             
             // count (groupby)
             query = "SELECT count(*) FROM (SELECT count(*) c FROM aTable GROUP 
BY a_string) AS t";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(3,rs.getInt(1));
+
+            assertFalse(rs.next());
             
             // count (limit)
             query = "SELECT count(*) FROM (SELECT entity_id FROM aTable LIMIT 
2) AS t";
-            try {
-                conn.createStatement().executeQuery(query);
-                fail("Should have got SQLFeatureNotSupportedException");
-            } catch (SQLFeatureNotSupportedException e) {                
-                assertEquals(MSG, e.getMessage());
-            }
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(2,rs.getInt(1));
+
+            assertFalse(rs.next());
         } finally {
             conn.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
index e4b4c8b..4f3ca16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
@@ -978,6 +978,18 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
             assertEquals(rs.getString(2), "T6");
 
             assertFalse(rs.next());
+            
+            query = "SELECT \"order_id\", name FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON 
o.\"item_id\" = i.\"item_id\" WHERE quantity != ANY(SELECT quantity FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\" GROUP BY 
quantity)";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+
+            assertFalse(rs.next());
         } finally {
             conn.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 0fed42a..1627f45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.BindTableNode;
@@ -175,6 +176,23 @@ public class FromCompiler {
         SingleTableColumnResolver visitor = new 
SingleTableColumnResolver(connection, statement.getTable(), true);
         return visitor;
     }
+    
+    public static ColumnResolver 
getResolverForCompiledDerivedTable(PhoenixConnection connection, TableRef 
tableRef, RowProjector projector) 
+            throws SQLException {
+        List<PColumn> projectedColumns = new ArrayList<PColumn>();
+        List<Expression> sourceExpressions = new ArrayList<Expression>();
+        PTable table = tableRef.getTable();
+        for (PColumn column : table.getColumns()) {
+            Expression sourceExpression = 
projector.getColumnProjector(column.getPosition()).getExpression();
+            PColumnImpl projectedColumn = new PColumnImpl(column.getName(), 
column.getFamilyName(), 
+                    sourceExpression.getDataType(), 
sourceExpression.getMaxLength(), sourceExpression.getScale(), 
sourceExpression.isNullable(), 
+                    column.getPosition(), sourceExpression.getSortOrder(), 
column.getArraySize(), column.getViewConstant(), column.isViewReferenced());    
            
+            projectedColumns.add(projectedColumn);
+            sourceExpressions.add(sourceExpression);
+        }
+        PTable t = PTableImpl.makePTable(table, projectedColumns);
+        return new SingleTableColumnResolver(connection, new 
TableRef(tableRef.getTableAlias(), t, tableRef.getLowerBoundTimeStamp(), 
tableRef.hasDynamicCols()));
+    }
 
     public static ColumnResolver getResolverForMutation(DMLStatement 
statement, PhoenixConnection connection)
             throws SQLException {
@@ -215,6 +233,12 @@ public class FromCompiler {
             TableRef tableRef = createTableRef(tableNode, 
updateCacheImmediately);
             tableRefs = ImmutableList.of(tableRef);
         }
+        
+        public SingleTableColumnResolver(PhoenixConnection connection, 
TableRef tableRef) {
+            super(connection, 0);
+            alias = tableRef.getTableAlias();
+            tableRefs = ImmutableList.of(tableRef);
+        }
 
         @Override
                public List<TableRef> getTables() {
@@ -366,8 +390,7 @@ public class FromCompiler {
         }
     }
     
-    // TODO: unused, but should be used for joins - make private once used
-    public static class MultiTableColumnResolver extends BaseColumnResolver 
implements TableNodeVisitor<Void> {
+    private static class MultiTableColumnResolver extends BaseColumnResolver 
implements TableNodeVisitor<Void> {
         private final ListMultimap<String, TableRef> tableMap;
         private final List<TableRef> tables;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
index dda27aa..a561a47 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
@@ -30,6 +30,7 @@ import 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.parse.AliasedNode;
@@ -135,7 +136,7 @@ public class GroupByCompiler {
      * @throws ColumnNotFoundException if column name could not be resolved
      * @throws AmbiguousColumnException if an unaliased column name is 
ambiguous across multiple tables
      */
-    public static GroupBy compile(StatementContext context, SelectStatement 
statement) throws SQLException {
+    public static GroupBy compile(StatementContext context, SelectStatement 
statement, TupleProjector tupleProjector) throws SQLException {
         List<ParseNode> groupByNodes = statement.getGroupBy();
         /**
          * Distinct can use an aggregate plan if there's no group by.
@@ -160,7 +161,7 @@ public class GroupByCompiler {
         TrackOrderPreservingExpressionCompiler groupByVisitor =
                 new TrackOrderPreservingExpressionCompiler(context, 
                         GroupBy.EMPTY_GROUP_BY, groupByNodes.size(), 
-                        Ordering.UNORDERED);
+                        Ordering.UNORDERED, tupleProjector);
         for (ParseNode node : groupByNodes) {
             Expression expression = node.accept(groupByVisitor);
             if (groupByVisitor.isAggregate()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index ef053de..140146c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -35,12 +35,12 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.function.CountAggregateFunction;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AndParseNode;
 import org.apache.phoenix.parse.BindTableNode;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 0fd07ec..2629846 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -85,7 +85,7 @@ public class OrderByCompiler {
         // accumulate columns in ORDER BY
         TrackOrderPreservingExpressionCompiler visitor = 
                 new TrackOrderPreservingExpressionCompiler(context, groupBy, 
-                        orderByNodes.size(), Ordering.ORDERED);
+                        orderByNodes.size(), Ordering.ORDERED, null);
         LinkedHashSet<OrderByExpression> orderByExpressions = 
Sets.newLinkedHashSetWithExpectedSize(orderByNodes.size());
         for (OrderByNode node : orderByNodes) {
             boolean isAscending = node.isAscending();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index d82ac02..0eafcdb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -34,11 +34,14 @@ import 
org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
 import org.apache.phoenix.compile.JoinCompiler.Table;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
 import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan;
 import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -47,7 +50,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.ParseNode;
@@ -59,11 +61,11 @@ import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 
 
@@ -349,30 +351,49 @@ public class QueryCompiler {
     }
     
     protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
+        // Entry point for compiling a single (non-join) query. A statement without an inner
+        // select is compiled directly as a flat query with no inner plan and no projector.
+        SelectStatement derivedSelect = select.getInnerSelectStatement();
+        if (derivedSelect == null) {
+            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null);
+        }
+
+        // Compile the inner select first and wrap its plan in a TupleProjectionPlan built
+        // from the inner plan's own row projector.
+        QueryPlan compiledInner = compileSubquery(derivedSelect);
+        TupleProjector innerProjector = new TupleProjector(compiledInner.getProjector());
+        QueryPlan projectedInner = new TupleProjectionPlan(compiledInner, innerProjector, null);
+
+        // Replace the original resolver and table with those having compiled type info.
+        TableRef uncompiledRef = context.getResolver().getTables().get(0);
+        ColumnResolver derivedResolver = FromCompiler.getResolverForCompiledDerivedTable(statement.getConnection(), uncompiledRef, projectedInner.getProjector());
+        context.setResolver(derivedResolver);
+        context.setCurrentTable(derivedResolver.getTables().get(0));
+
+        // The tuple projector is handed to the flat compilation only when the inner plan
+        // carries no ORDER BY expressions; otherwise null is passed.
+        TupleProjector projectorForOuter = null;
+        if (projectedInner.getOrderBy().getOrderByExpressions().isEmpty()) {
+            projectorForOuter = innerProjector;
+        }
+        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, projectedInner, projectorForOuter);
+    }
+    
+    protected QueryPlan compileSingleFlatQuery(StatementContext context, 
SelectStatement select, List<Object> binds, boolean asSubquery, boolean 
allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector) 
throws SQLException{
         PhoenixConnection connection = statement.getConnection();
         ColumnResolver resolver = context.getResolver();
         TableRef tableRef = context.getCurrentTable();
         PTable table = tableRef.getTable();
         
-        // TODO PHOENIX-944. See DerivedTableIT for a list of unsupported 
cases.
-        if (table.getType() == PTableType.SUBQUERY)
-            throw new SQLFeatureNotSupportedException("Complex nested queries 
not supported.");
-        
         ParseNode viewWhere = null;
         if (table.getViewStatement() != null) {
             viewWhere = new 
SQLParser(table.getViewStatement()).parseQuery().getWhere();
         }
         Integer limit = LimitCompiler.compile(context, select);
 
-        GroupBy groupBy = GroupByCompiler.compile(context, select);
+        GroupBy groupBy = GroupByCompiler.compile(context, select, 
innerPlanTupleProjector);
         // Optimize the HAVING clause by finding any group by expressions that 
can be moved
         // to the WHERE clause
         select = HavingCompiler.rewrite(context, select, groupBy);
         Expression having = HavingCompiler.compile(context, select, groupBy);
         // Don't pass groupBy when building where clause expression, because 
we do not want to wrap these
         // expressions as group by key expressions since they're pre, not post 
filtered.
-        context.setResolver(FromCompiler.getResolverForQuery(select, 
connection));
-        Set<SubqueryParseNode> subqueries = WhereCompiler.compile(context, 
select, viewWhere);
+        if (innerPlan == null) {
+            context.setResolver(FromCompiler.getResolverForQuery(select, 
connection));
+        }
+        Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> 
newHashSet();
+        Expression where = WhereCompiler.compile(context, select, viewWhere, 
subqueries);
         context.setResolver(resolver); // recover resolver
         OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, 
limit); 
         RowProjector projector = ProjectionCompiler.compile(context, select, 
groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns);
@@ -386,10 +407,14 @@ public class QueryCompiler {
                 limit = maxRows;
             }
         }
-        ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : 
this.parallelIteratorFactory;
-        QueryPlan plan = select.isAggregate() || select.isDistinct() ? 
-                  new AggregatePlan(context, select, tableRef, projector, 
limit, orderBy, parallelIteratorFactory, groupBy, having)
-                : new ScanPlan(context, select, tableRef, projector, limit, 
orderBy, parallelIteratorFactory, allowPageFilter);
+        
+        QueryPlan plan = innerPlan;
+        if (plan == null) {
+            ParallelIteratorFactory parallelIteratorFactory = asSubquery ? 
null : this.parallelIteratorFactory;
+            plan = select.isAggregate() || select.isDistinct() ? 
+                      new AggregatePlan(context, select, tableRef, projector, 
limit, orderBy, parallelIteratorFactory, groupBy, having)
+                    : new ScanPlan(context, select, tableRef, projector, 
limit, orderBy, parallelIteratorFactory, allowPageFilter);
+        }
         if (!subqueries.isEmpty()) {
             int count = subqueries.size();
             WhereClauseSubPlan[] subPlans = new WhereClauseSubPlan[count];
@@ -401,6 +426,13 @@ public class QueryCompiler {
             plan = HashJoinPlan.create(select, plan, null, subPlans);
         }
         
+        if (innerPlan != null) {
+            plan =  select.isAggregate() || select.isDistinct() ?
+                      new ClientAggregatePlan(context, select, tableRef, 
projector, limit, where, orderBy, groupBy, having, plan)
+                    : new ClientScanPlan(context, select, tableRef, projector, 
limit, where, orderBy, plan);
+
+        }
+        
         return plan;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 4b37259..3e470ce 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -238,7 +238,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
                 groupbyNodes.add(aliasedNode.getNode());
             }
             groupbyNodes.addAll(subquery.getGroupBy());
-            subquery = NODE_FACTORY.select(subquery, selectNodes, where, 
groupbyNodes, true);
+            subquery = NODE_FACTORY.select(subquery, subquery.isDistinct(), 
selectNodes, where, groupbyNodes, true);
         }
         
         ParseNode onNode = conditionExtractor.getJoinCondition();
@@ -323,11 +323,11 @@ public class SubqueryRewriter extends ParseNodeRewriter {
         }
         
         if (derivedTableAlias == null) {
-            subquery = NODE_FACTORY.select(subquery, selectNodes, where, 
groupbyNodes, true);
+            subquery = NODE_FACTORY.select(subquery, false, selectNodes, 
where, groupbyNodes, true);
         } else {
             List<ParseNode> derivedTableGroupBy = 
Lists.newArrayListWithExpectedSize(subquery.getGroupBy().size() + 
groupbyNodes.size());
-            derivedTableGroupBy.addAll(subquery.getGroupBy());
             derivedTableGroupBy.addAll(groupbyNodes);
+            derivedTableGroupBy.addAll(subquery.getGroupBy());
             List<AliasedNode> derivedTableSelect = 
Lists.newArrayListWithExpectedSize(aliasedNodes.size() + selectNodes.size() - 
1);
             derivedTableSelect.addAll(aliasedNodes);
             for (int i = 1; i < selectNodes.size(); i++) {
@@ -338,8 +338,8 @@ public class SubqueryRewriter extends ParseNodeRewriter {
                 selectNodes.set(i, aliasedNode);
                 groupbyNodes.set(i - 1, aliasedNode.getNode());
             }
-            SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, 
derivedTableSelect, where, derivedTableGroupBy, true);
-            subquery = 
NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias,
 derivedTableStmt)), subquery.getHint(), false, selectNodes, null, 
groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, 
subquery.getBindCount(), true, subquery.hasSequence());
+            SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, 
subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true);
+            subquery = 
NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias,
 derivedTableStmt)), subquery.getHint(), false, selectNodes, null, 
groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, 
subquery.getBindCount(), true, false);
         }
         
         ParseNode onNode = conditionExtractor.getJoinCondition();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
index 44f9527..9fd6837 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
@@ -24,8 +24,10 @@ import java.util.Comparator;
 import java.util.List;
 
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.function.FunctionExpression;
 import 
org.apache.phoenix.expression.function.FunctionExpression.OrderPreserving;
 import org.apache.phoenix.parse.CaseParseNode;
@@ -35,10 +37,8 @@ import org.apache.phoenix.parse.MultiplyParseNode;
 import org.apache.phoenix.parse.SubtractParseNode;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.util.SchemaUtil;
-
 import com.google.common.collect.Lists;
 
 /**
@@ -57,12 +57,13 @@ public class TrackOrderPreservingExpressionCompiler extends 
ExpressionCompiler {
     private final List<Entry> entries;
     private final Ordering ordering;
     private final int positionOffset;
+    private final TupleProjector tupleProjector; // for derived-table query 
compilation
     private OrderPreserving orderPreserving = OrderPreserving.YES;
     private ColumnRef columnRef;
     private boolean isOrderPreserving = true;
     private Boolean isReverse;
     
-    TrackOrderPreservingExpressionCompiler(StatementContext context, GroupBy 
groupBy, int expectedEntrySize, Ordering ordering) {
+    TrackOrderPreservingExpressionCompiler(StatementContext context, GroupBy 
groupBy, int expectedEntrySize, Ordering ordering, TupleProjector 
tupleProjector) {
         super(context, groupBy);
         PTable table = context.getResolver().getTables().get(0).getTable();
         boolean isSalted = table.getBucketNum() != null;
@@ -72,6 +73,7 @@ public class TrackOrderPreservingExpressionCompiler extends 
ExpressionCompiler {
         positionOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + 
(isSharedViewIndex ? 1 : 0);
         entries = Lists.newArrayListWithExpectedSize(expectedEntrySize);
         this.ordering = ordering;
+        this.tupleProjector = tupleProjector;
     }
     
     public Boolean isReverse() {
@@ -159,7 +161,7 @@ public class TrackOrderPreservingExpressionCompiler extends 
ExpressionCompiler {
         ColumnRef ref = super.resolveColumn(node);
         // If we encounter any non PK column, then we can't aggregate 
on-the-fly
         // because the distinct groups have no correlation to the KV column 
value
-        if (!SchemaUtil.isPKColumn(ref.getColumn())) {
+        if (getColumnPKPosition(ref) < 0) {
             orderPreserving = OrderPreserving.NO;
         }
         
@@ -173,6 +175,17 @@ public class TrackOrderPreservingExpressionCompiler 
extends ExpressionCompiler {
         }
         return ref;
     }
+    
+    /**
+     * Returns the PK position to use for the given column reference.
+     * <p>
+     * When a tuple projector is present and the column belongs to a table of type
+     * {@link PTableType#SUBQUERY}, the projector expression at the column's position is
+     * inspected: if it is a {@link RowKeyColumnExpression}, that expression's position is
+     * returned. In every other case the reference's own PK slot position is returned.
+     *
+     * @param ref the resolved column reference
+     * @return the position as described above
+     */
+    private int getColumnPKPosition(ColumnRef ref) {
+        boolean viaDerivedTable = tupleProjector != null && ref.getTable().getType() == PTableType.SUBQUERY;
+        if (!viaDerivedTable) {
+            return ref.getPKSlotPosition();
+        }
+
+        Expression projected = tupleProjector.getExpressions()[ref.getColumnPosition()];
+        return projected instanceof RowKeyColumnExpression
+                ? ((RowKeyColumnExpression) projected).getPosition()
+                : ref.getPKSlotPosition();
+    }
 
     public boolean addEntry(Expression expression) {
         if (expression instanceof LiteralExpression) {
@@ -206,7 +219,7 @@ public class TrackOrderPreservingExpressionCompiler extends 
ExpressionCompiler {
         return entries;
     }
 
-    public static class Entry {
+    public class Entry {
         private final Expression expression;
         private final ColumnRef columnRef;
         private final OrderPreserving orderPreserving;
@@ -222,7 +235,7 @@ public class TrackOrderPreservingExpressionCompiler extends 
ExpressionCompiler {
         }
 
         public int getPkPosition() {
-            return columnRef.getPKSlotPosition();
+            return getColumnPKPosition(columnRef);
         }
 
         public int getColumnPosition() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 51d0ffc..2c49fed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -41,7 +41,6 @@ import 
org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
@@ -78,8 +77,8 @@ public class WhereCompiler {
     private WhereCompiler() {
     }
 
-    public static Set<SubqueryParseNode> compile(StatementContext context, 
FilterableStatement statement) throws SQLException {
-        return compile(context, statement, null);
+    public static Expression compile(StatementContext context, 
FilterableStatement statement) throws SQLException {
+        return compile(context, statement, null, null);
     }
     
     /**
@@ -92,8 +91,8 @@ public class WhereCompiler {
      * @throws ColumnNotFoundException if column name could not be resolved
      * @throws AmbiguousColumnException if an unaliased column name is 
ambiguous across multiple tables
      */
-    public static Set<SubqueryParseNode> compile(StatementContext context, 
FilterableStatement statement, ParseNode viewWhere) throws SQLException {
-        return compile(context, statement, viewWhere, 
Collections.<Expression>emptyList(), false);
+    public static Expression compile(StatementContext context, 
FilterableStatement statement, ParseNode viewWhere, Set<SubqueryParseNode> 
subqueryNodes) throws SQLException {
+        return compile(context, statement, viewWhere, 
Collections.<Expression>emptyList(), false, subqueryNodes);
     }
     
     /**
@@ -106,18 +105,20 @@ public class WhereCompiler {
      * @throws ColumnNotFoundException if column name could not be resolved
      * @throws AmbiguousColumnException if an unaliased column name is 
ambiguous across multiple tables
      */    
-    public static Set<SubqueryParseNode> compile(StatementContext context, 
FilterableStatement statement, ParseNode viewWhere, List<Expression> 
dynamicFilters, boolean hashJoinOptimization) throws SQLException {
+    public static Expression compile(StatementContext context, 
FilterableStatement statement, ParseNode viewWhere, List<Expression> 
dynamicFilters, boolean hashJoinOptimization, Set<SubqueryParseNode> 
subqueryNodes) throws SQLException {
         ParseNode where = statement.getWhere();
-        Set<SubqueryParseNode> subqueryNodes = Sets.<SubqueryParseNode> 
newHashSet();
-        SubqueryParseNodeVisitor subqueryVisitor = new 
SubqueryParseNodeVisitor(context, subqueryNodes);
-        if (where != null) {
-            where.accept(subqueryVisitor);
-        }
-        if (viewWhere != null) {
-            viewWhere.accept(subqueryVisitor);
+        if (subqueryNodes != null) { // if the subqueryNodes passed in is 
null, we assume there will be no sub-queries in the WHERE clause.
+            SubqueryParseNodeVisitor subqueryVisitor = new 
SubqueryParseNodeVisitor(context, subqueryNodes);
+            if (where != null) {
+                where.accept(subqueryVisitor);
+            }
+            if (viewWhere != null) {
+                viewWhere.accept(subqueryVisitor);
+            }
+            if (!subqueryNodes.isEmpty()) {
+                return null;
+            }
         }
-        if (!subqueryNodes.isEmpty())
-            return subqueryNodes;
         
         Set<Expression> extractedNodes = Sets.<Expression>newHashSet();
         WhereExpressionCompiler whereCompiler = new 
WhereExpressionCompiler(context);
@@ -142,7 +143,7 @@ public class WhereCompiler {
         expression = WhereOptimizer.pushKeyExpressionsToScan(context, 
statement, expression, extractedNodes);
         setScanFilter(context, statement, expression, 
whereCompiler.disambiguateWithFamily, hashJoinOptimization);
 
-        return subqueryNodes;
+        return expression;
     }
     
     private static class WhereExpressionCompiler extends ExpressionCompiler {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 8add152..91a9bdd 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.cache.aggcache.SpillableGroupByCache;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.aggregator.Aggregator;
@@ -60,7 +61,6 @@ import 
org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 8e0d42d..724122d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -34,11 +34,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.HashCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
-import org.apache.phoenix.join.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.KeyValueSchema;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 8c72dd5..1672fd7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.OrderByExpression;
@@ -51,7 +53,6 @@ import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.RegionScannerResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 710409f..d915948 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.ValueTypeIncompatibleException;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.aggregator.Aggregator;
@@ -70,7 +71,6 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ConstraintViolationException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
new file mode 100644
index 0000000..a9347e1
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.iterate.AggregatingResultIterator;
+import org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
+import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
+import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+import com.google.common.collect.Lists;
+
+public class ClientAggregatePlan extends ClientProcessingPlan {
+    private final GroupBy groupBy;
+    private final Expression having;
+    private final Aggregators serverAggregators;
+    private final Aggregators clientAggregators;
+    
+    /**
+     * Creates a plan that aggregates, on the client, the rows produced by {@code delegate}.
+     * <p>
+     * The server aggregators are deserialized from the
+     * {@link BaseScannerRegionObserver#AGGREGATORS} attribute of the context's scan, using the
+     * configuration of default query services options; the client aggregators are taken from
+     * the context's aggregation manager.
+     *
+     * @param groupBy the compiled GROUP BY of the statement
+     * @param having  the compiled HAVING expression, stored as given (checked for null in iterator())
+     * @param delegate the plan whose rows are aggregated
+     */
+    public ClientAggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
+            Integer limit, Expression where, OrderBy orderBy, GroupBy groupBy, Expression having, QueryPlan delegate) {
+        super(context, statement, table, projector, limit, where, orderBy, delegate);
+        this.groupBy = groupBy;
+        this.having = having;
+        byte[] serializedAggregators = context.getScan().getAttribute(BaseScannerRegionObserver.AGGREGATORS);
+        this.serverAggregators = ServerAggregators.deserialize(serializedAggregators, QueryServicesOptions.withDefaults().getConfiguration());
+        this.clientAggregators = context.getAggregationManager().getAggregators();
+    }
+
+    /**
+     * Builds the client-side iterator chain on top of the wrapped plan's iterator, in this order:
+     * WHERE filter (if any), aggregation, HAVING filter (if any), DISTINCT de-duplication (only
+     * for a statement that is both distinct and aggregate), ORDER BY / LIMIT, and finally
+     * sequence handling when the context has sequences.
+     *
+     * @return the iterator over this plan's result rows
+     * @throws SQLException propagated from the wrapped plan or from building the iterator chain
+     */
+    @Override
+    public ResultIterator iterator() throws SQLException {
+        ResultIterator iterator = delegate.iterator();
+        if (where != null) {
+            iterator = new FilterResultIterator(iterator, where);
+        }
+        
+        // Aggregation runs in two stages: the server aggregators are applied first (here, on the
+        // client), and their output is then fed through the client aggregators.
+        AggregatingResultIterator aggResultIterator;
+        if (groupBy.isEmpty()) {
+            aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
+            aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
+        } else {
+            if (!groupBy.isOrderPreserving()) {
+                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+                List<Expression> keyExpressions = groupBy.getKeyExpressions();
+                List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
+                for (Expression keyExpression : keyExpressions) {
+                    keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
+                }
+                // This sort only orders the input rows by group key for the grouped aggregation
+                // below, so it must see every input row. The statement's LIMIT applies to the
+                // aggregated rows and is enforced after aggregation; passing it here would drop
+                // input rows before they are aggregated. Hence no limit (null) for this sort.
+                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, projector.getEstimatedRowByteSize());
+            }
+            aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getExpressions());
+            aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
+        }
+
+        if (having != null) {
+            aggResultIterator = new FilterAggregatingResultIterator(aggResultIterator, having);
+        }
+        
+        if (statement.isDistinct() && statement.isAggregate()) { // Dedup on client if select distinct and aggregation
+            aggResultIterator = new DistinctAggregatingResultIterator(aggResultIterator, getProjector());
+        }
+
+        // LIMIT is applied to the aggregated rows: directly when there is no ORDER BY, otherwise
+        // it is handed to the ordered iterator together with the ORDER BY expressions.
+        ResultIterator resultScanner = aggResultIterator;
+        if (orderBy.getOrderByExpressions().isEmpty()) {
+            if (limit != null) {
+                resultScanner = new LimitingResultIterator(aggResultIterator, limit);
+            }
+        } else {
+            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(), thresholdBytes, limit);
+        }
+        if (context.getSequenceManager().getSequenceCount() > 0) {
+            resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager());
+        }
+        
+        return resultScanner;
+    }
+
+    /**
+     * Builds the explain plan for this client-side aggregation.
+     * <p>
+     * The result starts with the delegate plan's steps and then appends one
+     * step per client-side operation, in this order: WHERE filter, sort on the
+     * group-by keys (only when the group-by is not order preserving),
+     * aggregation, HAVING filter, distinct, ORDER BY / LIMIT, and sequence
+     * value reservation.
+     *
+     * @return the explain plan made of the delegate steps plus the client steps
+     * @throws SQLException if the delegate fails to produce its explain plan
+     */
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
+        if (where != null) {
+            planSteps.add("CLIENT FILTER BY " + where.toString());
+        }
+        if (!groupBy.isEmpty()) {
+            if (!groupBy.isOrderPreserving()) {
+                planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString());
+            }
+            planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
+        } else {
+            planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");            
+        }
+        if (having != null) {
+            planSteps.add("CLIENT AFTER-AGGREGATION FILTER BY " + having.toString());
+        }
+        if (statement.isDistinct() && statement.isAggregate()) {
+            planSteps.add("CLIENT DISTINCT ON " + projector.toString());
+        }
+        if (orderBy.getOrderByExpressions().isEmpty()) {
+            if (limit != null) {
+                planSteps.add("CLIENT " + limit + " ROW LIMIT");
+            }
+        } else {
+            planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW"  + (limit == 1 ? "" : "S"))  + " SORTED BY " + orderBy.getOrderByExpressions().toString());
+        }
+        if (context.getSequenceManager().getSequenceCount() > 0) {
+            int nSequences = context.getSequenceManager().getSequenceCount();
+            planSteps.add("CLIENT RESERVE VALUES FROM " + nSequences + " SEQUENCE" + (nSequences == 1 ? "" : "S"));
+        }
+        
+        return new ExplainPlan(planSteps);
+    }
+
+    /** Returns the group-by held by this plan. */
+    @Override
+    public GroupBy getGroupBy() {
+        return groupBy;
+    }
+    
+    /**
+     * Grouped aggregating iterator whose grouping key is the concatenated
+     * value of the group-by expressions evaluated against each tuple.
+     */
+    private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
+        private final List<Expression> groupByExpressions;
+
+        /**
+         * @param iterator source of the tuples to aggregate
+         * @param aggregators aggregators handed to the base iterator
+         * @param groupByExpressions expressions whose concatenated value forms the grouping key
+         */
+        public ClientGroupedAggregatingResultIterator(PeekingResultIterator iterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+            super(iterator, aggregators);
+            this.groupByExpressions = groupByExpressions;
+        }
+
+        /**
+         * Computes the grouping key of {@code tuple} and stores it in {@code ptr}.
+         *
+         * @return {@code ptr}, now pointing at the concatenated group-by value
+         * @throws SQLException wrapping any IOException raised while building the key
+         */
+        @Override
+        protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
+                ImmutableBytesWritable ptr) throws SQLException {
+            try {
+                ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
+                ptr.set(key.get(), key.getOffset(), key.getLength());
+                return ptr;
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+        }
+
+        /** Wraps the single aggregated key value in a one-cell tuple. */
+        @Override
+        protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
+            return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
+        }
+
+        @Override
+        public String toString() {
+            return "ClientGroupedAggregatingResultIterator [resultIterator=" 
+                    + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
+                    + groupByExpressions + "]";
+        }
+    }
+
+    /**
+     * Aggregating iterator that uses each tuple's own key as the grouping
+     * key instead of evaluating group-by expressions.
+     */
+    private static class ClientUngroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
+
+        /**
+         * @param iterator source of the tuples to aggregate
+         * @param aggregators aggregators handed to the base iterator
+         */
+        public ClientUngroupedAggregatingResultIterator(PeekingResultIterator iterator, Aggregators aggregators) {
+            super(iterator, aggregators);
+        }
+
+        /**
+         * Points {@code ptr} at the key of {@code tuple} and returns it.
+         */
+        @Override
+        protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
+                ImmutableBytesWritable ptr) throws SQLException {
+            tuple.getKey(ptr);
+            return ptr;
+        }
+
+        /** Wraps the single aggregated key value in a one-cell tuple. */
+        @Override
+        protected Tuple wrapKeyValueAsResult(KeyValue keyValue)
+                throws SQLException {
+            return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
+        }
+
+        @Override
+        public String toString() {
+            return "ClientUngroupedAggregatingResultIterator [resultIterator=" 
+                    + resultIterator + ", aggregators=" + aggregators + "]";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
new file mode 100644
index 0000000..8e787b4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.schema.TableRef;
+
+/**
+ * Base class for query plans that apply the WHERE filter, ORDER BY and LIMIT
+ * on the client, on top of a delegate plan. It is used for derived-table
+ * queries that cannot be flattened by SubselectRewriter.
+ * <p>
+ * This class only stores the compiled pieces (context, statement, table,
+ * projector, limit, filter and ordering) and exposes them through the
+ * QueryPlan accessors; it does not itself define how rows are produced.
+ */
+public abstract class ClientProcessingPlan extends DelegateQueryPlan {
+    protected final StatementContext context;
+    protected final FilterableStatement statement;
+    protected final TableRef table;
+    protected final RowProjector projector;
+    protected final Integer limit;
+    protected final Expression where;
+    protected final OrderBy orderBy;
+
+    /**
+     * @param context statement context returned by {@link #getContext()}
+     * @param statement statement returned by {@link #getStatement()}
+     * @param table table reference returned by {@link #getTableRef()}
+     * @param projector row projector returned by {@link #getProjector()}
+     * @param limit row limit returned by {@link #getLimit()}; subclasses treat null as "no limit"
+     * @param where client-side filter expression; subclasses treat null as "no filter"
+     * @param orderBy ordering returned by {@link #getOrderBy()}
+     * @param delegate the plan wrapped by this plan
+     */
+    public ClientProcessingPlan(StatementContext context, FilterableStatement statement, TableRef table, 
+            RowProjector projector, Integer limit, Expression where, OrderBy orderBy, QueryPlan delegate) {
+        super(delegate);
+        this.context = context;
+        this.statement = statement;
+        this.table = table;
+        this.projector = projector;
+        this.limit = limit;
+        this.where = where;
+        this.orderBy = orderBy;
+    }
+    
+    @Override
+    public StatementContext getContext() {
+        return context;
+    }
+
+    @Override
+    public TableRef getTableRef() {
+        return table;
+    }
+
+    @Override
+    public RowProjector getProjector() {
+        return projector;
+    }
+
+    @Override
+    public Integer getLimit() {
+        return limit;
+    }
+
+    @Override
+    public OrderBy getOrderBy() {
+        return orderBy;
+    }
+
+    @Override
+    public FilterableStatement getStatement() {
+        return statement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
new file mode 100644
index 0000000..01fbd11
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Client-side scan plan: filters the delegate plan's rows with the WHERE
+ * expression, then either sorts them (keeping only the top rows when a limit
+ * is set) or applies the limit directly, and finally wraps the result for
+ * sequence value reservation when the statement references sequences.
+ */
+public class ClientScanPlan extends ClientProcessingPlan {
+
+    public ClientScanPlan(StatementContext context,
+            FilterableStatement statement, TableRef table,
+            RowProjector projector, Integer limit, Expression where,
+            OrderBy orderBy, QueryPlan delegate) {
+        super(context, statement, table, projector, limit, where, orderBy,
+                delegate);
+    }
+
+    /**
+     * Returns the delegate's iterator wrapped with the client-side filter,
+     * ordering or limit, and sequence handling that apply to this plan.
+     *
+     * @throws SQLException if the delegate iterator cannot be created
+     */
+    @Override
+    public ResultIterator iterator() throws SQLException {
+        ResultIterator iterator = delegate.iterator();
+        if (where != null) {
+            iterator = new FilterResultIterator(iterator, where);
+        }
+        
+        if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
+            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            iterator = new OrderedResultIterator(iterator, orderBy.getOrderByExpressions(), thresholdBytes, limit, projector.getEstimatedRowByteSize());
+        } else if (limit != null) {
+            iterator = new LimitingResultIterator(iterator, limit);
+        }
+        
+        if (context.getSequenceManager().getSequenceCount() > 0) {
+            iterator = new SequenceResultIterator(iterator, context.getSequenceManager());
+        }
+        
+        return iterator;
+    }
+
+    /**
+     * Returns the delegate's plan steps followed by one step per client-side
+     * operation, in the same order {@link #iterator()} applies them.
+     *
+     * @throws SQLException if the delegate fails to produce its explain plan
+     */
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
+        if (where != null) {
+            planSteps.add("CLIENT FILTER BY " + where.toString());
+        }
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW"  + (limit == 1 ? "" : "S"))  + " SORTED BY " + orderBy.getOrderByExpressions().toString());
+        } else if (limit != null) {
+            planSteps.add("CLIENT " + limit + " ROW LIMIT");
+        }
+        if (context.getSequenceManager().getSequenceCount() > 0) {
+            int nSequences = context.getSequenceManager().getSequenceCount();
+            planSteps.add("CLIENT RESERVE VALUES FROM " + nSequences + " SEQUENCE" + (nSequences == 1 ? "" : "S"));
+        }
+        
+        return new ExplainPlan(planSteps);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index fce4245..c6ed0ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -51,6 +51,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
+import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
@@ -76,7 +77,7 @@ import com.google.common.collect.Lists;
 public class HashJoinPlan extends DelegateQueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
 
-    private final FilterableStatement statement;
+    private final SelectStatement statement;
     private final HashJoinInfo joinInfo;
     private final SubPlan[] subPlans;
     private final boolean recompileWhereClause;
@@ -88,14 +89,13 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private AtomicLong firstJobEndTime;
     private List<Expression> keyRangeExpressions;
     
-    public static HashJoinPlan create(FilterableStatement statement, 
+    public static HashJoinPlan create(SelectStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
-        if (plan instanceof BaseQueryPlan)
+        if (!(plan instanceof HashJoinPlan))
             return new HashJoinPlan(statement, plan, joinInfo, subPlans, 
joinInfo == null);
         
-        assert (plan instanceof HashJoinPlan);
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
-        assert hashJoinPlan.joinInfo == null;
+        assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate 
instanceof BaseQueryPlan);
         SubPlan[] mergedSubPlans = new SubPlan[hashJoinPlan.subPlans.length + 
subPlans.length];
         int i = 0;
         for (SubPlan subPlan : hashJoinPlan.subPlans) {
@@ -107,7 +107,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, 
mergedSubPlans, true);
     }
     
-    private HashJoinPlan(FilterableStatement statement, 
+    private HashJoinPlan(SelectStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean 
recompileWhereClause) {
         super(plan);
         this.statement = statement;
@@ -170,6 +170,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
             throw firstException;
         }
         
+        Expression postFilter = null;
         boolean hasKeyRangeExpressions = keyRangeExpressions != null && 
!keyRangeExpressions.isEmpty();
         if (recompileWhereClause || hasKeyRangeExpressions) {
             StatementContext context = delegate.getContext();
@@ -177,10 +178,10 @@ public class HashJoinPlan extends DelegateQueryPlan {
             ParseNode viewWhere = table.getViewStatement() == null ? null : 
new SQLParser(table.getViewStatement()).parseQuery().getWhere();
             
context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) 
(delegate.getStatement()), delegate.getContext().getConnection()));
             if (recompileWhereClause) {
-                WhereCompiler.compile(delegate.getContext(), 
delegate.getStatement(), viewWhere);                
+                postFilter = WhereCompiler.compile(delegate.getContext(), 
delegate.getStatement(), viewWhere, null);
             }
             if (hasKeyRangeExpressions) {
-                WhereCompiler.compile(delegate.getContext(), 
delegate.getStatement(), viewWhere, keyRangeExpressions, true);
+                WhereCompiler.compile(delegate.getContext(), 
delegate.getStatement(), viewWhere, keyRangeExpressions, true, null);
             }
         }
 
@@ -189,7 +190,12 @@ public class HashJoinPlan extends DelegateQueryPlan {
             HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
         }
         
-        return ((BaseQueryPlan) delegate).iterator(dependencies);
+        ResultIterator iterator = joinInfo == null ? delegate.iterator() : 
((BaseQueryPlan) delegate).iterator(dependencies);
+        if (statement.getInnerSelectStatement() != null && postFilter != null) 
{
+            iterator = new FilterResultIterator(iterator, postFilter);
+        }
+        
+        return iterator;
     }
 
     private Expression createKeyRangeExpression(Expression lhsExpression,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index 410d386..c9cbd15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -20,14 +20,12 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.join.TupleProjector;
-import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 import com.google.common.collect.Lists;
@@ -49,52 +47,33 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
         if (postFilter != null) {
             planSteps.add("CLIENT FILTER BY " + postFilter.toString());
         }
-        
+
         return new ExplainPlan(planSteps);
     }
 
     @Override
     public ResultIterator iterator() throws SQLException {
-        final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
-
-        return new DelegateResultIterator(delegate.iterator()) {
+        ResultIterator iterator = new 
DelegateResultIterator(delegate.iterator()) {
             
             @Override
             public Tuple next() throws SQLException {
-                Tuple tuple = null;
-                while (tuple == null) {
-                    tuple = super.next();
-                    if (tuple == null) {
-                        break;
-                    }
-                    
-                    tuple = tupleProjector.projectResults(tuple);
-                    
-                    if (postFilter != null) {
-                        postFilter.reset();
-                        try {
-                            if (postFilter.evaluate(tuple, tempPtr)) {
-                                Boolean b = 
(Boolean)postFilter.getDataType().toObject(tempPtr);
-                                if (!b.booleanValue()) {
-                                    tuple = null;
-                                }            
-                            } else {
-                                tuple = null;
-                            }
-                        } catch (IllegalDataException e) {
-                            tuple = null;
-                        }
-                    }
-                }
+                Tuple tuple = super.next();
+                if (tuple == null)
+                    return null;
                 
-                return tuple;
+                return tupleProjector.projectResults(tuple);
             }
 
             @Override
             public String toString() {
-                return "TupleProjectionResultIterator [projector=" + 
tupleProjector + ", postFilter="
-                        + postFilter + "]";
+                return "TupleProjectionResultIterator [projector=" + 
tupleProjector + "]";
             }            
         };
+        
+        if (postFilter != null) {
+            iterator = new FilterResultIterator(iterator, postFilter);
+        }
+        
+        return iterator;
     }
 }

Reply via email to