PHOENIX-2722 support mysql offset clause

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f65d0481
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f65d0481
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f65d0481

Branch: refs/heads/4.x-HBase-0.98
Commit: f65d0481d00e2fa70b03d97345377cb26d7fcb22
Parents: 4c588f4
Author: Ankit Singhal <[email protected]>
Authored: Fri Apr 8 16:06:50 2016 +0530
Committer: Ankit Singhal <[email protected]>
Committed: Fri Apr 8 16:06:50 2016 +0530

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AutoCommitIT.java    |  14 +-
 .../apache/phoenix/end2end/CreateTableIT.java   |   2 +-
 .../apache/phoenix/end2end/DerivedTableIT.java  | 144 ++++++++++++-
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 161 ++++++++++++++
 .../phoenix/end2end/QueryWithOffsetIT.java      | 211 +++++++++++++++++++
 .../org/apache/phoenix/end2end/ReadOnlyIT.java  |  12 +-
 .../apache/phoenix/end2end/SortMergeJoinIT.java |  44 ++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  16 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  13 +-
 .../apache/phoenix/compile/JoinCompiler.java    |  10 +-
 .../phoenix/compile/ListJarsQueryPlan.java      |   5 +
 .../apache/phoenix/compile/OffsetCompiler.java  | 114 ++++++++++
 .../apache/phoenix/compile/OrderByCompiler.java |   3 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   3 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  52 +++--
 .../org/apache/phoenix/compile/QueryPlan.java   |   2 +
 .../phoenix/compile/StatementNormalizer.java    |   2 +-
 .../phoenix/compile/SubqueryRewriter.java       |  10 +-
 .../phoenix/compile/SubselectRewriter.java      |  15 +-
 .../apache/phoenix/compile/TraceQueryPlan.java  |   5 +
 .../apache/phoenix/compile/UpsertCompiler.java  |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   1 +
 .../phoenix/coprocessor/ScanRegionObserver.java |  95 ++++++++-
 .../apache/phoenix/execute/AggregatePlan.java   |  43 ++--
 .../apache/phoenix/execute/BaseQueryPlan.java   |   9 +-
 .../phoenix/execute/ClientAggregatePlan.java    |  19 +-
 .../phoenix/execute/ClientProcessingPlan.java   |  10 +-
 .../apache/phoenix/execute/ClientScanPlan.java  |  38 ++--
 .../phoenix/execute/DegenerateQueryPlan.java    |   2 +-
 .../phoenix/execute/DelegateQueryPlan.java      |   4 +
 .../execute/LiteralResultIterationPlan.java     |  13 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  58 +++--
 .../phoenix/execute/SortMergeJoinPlan.java      |   5 +
 .../org/apache/phoenix/execute/UnionPlan.java   |  15 +-
 .../phoenix/iterate/BaseResultIterators.java    |  14 +-
 .../apache/phoenix/iterate/ExplainTable.java    |  16 +-
 .../phoenix/iterate/LimitingResultIterator.java |   2 +-
 .../iterate/MergeSortTopNResultIterator.java    |  21 +-
 .../phoenix/iterate/OffsetResultIterator.java   |  62 ++++++
 .../OrderedAggregatingResultIterator.java       |   6 +-
 .../phoenix/iterate/OrderedResultIterator.java  |  58 +++--
 .../phoenix/iterate/ParallelIterators.java      |   4 +-
 .../apache/phoenix/iterate/SerialIterators.java |  19 +-
 .../phoenix/iterate/TableResultIterator.java    |  22 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  18 +-
 .../org/apache/phoenix/join/HashJoinInfo.java   |   2 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../apache/phoenix/parse/DeleteStatement.java   |   6 +
 .../phoenix/parse/FilterableStatement.java      |   1 +
 .../org/apache/phoenix/parse/OffsetNode.java    |  67 ++++++
 .../apache/phoenix/parse/ParseNodeFactory.java  |  55 +++--
 .../apache/phoenix/parse/ParseNodeRewriter.java |   2 +-
 .../apache/phoenix/parse/SelectStatement.java   |  35 ++-
 .../apache/phoenix/query/QueryConstants.java    |   6 +
 .../java/org/apache/phoenix/util/QueryUtil.java |  30 ++-
 .../java/org/apache/phoenix/util/ScanUtil.java  |   4 +
 .../phoenix/execute/CorrelatePlanTest.java      |  39 +++-
 .../execute/LiteralResultIteratorPlanTest.java  | 192 +++++++++++++++++
 .../phoenix/execute/UnnestArrayPlanTest.java    |   3 +-
 .../query/ParallelIteratorsSplitTest.java       |   5 +
 60 files changed, 1621 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java
