PHOENIX-2722 Support MySQL OFFSET clause
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f65d0481 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f65d0481 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f65d0481 Branch: refs/heads/4.x-HBase-0.98 Commit: f65d0481d00e2fa70b03d97345377cb26d7fcb22 Parents: 4c588f4 Author: Ankit Singhal <[email protected]> Authored: Fri Apr 8 16:06:50 2016 +0530 Committer: Ankit Singhal <[email protected]> Committed: Fri Apr 8 16:06:50 2016 +0530 ---------------------------------------------------------------------- .../apache/phoenix/end2end/AutoCommitIT.java | 14 +- .../apache/phoenix/end2end/CreateTableIT.java | 2 +- .../apache/phoenix/end2end/DerivedTableIT.java | 144 ++++++++++++- .../org/apache/phoenix/end2end/HashJoinIT.java | 161 ++++++++++++++ .../phoenix/end2end/QueryWithOffsetIT.java | 211 +++++++++++++++++++ .../org/apache/phoenix/end2end/ReadOnlyIT.java | 12 +- .../apache/phoenix/end2end/SortMergeJoinIT.java | 44 ++++ phoenix-core/src/main/antlr3/PhoenixSQL.g | 16 +- .../apache/phoenix/compile/DeleteCompiler.java | 13 +- .../apache/phoenix/compile/JoinCompiler.java | 10 +- .../phoenix/compile/ListJarsQueryPlan.java | 5 + .../apache/phoenix/compile/OffsetCompiler.java | 114 ++++++++++ .../apache/phoenix/compile/OrderByCompiler.java | 3 +- .../apache/phoenix/compile/PostDDLCompiler.java | 3 +- .../apache/phoenix/compile/QueryCompiler.java | 52 +++-- .../org/apache/phoenix/compile/QueryPlan.java | 2 + .../phoenix/compile/StatementNormalizer.java | 2 +- .../phoenix/compile/SubqueryRewriter.java | 10 +- .../phoenix/compile/SubselectRewriter.java | 15 +- .../apache/phoenix/compile/TraceQueryPlan.java | 5 + .../apache/phoenix/compile/UpsertCompiler.java | 2 +- .../coprocessor/BaseScannerRegionObserver.java | 1 + .../phoenix/coprocessor/ScanRegionObserver.java | 95 ++++++++- .../apache/phoenix/execute/AggregatePlan.java | 43 ++-- .../apache/phoenix/execute/BaseQueryPlan.java | 9 
+- .../phoenix/execute/ClientAggregatePlan.java | 19 +- .../phoenix/execute/ClientProcessingPlan.java | 10 +- .../apache/phoenix/execute/ClientScanPlan.java | 38 ++-- .../phoenix/execute/DegenerateQueryPlan.java | 2 +- .../phoenix/execute/DelegateQueryPlan.java | 4 + .../execute/LiteralResultIterationPlan.java | 13 +- .../org/apache/phoenix/execute/ScanPlan.java | 58 +++-- .../phoenix/execute/SortMergeJoinPlan.java | 5 + .../org/apache/phoenix/execute/UnionPlan.java | 15 +- .../phoenix/iterate/BaseResultIterators.java | 14 +- .../apache/phoenix/iterate/ExplainTable.java | 16 +- .../phoenix/iterate/LimitingResultIterator.java | 2 +- .../iterate/MergeSortTopNResultIterator.java | 21 +- .../phoenix/iterate/OffsetResultIterator.java | 62 ++++++ .../OrderedAggregatingResultIterator.java | 6 +- .../phoenix/iterate/OrderedResultIterator.java | 58 +++-- .../phoenix/iterate/ParallelIterators.java | 4 +- .../apache/phoenix/iterate/SerialIterators.java | 19 +- .../phoenix/iterate/TableResultIterator.java | 22 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 18 +- .../org/apache/phoenix/join/HashJoinInfo.java | 2 +- .../apache/phoenix/optimize/QueryOptimizer.java | 2 +- .../apache/phoenix/parse/DeleteStatement.java | 6 + .../phoenix/parse/FilterableStatement.java | 1 + .../org/apache/phoenix/parse/OffsetNode.java | 67 ++++++ .../apache/phoenix/parse/ParseNodeFactory.java | 55 +++-- .../apache/phoenix/parse/ParseNodeRewriter.java | 2 +- .../apache/phoenix/parse/SelectStatement.java | 35 ++- .../apache/phoenix/query/QueryConstants.java | 6 + .../java/org/apache/phoenix/util/QueryUtil.java | 30 ++- .../java/org/apache/phoenix/util/ScanUtil.java | 4 + .../phoenix/execute/CorrelatePlanTest.java | 39 +++- .../execute/LiteralResultIteratorPlanTest.java | 192 +++++++++++++++++ .../phoenix/execute/UnnestArrayPlanTest.java | 3 +- .../query/ParallelIteratorsSplitTest.java | 5 + 60 files changed, 1621 insertions(+), 222 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java index aa92c5e..469f2de 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java @@ -42,21 +42,21 @@ public class AutoCommitIT extends BaseHBaseManagedTimeIT { conn.setAutoCommit(true); String ddl = "CREATE TABLE test_table " + - " (row varchar not null, col1 integer" + - " CONSTRAINT pk PRIMARY KEY (row))\n"; + " (r varchar not null, col1 integer" + + " CONSTRAINT pk PRIMARY KEY (r))\n"; createTestTable(getUrl(), ddl); - String query = "UPSERT INTO test_table(row, col1) VALUES('row1', 1)"; + String query = "UPSERT INTO test_table(r, col1) VALUES('row1', 1)"; PreparedStatement statement = conn.prepareStatement(query); statement.executeUpdate(); conn.commit(); conn.setAutoCommit(false); - query = "UPSERT INTO test_table(row, col1) VALUES('row1', 2)"; + query = "UPSERT INTO test_table(r, col1) VALUES('row1', 2)"; statement = conn.prepareStatement(query); statement.executeUpdate(); - query = "DELETE FROM test_table WHERE row='row1'"; + query = "DELETE FROM test_table WHERE r='row1'"; statement = conn.prepareStatement(query); statement.executeUpdate(); conn.commit(); @@ -66,11 +66,11 @@ public class AutoCommitIT extends BaseHBaseManagedTimeIT { ResultSet rs = statement.executeQuery(); assertFalse(rs.next()); - query = "DELETE FROM test_table WHERE row='row1'"; + query = "DELETE FROM test_table WHERE r='row1'"; statement = conn.prepareStatement(query); statement.executeUpdate(); - query = "UPSERT INTO test_table(row, col1) VALUES('row1', 3)"; + query 
= "UPSERT INTO test_table(r, col1) VALUES('row1', 3)"; statement = conn.prepareStatement(query); statement.executeUpdate(); conn.commit(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java index 5ffc354..b0370e8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java @@ -74,7 +74,7 @@ public class CreateTableIT extends BaseClientManagedTimeIT { " data.file VARCHAR ,\n" + " data.fk_log VARCHAR ,\n" + " data.host VARCHAR ,\n" + - " data.row VARCHAR ,\n" + + " data.r VARCHAR ,\n" + " data.size VARCHAR ,\n" + " data.start_time VARCHAR ,\n" + " data.stat_date DATE ,\n" + http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java index 1a772c8..9b37f9e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java @@ -232,6 +232,46 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { assertEquals(9,rs.getInt(1)); assertFalse(rs.next()); + + // Inner limit < outer query offset + query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM aTable LIMIT 1 OFFSET 1 ) AS t WHERE t.b = '" + + C_VALUE + "' OFFSET 2"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertFalse(rs.next()); + + // (where) 
offset + query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM aTable WHERE a_byte + 1 < 9 ) AS t OFFSET 2"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(ROW3, rs.getString(1)); + assertEquals(13, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals(ROW4, rs.getString(1)); + assertEquals(14, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals(ROW5, rs.getString(1)); + assertEquals(15, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals(ROW6, rs.getString(1)); + assertEquals(16, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals(ROW7, rs.getString(1)); + assertEquals(17, rs.getInt(2)); + + // (offset) where + query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM aTable OFFSET 4) AS t WHERE t.b = '" + + C_VALUE + "'"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(ROW5, rs.getString(1)); + assertEquals(15, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals(ROW8, rs.getString(1)); + assertEquals(18, rs.getInt(2)); + } finally { conn.close(); } @@ -349,6 +389,17 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { assertEquals(1,rs.getInt(2)); assertFalse(rs.next()); + + // (groupby) groupby orderby offset + query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t GROUP BY t.c ORDER BY count(*) DESC OFFSET 1"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(1, rs.getInt(2)); + + assertFalse(rs.next()); + } finally { conn.close(); } @@ -544,7 +595,98 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { conn.close(); } } - + + @Test + public void testDerivedTableWithOffset() throws Exception { + long ts = nextTimestamp(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + 
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 1)); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + // (LIMIT OFFSET ) + String query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 2 OFFSET 1) AS t"; + PreparedStatement statement = conn.prepareStatement(query); + ResultSet rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(ROW2, rs.getString(1)); + assertTrue(rs.next()); + assertEquals(ROW3, rs.getString(1)); + + assertFalse(rs.next()); + + // (OFFSET) limit + query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable OFFSET 1) AS t LIMIT 2"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(ROW2, rs.getString(1)); + assertTrue(rs.next()); + assertEquals(ROW3, rs.getString(1)); + + assertFalse(rs.next()); + + // (limit OFFSET) limit OFFSET + query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 2 OFFSET 1) AS t LIMIT 4 OFFSET 1"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(ROW3, rs.getString(1)); + assertFalse(rs.next()); + + // (limit OFFSET) limit 2 + query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 4 OFFSET 1) AS t LIMIT 2"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(ROW2, rs.getString(1)); + assertTrue(rs.next()); + assertEquals(ROW3, rs.getString(1)); + + assertFalse(rs.next()); + + // (limit ? OFFSET ?) limit ? OFFSET ? + query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT ? OFFSET ?) AS t LIMIT ? 
OFFSET ?"; + statement = conn.prepareStatement(query); + statement.setInt(1, 4); + statement.setInt(2, 2); + statement.setInt(3, 2); + statement.setInt(4, 2); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(ROW5, rs.getString(1)); + assertTrue(rs.next()); + assertEquals(ROW6, rs.getString(1)); + assertFalse(rs.next()); + + // (groupby orderby OFFSET) + query = "SELECT a, s FROM (SELECT a_string a, sum(a_byte) s FROM aTable GROUP BY a_string ORDER BY sum(a_byte) OFFSET 1)"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(A_VALUE, rs.getString(1)); + assertEquals(10, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals(B_VALUE, rs.getString(1)); + assertEquals(26, rs.getInt(2)); + + assertFalse(rs.next()); + + // (union OFFSET) groupby + query = "SELECT a_string, count(*) FROM (SELECT a_string FROM aTable where a_byte < 4 union all SELECT a_string FROM aTable where a_byte > 8 OFFSET 1) group by a_string"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(A_VALUE,rs.getString(1)); + assertEquals(2,rs.getInt(2)); + assertTrue (rs.next()); + assertEquals(C_VALUE,rs.getString(1)); + assertEquals(1,rs.getInt(2)); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + @Test public void testDerivedTableWithDistinct() throws Exception { long ts = nextTimestamp(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java index 00655cf..3e2356f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java @@ -441,6 +441,38 @@ 
public class HashJoinIT extends BaseHBaseManagedTimeIT { " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + " DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")\n" + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithOffset() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " SERVER OFFSET 2\n" + + " SERVER 3 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" + + " PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + + " JOIN-SCANNER 3 ROW LIMIT", + /* + * testJoinWithOffset() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " SERVER OFFSET 2\n" + + "CLIENT 1 ROW LIMIT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" + + " PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.supplier_id\")\n" + + " JOIN-SCANNER 3 ROW LIMIT", }}); testCases.add(new String[][] { { @@ -770,6 +802,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { " PARALLEL INNER-JOIN TABLE 0\n" + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".OrderTable\n" + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithOffset() + * SELECT order_id, 
i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " SERVER OFFSET 2\n" + + " SERVER 3 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" + + " PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + + " JOIN-SCANNER 3 ROW LIMIT", + /* + * testJoinWithOffset() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " SERVER OFFSET 2\n" + + "CLIENT 1 ROW LIMIT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" + + " PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.0:supplier_id\")\n" + + " JOIN-SCANNER 3 ROW LIMIT", }}); testCases.add(new String[][] { { @@ -1124,6 +1188,40 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + " DYNAMIC SERVER FILTER BY \"I.:item_id\" IN (\"O.item_id\")\n" + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithOffset() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + 
JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " SERVER OFFSET 2\n" + + " SERVER 3 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER "+ MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX +""+JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" + + " CLIENT MERGE SORT\n" + + " PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + + " JOIN-SCANNER 3 ROW LIMIT", + /* + * testJoinWithOffset() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " SERVER OFFSET 2\n" + + "CLIENT 1 ROW LIMIT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER "+ MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX +""+JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" + + " CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.0:supplier_id\")\n" + + " JOIN-SCANNER 3 ROW LIMIT", }}); return testCases; } @@ -2920,6 +3018,26 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { assertEquals(rs.getString(4), "S5"); assertFalse(rs.next()); + + query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + + JOIN_ITEM_TABLE_FULL_NAME + + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + + JOIN_SUPPLIER_TABLE_FULL_NAME + + " ORDER BY \"supplier_id\" OFFSET 2) AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name != 'S1')"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(rs.getString(1), "0000000005"); + 
assertEquals(rs.getString(2), "T5"); + assertEquals(rs.getString(3), "0000000005"); + assertEquals(rs.getString(4), "S5"); + assertTrue(rs.next()); + assertEquals(rs.getString(1), "0000000006"); + assertEquals(rs.getString(2), "T6"); + assertEquals(rs.getString(3), "0000000006"); + assertEquals(rs.getString(4), "S6"); + assertFalse(rs.next()); + } finally { conn.close(); } @@ -3234,6 +3352,49 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { } @Test + public void testJoinWithOffset() throws Exception { + String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + + " s LEFT JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN " + + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2 "; + String query2 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + + " s JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON i.\"supplier_id\" = s.\"supplier_id\" JOIN " + + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2 "; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + PreparedStatement statement = conn.prepareStatement(query1); + ResultSet rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(rs.getString(1), "000000000000005"); + assertEquals(rs.getString(2), "T3"); + assertEquals(rs.getString(3), "S2"); + assertEquals(rs.getString(4), "202 YYY Street"); + assertEquals(rs.getInt(5), 5000); + + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query1); + assertEquals(plans[22], QueryUtil.getExplainPlan(rs)); + + statement = conn.prepareStatement(query2); + rs = statement.executeQuery(); + + assertTrue(rs.next()); + assertEquals(rs.getString(1), "000000000000005"); + assertEquals(rs.getString(2), "T3"); + assertEquals(rs.getString(3), 
"S2"); + assertEquals(rs.getString(4), "202 YYY Street"); + assertEquals(rs.getInt(5), 5000); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query2); + assertEquals(plans[23], QueryUtil.getExplainPlan(rs)); + } finally { + conn.close(); + } + } + + @Test public void testNonEquiJoin() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java new file mode 100644 index 0000000..c609581 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java @@ -0,0 +1,211 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you maynot use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicablelaw or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; + +@RunWith(Parameterized.class) +public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT { + + private String tableName; + private final String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", + "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" }; + private final String ddl; + + public QueryWithOffsetIT(String preSplit) { + this.tableName = tableName + "_" + preSplit.charAt(2); + this.ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + preSplit; + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(true)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Parameters(name="preSplit = {0}") + public static 
Collection<String> data() { + return Arrays.asList(new String[] { " SPLIT ON ('e','i','o')", " SALT_BUCKETS=10" }); + } + + @Test + public void testLimitOffset() throws SQLException { + Connection conn; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + createTestTable(getUrl(), ddl); + initTableValues(conn); + int limit = 10; + int offset = 10; + updateStatistics(conn); + ResultSet rs; + rs = conn.createStatement() + .executeQuery("SELECT t_id from " + tableName + " order by t_id limit " + limit + " offset " + offset); + int i = 0; + while (i++ < limit) { + assertTrue(rs.next()); + assertEquals(strings[offset + i - 1], rs.getString(1)); + } + + limit = 35; + rs = conn.createStatement().executeQuery("SELECT t_id from " + tableName + " union all SELECT t_id from " + + tableName + " offset " + offset + " FETCH FIRST " + limit + " rows only"); + i = 0; + while (i++ < strings.length - offset) { + assertTrue(rs.next()); + assertEquals(strings[offset + i - 1], rs.getString(1)); + } + i = 0; + while (i++ < limit - strings.length - offset) { + assertTrue(rs.next()); + assertEquals(strings[i - 1], rs.getString(1)); + } + conn.close(); + } + + @Test + public void testOffsetSerialQueryExecutedOnServer() throws SQLException { + Connection conn; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + int offset = 10; + createTestTable(getUrl(), ddl); + initTableValues(conn); + updateStatistics(conn); + String query = "SELECT t_id from " + tableName + " offset " + offset; + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query); + rs.next(); + rs.next(); + rs.next(); + assertEquals(" SERVER OFFSET " + offset, rs.getString(1)); + rs = conn.createStatement().executeQuery(query); + int i = 0; + while (i++ < strings.length - offset) { + assertTrue(rs.next()); + assertEquals(strings[offset + i - 1], rs.getString(1)); + } + conn.close(); 
+ } + + @Test + public void testOffsetWithoutLimit() throws SQLException { + Connection conn; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + int offset = 10; + createTestTable(getUrl(), ddl); + initTableValues(conn); + updateStatistics(conn); + ResultSet rs; + rs = conn.createStatement() + .executeQuery("SELECT t_id from " + tableName + " order by t_id offset " + offset + " row"); + int i = 0; + while (i++ < strings.length - offset) { + assertTrue(rs.next()); + assertEquals(strings[offset + i - 1], rs.getString(1)); + } + + rs = conn.createStatement().executeQuery( + "SELECT k3, count(*) from " + tableName + " group by k3 order by k3 desc offset " + offset + " row"); + + i = 0; + while (i++ < strings.length - offset) { + assertTrue(rs.next()); + assertEquals(strings.length - offset - i + 2, rs.getInt(1)); + } + + rs = conn.createStatement().executeQuery("SELECT t_id from " + tableName + " union all SELECT t_id from " + + tableName + " offset " + offset + " rows"); + i = 0; + while (i++ < strings.length - offset) { + assertTrue(rs.next()); + assertEquals(strings[offset + i - 1], rs.getString(1)); + } + i = 0; + while (i++ < strings.length) { + assertTrue(rs.next()); + assertEquals(strings[i - 1], rs.getString(1)); + } + conn.close(); + } + + private void initTableValues(Connection conn) throws SQLException { + for (int i = 0; i < 26; i++) { + conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + "," + + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')"); + } + conn.commit(); + } + + private void updateStatistics(Connection conn) throws SQLException { + String query = "UPDATE STATISTICS " + tableName + " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + + "\"=" + Long.toString(500); + conn.createStatement().execute(query); + } + + @Test + public void testMetaDataWithOffset() throws SQLException { + Connection conn; + Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + createTestTable(getUrl(), ddl); + initTableValues(conn); + updateStatistics(conn); + PreparedStatement stmt = conn.prepareStatement("SELECT * from " + tableName + " offset ?"); + ParameterMetaData pmd = stmt.getParameterMetaData(); + assertEquals(1, pmd.getParameterCount()); + assertEquals(Types.INTEGER, pmd.getParameterType(1)); + stmt.setInt(1, 10); + ResultSet rs = stmt.executeQuery(); + ResultSetMetaData md = rs.getMetaData(); + assertEquals(5, md.getColumnCount()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java index fbebe08..bcc4ee8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java @@ -40,11 +40,11 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); String ddl = "CREATE TABLE test_table " + - " (row varchar not null, col1 integer" + - " CONSTRAINT pk PRIMARY KEY (row))\n"; + " (r varchar not null, col1 integer" + + " CONSTRAINT pk PRIMARY KEY (r))\n"; createTestTable(getUrl(), ddl); - String query = "UPSERT INTO test_table(row, col1) VALUES('row1', 777)"; + String query = "UPSERT INTO test_table(r, col1) VALUES('row1', 777)"; PreparedStatement statement = conn.prepareStatement(query); statement.executeUpdate(); conn.commit(); @@ -53,8 +53,8 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT { conn.setReadOnly(true); assertTrue(conn.isReadOnly()); ddl = "CREATE TABLE test_table2 " + - " (row varchar not null, col1 
integer" + - " CONSTRAINT pk PRIMARY KEY (row))\n"; + " (r varchar not null, col1 integer" + + " CONSTRAINT pk PRIMARY KEY (r))\n"; statement = conn.prepareStatement(ddl); statement.executeUpdate(); conn.commit(); @@ -64,7 +64,7 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT { } try { - query = "UPSERT INTO test_table(row, col1) VALUES('row1', 888)"; + query = "UPSERT INTO test_table(r, col1) VALUES('row1', 888)"; statement = conn.prepareStatement(query); statement.executeUpdate(); conn.commit(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java index f19b886..43afd0d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java @@ -2383,6 +2383,50 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT { } @Test + public void testJoinWithOffset() throws Exception { + String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, s.address, quantity FROM " + + JOIN_SUPPLIER_TABLE_FULL_NAME + " s LEFT JOIN " + JOIN_ITEM_TABLE_FULL_NAME + + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN " + JOIN_ORDER_TABLE_FULL_NAME + + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 2 OFFSET 1"; + String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, s.address, quantity FROM " + + JOIN_SUPPLIER_TABLE_FULL_NAME + " s JOIN " + JOIN_ITEM_TABLE_FULL_NAME + + " i ON i.\"supplier_id\" = s.\"supplier_id\" JOIN " + JOIN_ORDER_TABLE_FULL_NAME + + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = 
DriverManager.getConnection(getUrl(), props); + try { + PreparedStatement statement = conn.prepareStatement(query1); + ResultSet rs = statement.executeQuery(); + assertTrue(rs.next()); + assertNull(rs.getString(1)); + assertNull(rs.getString(2)); + assertEquals(rs.getString(3), "S4"); + assertEquals(rs.getString(4), "404 YYY Street"); + assertEquals(rs.getInt(5), 0); + assertTrue(rs.next()); + assertEquals(rs.getString(1), "000000000000001"); + assertEquals(rs.getString(2), "T1"); + assertEquals(rs.getString(3), "S1"); + assertEquals(rs.getString(4), "101 YYY Street"); + assertEquals(rs.getInt(5), 1000); + assertFalse(rs.next()); + + statement = conn.prepareStatement(query2); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(rs.getString(1), "000000000000005"); + assertEquals(rs.getString(2), "T3"); + assertEquals(rs.getString(3), "S2"); + assertEquals(rs.getString(4), "202 YYY Street"); + assertEquals(rs.getInt(5), 5000); + + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + + @Test public void testNonEquiJoin() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index 834a410..a042090 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -125,6 +125,11 @@ tokens LIST = 'list'; JARS='jars'; ROW_TIMESTAMP='row_timestamp'; + OFFSET ='offset'; + FETCH = 'fetch'; + ROW = 'row'; + ROWS = 'rows'; + ONLY = 'only'; } @@ -650,7 +655,7 @@ single_select returns [SelectStatement ret] (WHERE where=expression)? (GROUP BY group=group_by)? (HAVING having=expression)? 
- { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null, getBindCount(), context.isAggregate(), context.hasSequences(), null, new HashMap<String,UDFParseNode>(udfParseNodes)); } + { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null,null, getBindCount(), context.isAggregate(), context.hasSequences(), null, new HashMap<String,UDFParseNode>(udfParseNodes)); } ; finally{ contextStack.pop(); } @@ -665,7 +670,9 @@ select_node returns [SelectStatement ret] : u=unioned_selects (ORDER BY order=order_by)? (LIMIT l=limit)? - { ParseContext context = contextStack.peek(); $ret = factory.select(u, order, l, getBindCount(), context.isAggregate()); } + (OFFSET o=offset (ROW | ROWS)?)? + (FETCH (FIRST | NEXT) (l=limit)? (ROW | ROWS) ONLY)? + { ParseContext context = contextStack.peek(); $ret = factory.select(u, order, l, o, getBindCount(), context.isAggregate()); } ; finally{ contextStack.pop(); } @@ -697,6 +704,11 @@ limit returns [LimitNode ret] | l=int_or_long_literal { $ret = factory.limit(l); } ; +offset returns [OffsetNode ret] + : b=bind_expression { $ret = factory.offset(b); } + | l=int_or_long_literal { $ret = factory.offset(l); } + ; + sampling_rate returns [LiteralParseNode ret] : l=literal { $ret = l; } ; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 24a2add..2a97686 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -352,12 +352,10 @@ public class DeleteCompiler { PColumn column = 
table.getPKColumns().get(i); aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null))); } - select = FACTORY.select( - delete.getTable(), - hint, false, aliasedNodes, delete.getWhere(), - Collections.<ParseNode>emptyList(), null, - delete.getOrderBy(), delete.getLimit(), - delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes()); + select = FACTORY.select(delete.getTable(), hint, false, aliasedNodes, delete.getWhere(), + Collections.<ParseNode> emptyList(), null, delete.getOrderBy(), delete.getLimit(), null, + delete.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), + delete.getUdfParseNodes()); select = StatementNormalizer.normalize(select, resolverToBe); SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection); if (transformedSelect != select) { @@ -514,7 +512,8 @@ public class DeleteCompiler { projectorToBe = new RowProjector(projectorToBe,true); } final RowProjector projector = projectorToBe; - final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, null, + OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); mutationPlans.add(new MutationPlan() { @Override public ParameterMetaData getParameterMetaData() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index 5d03f57..e6c5970 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -689,7 +689,9 @@ public class JoinCompiler { if (isSubselect()) return SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias()); - return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence(), Collections.<SelectStatement>emptyList(), select.getUdfParseNodes()); + return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, + null, orderBy, null, null, 0, false, select.hasSequence(), + Collections.<SelectStatement> emptyList(), select.getUdfParseNodes()); } public boolean hasFilters() { @@ -1071,7 +1073,8 @@ public class JoinCompiler { && !select.isAggregate() && !select.isDistinct() && !(select.getFrom() instanceof DerivedTableNode) - && select.getLimit() == null; + && select.getLimit() == null + && select.getOffset() == null; } private static ParseNode combine(List<ParseNode> nodes) { @@ -1268,7 +1271,8 @@ public class JoinCompiler { String tableAlias = tableRef.getTableAlias(); TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? 
null : '"' + tableAlias + '"', tName, dynamicCols); - return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement>emptyList(), udfParseNodes); + return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, null, 0, + groupBy != null, hasSequence, Collections.<SelectStatement> emptyList(), udfParseNodes); } public static PTable joinProjectedTables(PTable left, PTable right, JoinType type) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index 7f3277a..1047334 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -194,6 +194,11 @@ public class ListJarsQueryPlan implements QueryPlan { } @Override + public Integer getOffset() { + return null; + } + + @Override public OrderBy getOrderBy() { return OrderBy.EMPTY_ORDER_BY; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java new file mode 100644 index 0000000..54be50b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import java.sql.SQLException; + +import org.apache.phoenix.parse.BindParseNode; +import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.LiteralParseNode; +import org.apache.phoenix.parse.OffsetNode; +import org.apache.phoenix.parse.ParseNodeFactory; +import org.apache.phoenix.parse.TraverseNoParseNodeVisitor; +import org.apache.phoenix.schema.PDatum; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PInteger; + +public class OffsetCompiler { + private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory(); + + public static final PDatum OFFSET_DATUM = new PDatum() { + @Override + public boolean isNullable() { + return false; + } + + @Override + public PDataType getDataType() { + return PInteger.INSTANCE; + } + + @Override + public Integer getMaxLength() { + return null; + } + + @Override + public Integer getScale() { + return null; + } + + @Override + public SortOrder getSortOrder() { + return SortOrder.getDefault(); + } + }; + + private OffsetCompiler() {} + + public static Integer compile(StatementContext context, FilterableStatement statement) throws SQLException { + OffsetNode offsetNode = 
statement.getOffset(); + if (offsetNode == null) { return null; } + OffsetParseNodeVisitor visitor = new OffsetParseNodeVisitor(context); + offsetNode.getOffsetParseNode().accept(visitor); + return visitor.getOffset(); + } + + private static class OffsetParseNodeVisitor extends TraverseNoParseNodeVisitor<Void> { + private final StatementContext context; + private Integer offset; + + public OffsetParseNodeVisitor(StatementContext context) { + this.context = context; + } + + public Integer getOffset() { + return offset; + } + + @Override + public Void visit(LiteralParseNode node) throws SQLException { + Object offsetValue = node.getValue(); + if (offsetValue != null) { + Integer offset = (Integer)OFFSET_DATUM.getDataType().toObject(offsetValue, node.getType()); + if (offset.intValue() >= 0) { + this.offset = offset; + } + } + return null; + } + + @Override + public Void visit(BindParseNode node) throws SQLException { + // This is for static evaluation in SubselectRewriter. + if (context == null) return null; + + Object value = context.getBindManager().getBindValue(node); + context.getBindManager().addParamMetaData(node, OFFSET_DATUM); + // Resolve the bind value, create a LiteralParseNode, and call the + // visit method for it. + // In this way, we can deal with just having a literal on one side + // of the expression. 
+ visit(NODE_FACTORY.literal(value, OFFSET_DATUM.getDataType())); + return null; + } + + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java index 0ae31f0..91fa5c8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java @@ -82,7 +82,8 @@ public class OrderByCompiler { */ public static OrderBy compile(StatementContext context, SelectStatement statement, - GroupBy groupBy, Integer limit, + GroupBy groupBy, Integer limit, + Integer offset, RowProjector rowProjector, TupleProjector tupleProjector, boolean isInRowKeyOrder) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 2659b3f..752e1a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -254,7 +254,8 @@ public class PostDDLCompiler { } catch (AmbiguousColumnException e) { continue; } - QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null, + OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); try { ResultIterator iterator = plan.iterator(); try { 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 8a86d67..82e2e92 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -72,6 +72,7 @@ import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ScanUtil; import com.google.common.collect.Lists; @@ -168,7 +169,11 @@ public class QueryCompiler { SelectStatement subSelect = unionAllSelects.get(i); // Push down order-by and limit into sub-selects. 
if (!select.getOrderBy().isEmpty() || select.getLimit() != null) { - subSelect = NODE_FACTORY.select(subSelect, select.getOrderBy(), select.getLimit()); + if (select.getOffset() == null) { + subSelect = NODE_FACTORY.select(subSelect, select.getOrderBy(), select.getLimit(), null); + } else { + subSelect = NODE_FACTORY.select(subSelect, select.getOrderBy(), null, null); + } } QueryPlan subPlan = compileSubquery(subSelect, true); TupleProjector projector = new TupleProjector(subPlan.getProjector()); @@ -182,8 +187,8 @@ public class QueryCompiler { StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager); QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false); - plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, - plans, context.getBindManager().getParameterMetaData()); + plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(), plan.getOffset(), + plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans, context.getBindManager().getParameterMetaData()); return plan; } @@ -324,10 +329,13 @@ public class QueryCompiler { QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? 
tupleProjector : null, true); Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table); Integer limit = null; + Integer offset = null; if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) { limit = plan.getLimit(); + offset = plan.getOffset(); } - HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit); + HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes, + starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset)); return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans); } @@ -378,10 +386,14 @@ public class QueryCompiler { QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true); Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable); Integer limit = null; + Integer offset = null; if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) { limit = rhsPlan.getLimit(); + offset = rhsPlan.getOffset(); } - HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsTable}, new int[] {fieldPosition}, postJoinFilterExpression, limit); + HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] { joinExpressions }, + new JoinType[] { type == JoinType.Right ? 
JoinType.Left : type }, new boolean[] { true }, + new PTable[] { lhsTable }, new int[] { fieldPosition }, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset)); Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions); return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())}); @@ -432,7 +444,11 @@ public class QueryCompiler { context.setResolver(resolver); TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString())); ParseNode where = joinTable.getPostFiltersCombined(); - SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(), joinTable.getStatement().getUdfParseNodes()) + SelectStatement select = asSubquery + ? 
NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, + Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, null, 0, false, + joinTable.getStatement().hasSequence(), Collections.<SelectStatement> emptyList(), + joinTable.getStatement().getUdfParseNodes()) : NODE_FACTORY.select(joinTable.getStatement(), from, where); return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder); @@ -529,6 +545,7 @@ public class QueryCompiler { viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere(); } Integer limit = LimitCompiler.compile(context, select); + Integer offset = OffsetCompiler.compile(context, select); GroupBy groupBy = GroupByCompiler.compile(context, select, isInRowKeyOrder); // Optimize the HAVING clause by finding any group by expressions that can be moved @@ -547,7 +564,8 @@ public class QueryCompiler { groupBy = groupBy.compile(context, innerPlanTupleProjector); context.setResolver(resolver); // recover resolver RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where); - OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, projector, groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder); + OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, offset, projector, + groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder); // Final step is to build the query plan if (!asSubquery) { int maxRows = statement.getMaxRows(); @@ -567,11 +585,14 @@ public class QueryCompiler { QueryPlan plan = innerPlan; if (plan == null) { ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory; - plan = select.getFrom() == null ? 
- new LiteralResultIterationPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory) - : (select.isAggregate() || select.isDistinct() ? - new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having) - : new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter)); + plan = select.getFrom() == null + ? new LiteralResultIterationPlan(context, select, tableRef, projector, limit, offset, orderBy, + parallelIteratorFactory) + : (select.isAggregate() || select.isDistinct() + ? new AggregatePlan(context, select, tableRef, projector, limit, offset, orderBy, + parallelIteratorFactory, groupBy, having) + : new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy, + parallelIteratorFactory, allowPageFilter)); } if (!subqueries.isEmpty()) { int count = subqueries.size(); @@ -588,9 +609,10 @@ public class QueryCompiler { if (LiteralExpression.isTrue(where)) { where = null; // we do not pass "true" as filter } - plan = select.isAggregate() || select.isDistinct() ? - new ClientAggregatePlan(context, select, tableRef, projector, limit, where, orderBy, groupBy, having, plan) - : new ClientScanPlan(context, select, tableRef, projector, limit, where, orderBy, plan); + plan = select.isAggregate() || select.isDistinct() + ? 
new ClientAggregatePlan(context, select, tableRef, projector, limit, offset, where, orderBy, + groupBy, having, plan) + : new ClientScanPlan(context, select, tableRef, projector, limit, offset, where, orderBy, plan); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index 1c0c469..4dcc134 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -60,6 +60,8 @@ public interface QueryPlan extends StatementPlan { Integer getLimit(); + Integer getOffset(); + OrderBy getOrderBy(); GroupBy getGroupBy(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java index 9b54c86..566afc8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java @@ -100,7 +100,7 @@ public class StatementNormalizer extends ParseNodeRewriter { if (selectNodes != normSelectNodes) { statement = NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), normSelectNodes, statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), - statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); + 
statement.getLimit(), statement.getOffset(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java index 123cb6a..f051aa5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java @@ -341,7 +341,10 @@ public class SubqueryRewriter extends ParseNodeRewriter { groupbyNodes.set(i - 1, aliasedNode.getNode()); } SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true); - subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false, Collections.<SelectStatement>emptyList(), subquery.getUdfParseNodes()); + subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), + subquery.getHint(), false, selectNodes, null, groupbyNodes, null, + Collections.<OrderByNode> emptyList(), null, null, subquery.getBindCount(), true, false, + Collections.<SelectStatement> emptyList(), subquery.getUdfParseNodes()); } ParseNode onNode = conditionExtractor.getJoinCondition(); @@ -364,7 +367,10 @@ public class SubqueryRewriter extends ParseNodeRewriter { return select; // Wrap as a derived table. 
- return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, select.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), select.getUdfParseNodes()); + return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), + HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, null, + select.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), + select.getUdfParseNodes()); } private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean addSelectOne) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java index 853d772..1def3a7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java @@ -30,6 +30,7 @@ import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.DerivedTableNode; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.LimitNode; +import org.apache.phoenix.parse.OffsetNode; import org.apache.phoenix.parse.OrderByNode; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeRewriter; @@ -110,6 +111,7 @@ public class SubselectRewriter extends ParseNodeRewriter { ParseNode havingRewrite = subselect.getHaving(); List<OrderByNode> orderByRewrite = subselect.getOrderBy(); LimitNode limitRewrite = subselect.getLimit(); + OffsetNode offsetRewrite = subselect.getOffset(); HintNode hintRewrite = subselect.getHint(); boolean 
isDistinctRewrite = subselect.isDistinct(); boolean isAggregateRewrite = subselect.isAggregate(); @@ -187,6 +189,13 @@ public class SubselectRewriter extends ParseNodeRewriter { } } + OffsetNode offset = select.getOffset(); + if (offsetRewrite != null || (limitRewrite != null && offset != null)) { + return select; + } else { + offsetRewrite = offset; + } + LimitNode limit = select.getLimit(); if (limit != null) { if (limitRewrite == null) { @@ -207,8 +216,10 @@ public class SubselectRewriter extends ParseNodeRewriter { hintRewrite = hintRewrite == null ? hint : HintNode.combine(hint, hintRewrite); } - return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, isDistinctRewrite, selectNodesRewrite, whereRewrite, groupByRewrite, - havingRewrite, orderByRewrite, limitRewrite, select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects(), select.getUdfParseNodes()); + return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, isDistinctRewrite, selectNodesRewrite, + whereRewrite, groupByRewrite, havingRewrite, orderByRewrite, limitRewrite, offsetRewrite, + select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects(), + select.getUdfParseNodes()); } private SelectStatement applyPostFilters(SelectStatement statement, List<ParseNode> postFilters) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 58cdb64..699643b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -212,6 +212,11 @@ public class TraceQueryPlan implements QueryPlan { } @Override + public Integer 
getOffset() { + return null; + } + + @Override public OrderBy getOrderBy() { return OrderBy.EMPTY_ORDER_BY; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 6ec7f70..7c6347f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -655,7 +655,7 @@ public class UpsertCompiler { scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions)); // Ignore order by - it has no impact - final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); return new MutationPlan() { @Override public ParameterMetaData getParameterMetaData() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 9b440ac..96ee543 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver { public static final String TX_SCN = "_TxScn"; public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow"; public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS"; + public final static String SCAN_OFFSET = "_RowOffset"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations http://git-wip-us.apache.org/repos/asf/phoenix/blob/f65d0481/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 5df5755..3333d9c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -23,19 +23,20 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.Set; -import co.cask.tephra.Transaction; - import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; @@ -47,23 +48,27 @@ import 
org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.expression.function.ArrayIndexFunction; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.iterate.OffsetResultIterator; import org.apache.phoenix.iterate.OrderedResultIterator; import org.apache.phoenix.iterate.RegionScannerResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; -import org.apache.phoenix.util.TransactionUtil; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import co.cask.tephra.Transaction; + /** * @@ -121,7 +126,8 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { orderByExpressions.add(orderByExpression); } ResultIterator inner = new RegionScannerResultIterator(s); - return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, estimatedRowSize); + return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null, + estimatedRowSize); } catch (IOException e) { throw new RuntimeException(e); } finally { @@ -183,7 +189,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { offset = region.getStartKey().length != 0 ? 
region.getStartKey().length:region.getEndKey().length; ScanUtil.setRowKeyOffset(scan, offset); } - + byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET); + Integer scanOffset = null; + if (scanOffsetBytes != null) { + scanOffset = Bytes.toInt(scanOffsetBytes); + } RegionScanner innerScanner = s; Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet(); @@ -217,7 +227,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { if (j != null) { innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment()); } - + if (scanOffset != null) { + innerScanner = getOffsetScanner(c, innerScanner, + new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset), + scan.getAttribute(QueryConstants.LAST_SCAN) != null); + } final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner); if (iterator == null) { return innerScanner; @@ -226,6 +240,73 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { return getTopNScanner(c, innerScanner, iterator, tenantId); } + private RegionScanner getOffsetScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, + final OffsetResultIterator iterator, final boolean isLastScan) throws IOException { + final Tuple firstTuple; + final HRegion region = c.getEnvironment().getRegion(); + region.startRegionOperation(); + try { + // Once we return from the first call to next, we've run through and + // cached + // the topN rows, so we no longer need to start/stop a region + // operation. 
+ Tuple tuple = iterator.next(); + if (tuple == null && !isLastScan) { + List<KeyValue> kvList = new ArrayList<KeyValue>(1); + KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY, + QueryConstants.OFFSET_COLUMN, Bytes.toBytes(iterator.getUnusedOffset())); + kvList.add(kv); + Result r = new Result(kvList); + firstTuple = new ResultTuple(r); + } else { + firstTuple = tuple; + } + } catch (Throwable t) { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); + return null; + } finally { + region.closeRegionOperation(); + } + return new BaseRegionScanner(s) { + private Tuple tuple = firstTuple; + + @Override + public boolean isFilterDone() { + return tuple == null; + } + + @Override + public boolean next(List<Cell> results) throws IOException { + try { + if (isFilterDone()) { return false; } + for (int i = 0; i < tuple.size(); i++) { + results.add(tuple.getValue(i)); + } + tuple = iterator.next(); + return !isFilterDone(); + } catch (Throwable t) { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); + return false; + } + } + + @Override + public void close() throws IOException { + try { + s.close(); + } finally { + try { + if (iterator != null) { + iterator.close(); + } + } catch (SQLException e) { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e); + } + } + } + }; + } + /** * Return region scanner that does TopN. * We only need to call startRegionOperation and closeRegionOperation when