index aa92c5e..469f2de 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java
@@ -42,21 +42,21 @@ public class AutoCommitIT extends BaseHBaseManagedTimeIT {
         conn.setAutoCommit(true);
         
         String ddl = "CREATE TABLE test_table " +
-                "  (row varchar not null, col1 integer" +
-                "  CONSTRAINT pk PRIMARY KEY (row))\n";
+                "  (r varchar not null, col1 integer" +
+                "  CONSTRAINT pk PRIMARY KEY (r))\n";
         createTestTable(getUrl(), ddl);
         
-        String query = "UPSERT INTO test_table(row, col1) VALUES('row1', 1)";
+        String query = "UPSERT INTO test_table(r, col1) VALUES('row1', 1)";
         PreparedStatement statement = conn.prepareStatement(query);
         statement.executeUpdate();
         conn.commit();
         
         conn.setAutoCommit(false);
-        query = "UPSERT INTO test_table(row, col1) VALUES('row1', 2)";
+        query = "UPSERT INTO test_table(r, col1) VALUES('row1', 2)";
         statement = conn.prepareStatement(query);
         statement.executeUpdate();
         
-        query = "DELETE FROM test_table WHERE row='row1'";
+        query = "DELETE FROM test_table WHERE r='row1'";
         statement = conn.prepareStatement(query);
         statement.executeUpdate();
         conn.commit();
@@ -66,11 +66,11 @@ public class AutoCommitIT extends BaseHBaseManagedTimeIT {
         ResultSet rs = statement.executeQuery();
         assertFalse(rs.next());
 
-        query = "DELETE FROM test_table WHERE row='row1'";
+        query = "DELETE FROM test_table WHERE r='row1'";
         statement = conn.prepareStatement(query);
         statement.executeUpdate();
 
-        query = "UPSERT INTO test_table(row, col1) VALUES('row1', 3)";
+        query = "UPSERT INTO test_table(r, col1) VALUES('row1', 3)";
         statement = conn.prepareStatement(query);
         statement.executeUpdate();
         conn.commit();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 5ffc354..b0370e8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -74,7 +74,7 @@ public class CreateTableIT extends BaseClientManagedTimeIT {
                 "                data.file VARCHAR ,\n" + 
                 "                data.fk_log VARCHAR ,\n" + 
                 "                data.host VARCHAR ,\n" + 
-                "                data.row VARCHAR ,\n" + 
+                "                data.r VARCHAR ,\n" + 
                 "                data.size VARCHAR ,\n" + 
                 "                data.start_time VARCHAR ,\n" + 
                 "                data.stat_date DATE ,\n" + 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
index 1a772c8..9b37f9e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
@@ -232,6 +232,46 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             assertEquals(9,rs.getInt(1));
 
             assertFalse(rs.next());
+
+            // Inner limit < outer query offset
+            query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, 
b_string b, a_byte + 1 x FROM aTable LIMIT 1 OFFSET 1 ) AS t WHERE t.b = '"
+                    + C_VALUE + "' OFFSET 2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertFalse(rs.next());
+
+            // (where) offset
+            query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, 
b_string b, a_byte + 1 x FROM aTable WHERE a_byte + 1 < 9 ) AS t OFFSET 2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+            assertEquals(13, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW4, rs.getString(1));
+            assertEquals(14, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW5, rs.getString(1));
+            assertEquals(15, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW6, rs.getString(1));
+            assertEquals(16, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW7, rs.getString(1));
+            assertEquals(17, rs.getInt(2));
+
+            // (offset) where
+            query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, 
b_string b, a_byte + 1 x FROM aTable OFFSET 4) AS t WHERE t.b = '"
+                    + C_VALUE + "'";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW5, rs.getString(1));
+            assertEquals(15, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW8, rs.getString(1));
+            assertEquals(18, rs.getInt(2));
+
         } finally {
             conn.close();
         }
@@ -349,6 +389,17 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             assertEquals(1,rs.getInt(2));
 
             assertFalse(rs.next());
+
+            // (groupby) groupby orderby offset
+            query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable 
GROUP BY a_string) AS t GROUP BY t.c ORDER BY count(*) DESC OFFSET 1";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals(1, rs.getInt(2));
+
+            assertFalse(rs.next());
+
         } finally {
             conn.close();
         }
@@ -544,7 +595,98 @@ public class DerivedTableIT extends 
BaseClientManagedTimeIT {
             conn.close();
         }
     }
-    
+
+    @Test
+    public void testDerivedTableWithOffset() throws Exception {
+        long ts = nextTimestamp();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 1));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            // (LIMIT OFFSET )
+            String query = "SELECT t.eid FROM (SELECT entity_id eid FROM 
aTable LIMIT 2 OFFSET 1) AS t";
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW2, rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+
+            assertFalse(rs.next());
+
+            // (OFFSET) limit
+            query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable 
OFFSET 1) AS t LIMIT 2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW2, rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+
+            assertFalse(rs.next());
+
+            // (limit OFFSET) limit OFFSET
+            query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 
2 OFFSET 1) AS t LIMIT 4 OFFSET 1";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+            assertFalse(rs.next());
+
+            // (limit OFFSET) limit 2
+            query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 
4 OFFSET 1) AS t LIMIT 2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW2, rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+
+            assertFalse(rs.next());
+
+            // (limit ? OFFSET ?) limit ? OFFSET ?
+            query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 
? OFFSET ?) AS t LIMIT ? OFFSET ?";
+            statement = conn.prepareStatement(query);
+            statement.setInt(1, 4);
+            statement.setInt(2, 2);
+            statement.setInt(3, 2);
+            statement.setInt(4, 2);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW5, rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals(ROW6, rs.getString(1));
+            assertFalse(rs.next());
+
+            // (groupby orderby OFFSET)
+            query = "SELECT a, s FROM (SELECT a_string a, sum(a_byte) s FROM 
aTable GROUP BY a_string ORDER BY sum(a_byte) OFFSET 1)";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(A_VALUE, rs.getString(1));
+            assertEquals(10, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(B_VALUE, rs.getString(1));
+            assertEquals(26, rs.getInt(2));
+
+            assertFalse(rs.next());
+
+            // (union OFFSET) groupby 
+            query = "SELECT a_string, count(*) FROM (SELECT a_string FROM 
aTable where a_byte < 4 union all SELECT a_string FROM aTable where a_byte > 8 
OFFSET 1) group by a_string";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(A_VALUE,rs.getString(1));
+            assertEquals(2,rs.getInt(2));
+            assertTrue (rs.next());
+            assertEquals(C_VALUE,rs.getString(1));
+            assertEquals(1,rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
     @Test
     public void testDerivedTableWithDistinct() throws Exception {
         long ts = nextTimestamp();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 00655cf..3e2356f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -441,6 +441,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
                 "    DYNAMIC SERVER FILTER BY \"I.item_id\" IN 
(\"O.item_id\")\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithOffset()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     LEFT JOIN joinItemTable i ON i.supplier_id = 
s.supplier_id 
+                 *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id 
LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + 
JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "    SERVER 3 ROW LIMIT\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL LEFT-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "    PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
+                /*
+                 * testJoinWithOffset()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 
OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + 
JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN 
(\"I.supplier_id\")\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -770,6 +802,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "    PARALLEL INNER-JOIN TABLE 0\n" +
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + 
".OrderTable\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithLimit()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     LEFT JOIN joinItemTable i ON i.supplier_id = 
s.supplier_id 
+                 *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id 
LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + 
JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "    SERVER 3 ROW LIMIT\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL LEFT-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + 
".idx_item\n" +
+                "    PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
+                /*
+                 * testJoinWithLimit()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 
OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + 
JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + 
".idx_item\n" +
+                "    PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN 
(\"I.0:supplier_id\")\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -1124,6 +1188,40 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
                 "    DYNAMIC SERVER FILTER BY \"I.:item_id\" IN 
(\"O.item_id\")\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithOffset()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     LEFT JOIN joinItemTable i ON i.supplier_id = 
s.supplier_id 
+                 *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id 
LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + 
JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "    SERVER 3 ROW LIMIT\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL LEFT-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER "+ 
MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX +""+JOIN_ITEM_TABLE_DISPLAY_NAME + " 
[-32768]\n" +
+                "        CLIENT MERGE SORT\n" +      
+                "    PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
+                /*
+                 * testJoinWithOffset()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 
OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + 
JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER "+ 
MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX +""+JOIN_ITEM_TABLE_DISPLAY_NAME + " 
[-32768]\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN 
(\"I.0:supplier_id\")\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
                 }});
         return testCases;
     }
@@ -2920,6 +3018,26 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
             assertEquals(rs.getString(4), "S5");
 
             assertFalse(rs.next());
+
+            query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", 
supp.name FROM "
+                    + JOIN_ITEM_TABLE_FULL_NAME
+                    + " item INNER JOIN (SELECT reverse(loc_id), 
\"supplier_id\", name FROM "
+                    + JOIN_SUPPLIER_TABLE_FULL_NAME
+                    + " ORDER BY \"supplier_id\"  OFFSET 2) AS supp ON 
item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name != 'S1')";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "0000000006");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "0000000006");
+            assertEquals(rs.getString(4), "S6");
+            assertFalse(rs.next());
+
         } finally {
             conn.close();
         }
@@ -3234,6 +3352,49 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
     }
 
     @Test
+    public void testJoinWithOffset() throws Exception {
+        String query1 = "SELECT \"order_id\", i.name, s.name, s.address, 
quantity FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME
+                + " s LEFT JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON 
i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN "
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = 
i.\"item_id\" LIMIT 1 OFFSET 2 ";
+        String query2 = "SELECT \"order_id\", i.name, s.name, s.address, 
quantity FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME
+                + " s JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON 
i.\"supplier_id\" = s.\"supplier_id\" JOIN "
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = 
i.\"item_id\" LIMIT 1 OFFSET 2 ";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 5000);
+
+            assertFalse(rs.next());
+
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query1);
+            assertEquals(plans[22], QueryUtil.getExplainPlan(rs));
+
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 5000);
+            assertFalse(rs.next());
+
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query2);
+            assertEquals(plans[23], QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
     public void testNonEquiJoin() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
new file mode 100644
index 0000000..c609581
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicablelaw or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
+    
+    private String tableName;
+    private final String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", 
"i", "j", "k", "l", "m", "n", "o", "p",
+            "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+    private final String ddl;
+
+    public QueryWithOffsetIT(String preSplit) {
+        this.tableName = tableName + "_" + preSplit.charAt(2);
+        this.ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" 
+ "k1 INTEGER NOT NULL,\n"
+                + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 
VARCHAR,\n"
+                + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + preSplit;
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, 
Boolean.toString(true));
+        // Must update config before starting server
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Parameters(name="preSplit = {0}")
+    public static Collection<String> data() {
+        return Arrays.asList(new String[] { " SPLIT ON ('e','i','o')", " 
SALT_BUCKETS=10" });
+    }
+
+    @Test
+    public void testLimitOffset() throws SQLException {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        int limit = 10;
+        int offset = 10;
+        updateStatistics(conn);
+        ResultSet rs;
+        rs = conn.createStatement()
+                .executeQuery("SELECT t_id from " + tableName + " order by 
t_id limit " + limit + " offset " + offset);
+        int i = 0;
+        while (i++ < limit) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+
+        limit = 35;
+        rs = conn.createStatement().executeQuery("SELECT t_id from " + 
tableName + " union all SELECT t_id from "
+                + tableName + " offset " + offset + " FETCH FIRST " + limit + 
" rows only");
+        i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+        i = 0;
+        while (i++ < limit - strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[i - 1], rs.getString(1));
+        }
+        conn.close();
+    }
+
+    @Test
+    public void testOffsetSerialQueryExecutedOnServer() throws SQLException {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        int offset = 10;
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        updateStatistics(conn);
+        String query = "SELECT t_id from " + tableName + " offset " + offset;
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        rs.next();
+        rs.next();
+        rs.next();
+        assertEquals("    SERVER OFFSET " + offset, rs.getString(1));
+        rs = conn.createStatement().executeQuery(query);
+        int i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+        conn.close();
+    }
+
+    @Test
+    public void testOffsetWithoutLimit() throws SQLException {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        int offset = 10;
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        updateStatistics(conn);
+        ResultSet rs;
+        rs = conn.createStatement()
+                .executeQuery("SELECT t_id from " + tableName + " order by 
t_id offset " + offset + " row");
+        int i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+
+        rs = conn.createStatement().executeQuery(
+                "SELECT k3, count(*) from " + tableName + " group by k3 order 
by k3 desc offset " + offset + " row");
+
+        i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings.length - offset - i + 2, rs.getInt(1));
+        }
+
+        rs = conn.createStatement().executeQuery("SELECT t_id from " + 
tableName + " union all SELECT t_id from "
+                + tableName + " offset " + offset + " rows");
+        i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+        i = 0;
+        while (i++ < strings.length) {
+            assertTrue(rs.next());
+            assertEquals(strings[i - 1], rs.getString(1));
+        }
+        conn.close();
+    }
+
+    private void initTableValues(Connection conn) throws SQLException {
+        for (int i = 0; i < 26; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " 
values('" + strings[i] + "'," + i + ","
+                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn.commit();
+    }
+
+    private void updateStatistics(Connection conn) throws SQLException {
+        String query = "UPDATE STATISTICS " + tableName + " SET \"" + 
QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB
+                + "\"=" + Long.toString(500);
+        conn.createStatement().execute(query);
+    }
+
+    @Test
+    public void testMetaDataWithOffset() throws SQLException {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        updateStatistics(conn);
+        PreparedStatement stmt = conn.prepareStatement("SELECT * from " + 
tableName + " offset ?");
+        ParameterMetaData pmd = stmt.getParameterMetaData();
+        assertEquals(1, pmd.getParameterCount());
+        assertEquals(Types.INTEGER, pmd.getParameterType(1));
+        stmt.setInt(1, 10);
+        ResultSet rs = stmt.executeQuery();
+        ResultSetMetaData md = rs.getMetaData();
+        assertEquals(5, md.getColumnCount());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java
index fbebe08..bcc4ee8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java
@@ -40,11 +40,11 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String ddl = "CREATE TABLE test_table " +
-                        "  (row varchar not null, col1 integer" +
-                        "  CONSTRAINT pk PRIMARY KEY (row))\n"; 
+                        "  (r varchar not null, col1 integer" +
+                        "  CONSTRAINT pk PRIMARY KEY (r))\n"; 
         createTestTable(getUrl(), ddl);
 
-        String query = "UPSERT INTO test_table(row, col1) VALUES('row1', 777)";
+        String query = "UPSERT INTO test_table(r, col1) VALUES('row1', 777)";
         PreparedStatement statement = conn.prepareStatement(query);
         statement.executeUpdate();
         conn.commit();
@@ -53,8 +53,8 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT {
                conn.setReadOnly(true);
                 assertTrue(conn.isReadOnly());
                ddl = "CREATE TABLE test_table2 " +
-                               "  (row varchar not null, col1 integer" +
-                               "  CONSTRAINT pk PRIMARY KEY (row))\n";
+                               "  (r varchar not null, col1 integer" +
+                               "  CONSTRAINT pk PRIMARY KEY (r))\n";
                statement = conn.prepareStatement(ddl);
                statement.executeUpdate();
                conn.commit();
@@ -64,7 +64,7 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT {
         }
          
        try {  
-                query = "UPSERT INTO test_table(row, col1) VALUES('row1', 
888)";
+                query = "UPSERT INTO test_table(r, col1) VALUES('row1', 888)";
                 statement = conn.prepareStatement(query);
                 statement.executeUpdate();
                 conn.commit();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
index f19b886..43afd0d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
@@ -2383,6 +2383,50 @@ public class SortMergeJoinIT extends 
BaseHBaseManagedTimeIT {
     }
 
     @Test
+    public void testJoinWithOffset() throws Exception {
+        String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", 
i.name, s.name, s.address, quantity FROM "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s LEFT JOIN " + 
JOIN_ITEM_TABLE_FULL_NAME
+                + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN " + 
JOIN_ORDER_TABLE_FULL_NAME
+                + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 2 OFFSET 1";
+        String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", 
i.name, s.name, s.address, quantity FROM "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s JOIN " + 
JOIN_ITEM_TABLE_FULL_NAME
+                + " i ON i.\"supplier_id\" = s.\"supplier_id\" JOIN " + 
JOIN_ORDER_TABLE_FULL_NAME
+                + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S4");
+            assertEquals(rs.getString(4), "404 YYY Street");
+            assertEquals(rs.getInt(5), 0);
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getString(4), "101 YYY Street");
+            assertEquals(rs.getInt(5), 1000);
+            assertFalse(rs.next());
+
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 5000);
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
     public void testNonEquiJoin() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g 
b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 834a410..a042090 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -125,6 +125,11 @@ tokens
     LIST = 'list';
     JARS='jars';
     ROW_TIMESTAMP='row_timestamp';
+    OFFSET ='offset';
+    FETCH = 'fetch';
+    ROW = 'row';
+    ROWS = 'rows';
+    ONLY = 'only';
 }
 
 
@@ -650,7 +655,7 @@ single_select returns [SelectStatement ret]
         (WHERE where=expression)?
         (GROUP BY group=group_by)?
         (HAVING having=expression)?
-        { ParseContext context = contextStack.peek(); $ret = 
factory.select(from, h, d!=null, sel, where, group, having, null, null, 
getBindCount(), context.isAggregate(), context.hasSequences(), null, new 
HashMap<String,UDFParseNode>(udfParseNodes)); }
+        { ParseContext context = contextStack.peek(); $ret = 
factory.select(from, h, d!=null, sel, where, group, having, null, null,null, 
getBindCount(), context.isAggregate(), context.hasSequences(), null, new 
HashMap<String,UDFParseNode>(udfParseNodes)); }
     ;
 finally{ contextStack.pop(); }
 
@@ -665,7 +670,9 @@ select_node returns [SelectStatement ret]
     :   u=unioned_selects
         (ORDER BY order=order_by)?
         (LIMIT l=limit)?
-        { ParseContext context = contextStack.peek(); $ret = factory.select(u, 
order, l, getBindCount(), context.isAggregate()); }
+        (OFFSET o=offset (ROW | ROWS)?)?
+        (FETCH (FIRST | NEXT) (l=limit)? (ROW | ROWS) ONLY)?
+        { ParseContext context = contextStack.peek(); $ret = factory.select(u, 
order, l, o, getBindCount(), context.isAggregate()); }
     ;
 finally{ contextStack.pop(); }
 
@@ -697,6 +704,11 @@ limit returns [LimitNode ret]
     | l=int_or_long_literal { $ret = factory.limit(l); }
     ;
     
+offset returns [OffsetNode ret]
+       : b=bind_expression { $ret = factory.offset(b); }
+    | l=int_or_long_literal { $ret = factory.offset(l); }
+    ;
+
 sampling_rate returns [LiteralParseNode ret]
     : l=literal { $ret = l; }
     ;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 24a2add..2a97686 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -352,12 +352,10 @@ public class DeleteCompiler {
                     PColumn column = table.getPKColumns().get(i);
                     aliasedNodes.add(FACTORY.aliasedNode(null, 
FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
                 }
-                select = FACTORY.select(
-                        delete.getTable(), 
-                        hint, false, aliasedNodes, delete.getWhere(), 
-                        Collections.<ParseNode>emptyList(), null, 
-                        delete.getOrderBy(), delete.getLimit(),
-                        delete.getBindCount(), false, false, 
Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes());
+                select = FACTORY.select(delete.getTable(), hint, false, 
aliasedNodes, delete.getWhere(),
+                        Collections.<ParseNode> emptyList(), null, 
delete.getOrderBy(), delete.getLimit(), null,
+                        delete.getBindCount(), false, false, 
Collections.<SelectStatement> emptyList(),
+                        delete.getUdfParseNodes());
                 select = StatementNormalizer.normalize(select, resolverToBe);
                 SelectStatement transformedSelect = 
SubqueryRewriter.transform(select, resolverToBe, connection);
                 if (transformedSelect != select) {
@@ -514,7 +512,8 @@ public class DeleteCompiler {
                     projectorToBe = new RowProjector(projectorToBe,true);
                 }
                 final RowProjector projector = projectorToBe;
-                final QueryPlan aggPlan = new AggregatePlan(context, select, 
tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null);
+                final QueryPlan aggPlan = new AggregatePlan(context, select, 
tableRef, projector, null, null,
+                        OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, 
null);
                 mutationPlans.add(new MutationPlan() {
                     @Override
                     public ParameterMetaData getParameterMetaData() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 5d03f57..e6c5970 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -689,7 +689,9 @@ public class JoinCompiler {
             if (isSubselect())
                 return 
SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, 
preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias());
 
-            return NODE_FACTORY.select(tableNode, select.getHint(), false, 
selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, 
select.hasSequence(), Collections.<SelectStatement>emptyList(), 
select.getUdfParseNodes());
+            return NODE_FACTORY.select(tableNode, select.getHint(), false, 
selectNodes, getPreFiltersCombined(), null,
+                    null, orderBy, null, null, 0, false, select.hasSequence(),
+                    Collections.<SelectStatement> emptyList(), 
select.getUdfParseNodes());
         }
 
         public boolean hasFilters() {
@@ -1071,7 +1073,8 @@ public class JoinCompiler {
                 && !select.isAggregate()
                 && !select.isDistinct()
                 && !(select.getFrom() instanceof DerivedTableNode)
-                && select.getLimit() == null;
+                && select.getLimit() == null
+                && select.getOffset() == null;
     }
 
     private static ParseNode combine(List<ParseNode> nodes) {
@@ -1268,7 +1271,8 @@ public class JoinCompiler {
         String tableAlias = tableRef.getTableAlias();
         TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : 
'"' + tableAlias + '"', tName, dynamicCols);
 
-        return NODE_FACTORY.select(from, hintNode, false, selectList, where, 
groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, 
Collections.<SelectStatement>emptyList(), udfParseNodes);
+        return NODE_FACTORY.select(from, hintNode, false, selectList, where, 
groupBy, null, orderBy, null, null, 0,
+                groupBy != null, hasSequence, Collections.<SelectStatement> 
emptyList(), udfParseNodes);
     }
 
     public static PTable joinProjectedTables(PTable left, PTable right, 
JoinType type) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 7f3277a..1047334 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -194,6 +194,11 @@ public class ListJarsQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Integer getOffset() {
+        return null;
+    }
+
+    @Override
     public OrderBy getOrderBy() {
         return OrderBy.EMPTY_ORDER_BY;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
new file mode 100644
index 0000000..54be50b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.LiteralParseNode;
+import org.apache.phoenix.parse.OffsetNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.TraverseNoParseNodeVisitor;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+
+public class OffsetCompiler {
+    private static final ParseNodeFactory NODE_FACTORY = new 
ParseNodeFactory();
+
+    public static final PDatum OFFSET_DATUM = new PDatum() {
+        @Override
+        public boolean isNullable() {
+            return false;
+        }
+
+        @Override
+        public PDataType getDataType() {
+            return PInteger.INSTANCE;
+        }
+
+        @Override
+        public Integer getMaxLength() {
+            return null;
+        }
+
+        @Override
+        public Integer getScale() {
+            return null;
+        }
+
+        @Override
+        public SortOrder getSortOrder() {
+            return SortOrder.getDefault();
+        }
+    };
+
+    private OffsetCompiler() {}
+
+    public static Integer compile(StatementContext context, 
FilterableStatement statement) throws SQLException {
+        OffsetNode offsetNode = statement.getOffset();
+        if (offsetNode == null) { return null; }
+        OffsetParseNodeVisitor visitor = new OffsetParseNodeVisitor(context);
+        offsetNode.getOffsetParseNode().accept(visitor);
+        return visitor.getOffset();
+    }
+
+    private static class OffsetParseNodeVisitor extends 
TraverseNoParseNodeVisitor<Void> {
+        private final StatementContext context;
+        private Integer offset;
+
+        public OffsetParseNodeVisitor(StatementContext context) {
+            this.context = context;
+        }
+
+        public Integer getOffset() {
+            return offset;
+        }
+
+        @Override
+        public Void visit(LiteralParseNode node) throws SQLException {
+            Object offsetValue = node.getValue();
+            if (offsetValue != null) {
+                Integer offset = 
(Integer)OFFSET_DATUM.getDataType().toObject(offsetValue, node.getType());
+                if (offset.intValue() >= 0) {
+                    this.offset = offset;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public Void visit(BindParseNode node) throws SQLException {
+            // This is for static evaluation in SubselectRewriter.
+            if (context == null) return null;
+
+            Object value = context.getBindManager().getBindValue(node);
+            context.getBindManager().addParamMetaData(node, OFFSET_DATUM);
+            // Resolve the bind value, create a LiteralParseNode, and call the
+            // visit method for it.
+            // In this way, we can deal with just having a literal on one side
+            // of the expression.
+            visit(NODE_FACTORY.literal(value, OFFSET_DATUM.getDataType()));
+            return null;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 0ae31f0..91fa5c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -82,7 +82,8 @@ public class OrderByCompiler {
      */
     public static OrderBy compile(StatementContext context,
                                   SelectStatement statement,
-                                  GroupBy groupBy, Integer limit, 
+                                  GroupBy groupBy, Integer limit,
+                                  Integer offset,
                                   RowProjector rowProjector,
                                   TupleProjector tupleProjector,
                                   boolean isInRowKeyOrder) throws SQLException 
{

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 2659b3f..752e1a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -254,7 +254,8 @@ public class PostDDLCompiler {
                             } catch (AmbiguousColumnException e) {
                                 continue;
                             }
-                            QueryPlan plan = new AggregatePlan(context, 
select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null);
+                            QueryPlan plan = new AggregatePlan(context, 
select, tableRef, projector, null, null,
+                                    OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null);
                             try {
                                 ResultIterator iterator = plan.iterator();
                                 try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 8a86d67..82e2e92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -72,6 +72,7 @@ import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Lists;
@@ -168,7 +169,11 @@ public class QueryCompiler {
             SelectStatement subSelect = unionAllSelects.get(i);
             // Push down order-by and limit into sub-selects.
             if (!select.getOrderBy().isEmpty() || select.getLimit() != null) {
-                subSelect = NODE_FACTORY.select(subSelect, 
select.getOrderBy(), select.getLimit());
+                if (select.getOffset() == null) {
+                    subSelect = NODE_FACTORY.select(subSelect, 
select.getOrderBy(), select.getLimit(), null);
+                } else {
+                    subSelect = NODE_FACTORY.select(subSelect, 
select.getOrderBy(), null, null);
+                }
             }
             QueryPlan subPlan = compileSubquery(subSelect, true);
             TupleProjector projector = new 
TupleProjector(subPlan.getProjector());
@@ -182,8 +187,8 @@ public class QueryCompiler {
         StatementContext context = new StatementContext(statement, resolver, 
scan, sequenceManager);
 
         QueryPlan plan = compileSingleFlatQuery(context, select, 
statement.getParameters(), false, false, null, null, false);
-        plan =  new UnionPlan(context, select, tableRef, plan.getProjector(), 
plan.getLimit(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, 
-                plans, context.getBindManager().getParameterMetaData()); 
+        plan = new UnionPlan(context, select, tableRef, plan.getProjector(), 
plan.getLimit(), plan.getOffset(),
+                plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans, 
context.getBindManager().getParameterMetaData());
         return plan;
     }
 
@@ -324,10 +329,13 @@ public class QueryCompiler {
             QueryPlan plan = compileSingleFlatQuery(context, query, binds, 
asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, 
!table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
             Expression postJoinFilterExpression = 
joinTable.compilePostFilterExpression(context, table);
             Integer limit = null;
+            Integer offset = null;
             if (!query.isAggregate() && !query.isDistinct() && 
query.getOrderBy().isEmpty()) {
                 limit = plan.getLimit();
+                offset = plan.getOffset();
             }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, 
joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, 
postJoinFilterExpression, limit);
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, 
joinExpressions, joinTypes,
+                    starJoinVector, tables, fieldPositions, 
postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
             return HashJoinPlan.create(joinTable.getStatement(), plan, 
joinInfo, hashPlans);
         }
         
@@ -378,10 +386,14 @@ public class QueryCompiler {
             QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, 
asSubquery, !asSubquery && type == JoinType.Right, null, 
!rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
             Expression postJoinFilterExpression = 
joinTable.compilePostFilterExpression(context, rhsTable);
             Integer limit = null;
+            Integer offset = null;
             if (!rhs.isAggregate() && !rhs.isDistinct() && 
rhs.getOrderBy().isEmpty()) {
                 limit = rhsPlan.getLimit();
+                offset = rhsPlan.getOffset();
             }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, 
new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? 
JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsTable}, new int[] 
{fieldPosition}, postJoinFilterExpression, limit);
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, 
new List[] { joinExpressions },
+                    new JoinType[] { type == JoinType.Right ? JoinType.Left : 
type }, new boolean[] { true },
+                    new PTable[] { lhsTable }, new int[] { fieldPosition }, 
postJoinFilterExpression,  QueryUtil.getOffsetLimit(limit, offset));
             Pair<Expression, Expression> keyRangeExpressions = new 
Pair<Expression, Expression>(null, null);
             getKeyExpressionCombinations(keyRangeExpressions, context, 
joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
             return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, 
joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, 
false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
@@ -432,7 +444,11 @@ public class QueryCompiler {
         context.setResolver(resolver);
         TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), 
NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), 
tableRef.getTable().getTableName().getString()));
         ParseNode where = joinTable.getPostFiltersCombined();
-        SelectStatement select = asSubquery ? NODE_FACTORY.select(from, 
joinTable.getStatement().getHint(), false, Collections.<AliasedNode> 
emptyList(), where, null, null, orderBy, null, 0, false, 
joinTable.getStatement().hasSequence(), 
Collections.<SelectStatement>emptyList(), 
joinTable.getStatement().getUdfParseNodes())
+        SelectStatement select = asSubquery
+                ? NODE_FACTORY.select(from, 
joinTable.getStatement().getHint(), false,
+                        Collections.<AliasedNode> emptyList(), where, null, 
null, orderBy, null, null, 0, false,
+                        joinTable.getStatement().hasSequence(), 
Collections.<SelectStatement> emptyList(),
+                        joinTable.getStatement().getUdfParseNodes())
                 : NODE_FACTORY.select(joinTable.getStatement(), from, where);
         
         return compileSingleFlatQuery(context, select, binds, asSubquery, 
false, innerPlan, null, isInRowKeyOrder);
@@ -529,6 +545,7 @@ public class QueryCompiler {
             viewWhere = new 
SQLParser(table.getViewStatement()).parseQuery().getWhere();
         }
         Integer limit = LimitCompiler.compile(context, select);
+        Integer offset = OffsetCompiler.compile(context, select);
 
         GroupBy groupBy = GroupByCompiler.compile(context, select, 
isInRowKeyOrder);
         // Optimize the HAVING clause by finding any group by expressions that 
can be moved
@@ -547,7 +564,8 @@ public class QueryCompiler {
         groupBy = groupBy.compile(context, innerPlanTupleProjector);
         context.setResolver(resolver); // recover resolver
         RowProjector projector = ProjectionCompiler.compile(context, select, 
groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where);
-        OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, 
limit, projector, groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : 
null, isInRowKeyOrder); 
+        OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, 
limit, offset, projector,
+                groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : 
null, isInRowKeyOrder);
         // Final step is to build the query plan
         if (!asSubquery) {
             int maxRows = statement.getMaxRows();
@@ -567,11 +585,14 @@ public class QueryCompiler {
         QueryPlan plan = innerPlan;
         if (plan == null) {
             ParallelIteratorFactory parallelIteratorFactory = asSubquery ? 
null : this.parallelIteratorFactory;
-            plan = select.getFrom() == null ?
-                      new LiteralResultIterationPlan(context, select, 
tableRef, projector, limit, orderBy, parallelIteratorFactory)
-                    : (select.isAggregate() || select.isDistinct() ? 
-                              new AggregatePlan(context, select, tableRef, 
projector, limit, orderBy, parallelIteratorFactory, groupBy, having)
-                            : new ScanPlan(context, select, tableRef, 
projector, limit, orderBy, parallelIteratorFactory, allowPageFilter));
+           plan = select.getFrom() == null
+                   ? new LiteralResultIterationPlan(context, select, tableRef, 
projector, limit, offset, orderBy,
+                           parallelIteratorFactory)
+                   : (select.isAggregate() || select.isDistinct()
+                           ? new AggregatePlan(context, select, tableRef, 
projector, limit, offset, orderBy,
+                                   parallelIteratorFactory, groupBy, having)
+                           : new ScanPlan(context, select, tableRef, 
projector, limit, offset, orderBy,
+                                   parallelIteratorFactory, allowPageFilter));
         }
         if (!subqueries.isEmpty()) {
             int count = subqueries.size();
@@ -588,9 +609,10 @@ public class QueryCompiler {
             if (LiteralExpression.isTrue(where)) {
                 where = null; // we do not pass "true" as filter
             }
-            plan =  select.isAggregate() || select.isDistinct() ?
-                      new ClientAggregatePlan(context, select, tableRef, 
projector, limit, where, orderBy, groupBy, having, plan)
-                    : new ClientScanPlan(context, select, tableRef, projector, 
limit, where, orderBy, plan);
+            plan = select.isAggregate() || select.isDistinct()
+                    ? new ClientAggregatePlan(context, select, tableRef, 
projector, limit, offset, where, orderBy,
+                            groupBy, having, plan)
+                    : new ClientScanPlan(context, select, tableRef, projector, 
limit, offset, where, orderBy, plan);
 
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 1c0c469..4dcc134 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -60,6 +60,8 @@ public interface QueryPlan extends StatementPlan {
     
     Integer getLimit();
 
+    Integer getOffset();
+    
     OrderBy getOrderBy();
 
     GroupBy getGroupBy();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
index 9b54c86..566afc8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
@@ -100,7 +100,7 @@ public class StatementNormalizer extends ParseNodeRewriter {
             if (selectNodes != normSelectNodes) {
                 statement = NODE_FACTORY.select(statement.getFrom(), 
statement.getHint(), statement.isDistinct(),
                         normSelectNodes, statement.getWhere(), 
statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(),
-                        statement.getLimit(), statement.getBindCount(), 
statement.isAggregate(), statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
+                        statement.getLimit(), statement.getOffset(), 
statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), 
statement.getSelects(), statement.getUdfParseNodes());
             }
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 123cb6a..f051aa5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -341,7 +341,10 @@ public class SubqueryRewriter extends ParseNodeRewriter {
                 groupbyNodes.set(i - 1, aliasedNode.getNode());
             }
             SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, 
subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true);
-            subquery = 
NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, 
derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, 
null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), 
true, false, Collections.<SelectStatement>emptyList(), 
subquery.getUdfParseNodes());
+            subquery = 
NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, 
derivedTableStmt),
+                    subquery.getHint(), false, selectNodes, null, 
groupbyNodes, null,
+                    Collections.<OrderByNode> emptyList(), null, null, 
subquery.getBindCount(), true, false,
+                    Collections.<SelectStatement> emptyList(), 
subquery.getUdfParseNodes());
         }
         
         ParseNode onNode = conditionExtractor.getJoinCondition();
@@ -364,7 +367,10 @@ public class SubqueryRewriter extends ParseNodeRewriter {
             return select;
         
         // Wrap as a derived table.
-        return 
NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(),
 select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, 
null, null, null, select.getBindCount(), false, false, 
Collections.<SelectStatement> emptyList(), select.getUdfParseNodes());
+        return 
NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(),
 select),
+                HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, 
null, null, null, null, null,
+                select.getBindCount(), false, false, 
Collections.<SelectStatement> emptyList(),
+                select.getUdfParseNodes());
     }
     
     private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean 
addSelectOne) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index 853d772..1def3a7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.LimitNode;
+import org.apache.phoenix.parse.OffsetNode;
 import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeRewriter;
@@ -110,6 +111,7 @@ public class SubselectRewriter extends ParseNodeRewriter {
         ParseNode havingRewrite = subselect.getHaving();
         List<OrderByNode> orderByRewrite = subselect.getOrderBy();
         LimitNode limitRewrite = subselect.getLimit();
+        OffsetNode offsetRewrite = subselect.getOffset();
         HintNode hintRewrite = subselect.getHint();
         boolean isDistinctRewrite = subselect.isDistinct();
         boolean isAggregateRewrite = subselect.isAggregate();
@@ -187,6 +189,13 @@ public class SubselectRewriter extends ParseNodeRewriter {
             }
         }
         
+        OffsetNode offset = select.getOffset();
+        if (offsetRewrite != null || (limitRewrite != null && offset != null)) 
{
+            return select;
+        } else {
+            offsetRewrite = offset;
+        }
+        
         LimitNode limit = select.getLimit();
         if (limit != null) {
             if (limitRewrite == null) {
@@ -207,8 +216,10 @@ public class SubselectRewriter extends ParseNodeRewriter {
             hintRewrite = hintRewrite == null ? hint : HintNode.combine(hint, 
hintRewrite);
         }
         
-        return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, 
isDistinctRewrite, selectNodesRewrite, whereRewrite, groupByRewrite, 
-            havingRewrite, orderByRewrite, limitRewrite, 
select.getBindCount(), isAggregateRewrite, select.hasSequence(), 
select.getSelects(), select.getUdfParseNodes());
+        return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, 
isDistinctRewrite, selectNodesRewrite,
+                whereRewrite, groupByRewrite, havingRewrite, orderByRewrite, 
limitRewrite, offsetRewrite,
+                select.getBindCount(), isAggregateRewrite, 
select.hasSequence(), select.getSelects(),
+                select.getUdfParseNodes());
     }
     
     private SelectStatement applyPostFilters(SelectStatement statement, 
List<ParseNode> postFilters) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 58cdb64..699643b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -212,6 +212,11 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Integer getOffset() {
+        return null;
+    }
+
+    @Override
     public OrderBy getOrderBy() {
         return OrderBy.EMPTY_ORDER_BY;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 6ec7f70..7c6347f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -655,7 +655,7 @@ public class UpsertCompiler {
                     
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, 
UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                     
                     // Ignore order by - it has no impact
-                    final QueryPlan aggPlan = new AggregatePlan(context, 
select, tableRef, aggProjector, null, OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null);
+                    final QueryPlan aggPlan = new AggregatePlan(context, 
select, tableRef, aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null);
                     return new MutationPlan() {
                         @Override
                         public ParameterMetaData getParameterMetaData() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9b440ac..96ee543 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
     public static final String TX_SCN = "_TxScn";
     public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
     public static final String IGNORE_NEWER_MUTATIONS = 
"_IGNORE_NEWER_MUTATIONS";
+    public final static String SCAN_OFFSET = "_RowOffset";
     
     /**
      * Attribute name used to pass custom annotations in Scans and Mutations 
(later). Custom annotations

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 5df5755..3333d9c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -23,19 +23,20 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-import co.cask.tephra.Transaction;
-
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
@@ -47,23 +48,27 @@ import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.function.ArrayIndexFunction;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.iterate.OffsetResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.RegionScannerResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import co.cask.tephra.Transaction;
+
 
 /**
  *
@@ -121,7 +126,8 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
                 orderByExpressions.add(orderByExpression);
             }
             ResultIterator inner = new RegionScannerResultIterator(s);
-            return new OrderedResultIterator(inner, orderByExpressions, 
thresholdBytes, limit >= 0 ? limit : null, estimatedRowSize);
+            return new OrderedResultIterator(inner, orderByExpressions, 
thresholdBytes, limit >= 0 ? limit : null, null,
+                    estimatedRowSize);
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {
@@ -183,7 +189,11 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
             offset = region.getStartKey().length != 0 ? 
region.getStartKey().length:region.getEndKey().length;
             ScanUtil.setRowKeyOffset(scan, offset);
         }
-
+        byte[] scanOffsetBytes = 
scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
+        Integer scanOffset = null;
+        if (scanOffsetBytes != null) {
+            scanOffset = Bytes.toInt(scanOffsetBytes);
+        }
         RegionScanner innerScanner = s;
 
         Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
@@ -217,7 +227,11 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
         if (j != null) {
             innerScanner = new HashJoinRegionScanner(innerScanner, p, j, 
tenantId, c.getEnvironment());
         }
-
+        if (scanOffset != null) {
+            innerScanner = getOffsetScanner(c, innerScanner,
+                    new OffsetResultIterator(new 
RegionScannerResultIterator(innerScanner), scanOffset),
+                    scan.getAttribute(QueryConstants.LAST_SCAN) != null);
+        }
         final OrderedResultIterator iterator = 
deserializeFromScan(scan,innerScanner);
         if (iterator == null) {
             return innerScanner;
@@ -226,6 +240,73 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
         return getTopNScanner(c, innerScanner, iterator, tenantId);
     }
 
+    private RegionScanner getOffsetScanner(final 
ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s,
+            final OffsetResultIterator iterator, final boolean isLastScan) 
throws IOException {
+        final Tuple firstTuple;
+        final HRegion region = c.getEnvironment().getRegion();
+        region.startRegionOperation();
+        try {
+            // Once we return from the first call to next, we've run through 
and
+            // cached
+            // the topN rows, so we no longer need to start/stop a region
+            // operation.
+            Tuple tuple = iterator.next();
+            if (tuple == null && !isLastScan) {
+                List<KeyValue> kvList = new ArrayList<KeyValue>(1);
+                KeyValue kv = new 
KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
+                        QueryConstants.OFFSET_COLUMN, 
Bytes.toBytes(iterator.getUnusedOffset()));
+                kvList.add(kv);
+                Result r = new Result(kvList);
+                firstTuple = new ResultTuple(r);
+            } else {
+                firstTuple = tuple;
+            }
+        } catch (Throwable t) {
+            
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+            return null;
+        } finally {
+            region.closeRegionOperation();
+        }
+        return new BaseRegionScanner(s) {
+            private Tuple tuple = firstTuple;
+
+            @Override
+            public boolean isFilterDone() {
+                return tuple == null;
+            }
+
+            @Override
+            public boolean next(List<Cell> results) throws IOException {
+                try {
+                    if (isFilterDone()) { return false; }
+                    for (int i = 0; i < tuple.size(); i++) {
+                        results.add(tuple.getValue(i));
+                    }
+                    tuple = iterator.next();
+                    return !isFilterDone();
+                } catch (Throwable t) {
+                    
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+                    return false;
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    s.close();
+                } finally {
+                    try {
+                        if (iterator != null) {
+                            iterator.close();
+                        }
+                    } catch (SQLException e) {
+                        
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e);
+                    }
+                }
+            }
+        };
+    }
+
     /**
      *  Return region scanner that does TopN.
      *  We only need to call startRegionOperation and closeRegionOperation when

Reply via email to