PHOENIX-3572 Support FETCH NEXT| n ROWS from Cursor (Biju Nair)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/53da5ceb Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/53da5ceb Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/53da5ceb Branch: refs/heads/4.x-HBase-1.1 Commit: 53da5cebe5081a3039f4c9fbfc8fdeb9d134552c Parents: ecd5d16 Author: Ankit Singhal <[email protected]> Authored: Wed May 17 14:09:18 2017 +0530 Committer: Ankit Singhal <[email protected]> Committed: Wed May 17 14:09:18 2017 +0530 ---------------------------------------------------------------------- .../CursorWithRowValueConstructorIT.java | 687 +++++++++++++++++++ phoenix-core/src/main/antlr3/PhoenixSQL.g | 31 + .../phoenix/compile/CloseStatementCompiler.java | 57 ++ .../phoenix/compile/DeclareCursorCompiler.java | 75 ++ .../phoenix/compile/OpenStatementCompiler.java | 57 ++ .../apache/phoenix/execute/CursorFetchPlan.java | 53 ++ .../phoenix/iterate/CursorResultIterator.java | 75 ++ .../apache/phoenix/jdbc/PhoenixStatement.java | 91 ++- .../apache/phoenix/parse/CloseStatement.java | 40 ++ .../org/apache/phoenix/parse/CursorName.java | 26 + .../phoenix/parse/DeclareCursorStatement.java | 60 ++ .../apache/phoenix/parse/FetchStatement.java | 52 ++ .../org/apache/phoenix/parse/OpenStatement.java | 40 ++ .../apache/phoenix/parse/ParseNodeFactory.java | 20 + .../org/apache/phoenix/parse/SQLParser.java | 76 ++ .../apache/phoenix/schema/MetaDataClient.java | 20 + .../org/apache/phoenix/util/CursorUtil.java | 189 +++++ .../java/org/apache/phoenix/util/ScanUtil.java | 6 +- .../phoenix/compile/CursorCompilerTest.java | 87 +++ .../apache/phoenix/parse/CursorParserTest.java | 367 ++++++++++ 20 files changed, 2106 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java new file mode 100644 index 0000000..dda4bd1 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.phoenix.util.*; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.math.BigDecimal; +import java.sql.*; +import java.util.Properties; +import java.util.Random; + +import static org.apache.phoenix.util.TestUtil.*; +import static org.junit.Assert.*; + + +public class CursorWithRowValueConstructorIT extends ParallelStatsDisabledIT { + private static final String TABLE_NAME = "CursorRVCTestTable"; + protected static final Log LOG = LogFactory.getLog(CursorWithRowValueConstructorIT.class); + + public void createAndInitializeTestTable() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + + PreparedStatement stmt = conn.prepareStatement("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + "(a_id INTEGER NOT NULL, " + + "a_data INTEGER, " + + "CONSTRAINT my_pk PRIMARY KEY (a_id))"); + stmt.execute(); + synchronized (conn){ + conn.commit(); + } + + //Upsert test values into the test table + Random rand = new Random(); + stmt = conn.prepareStatement("UPSERT INTO " + TABLE_NAME + + "(a_id, a_data) VALUES (?,?)"); + int rowCount = 0; + while(rowCount < 100){ + stmt.setInt(1, rowCount); + stmt.setInt(2, rand.nextInt(501)); + stmt.execute(); + ++rowCount; + } + synchronized (conn){ + conn.commit(); + } + } + + public void deleteTestTable() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement("DROP TABLE IF EXISTS " + TABLE_NAME); + stmt.execute(); + synchronized (conn){ + conn.commit(); + } + } + + @Test + public void testCursorsOnTestTablePK() throws SQLException { + try{ + createAndInitializeTestTable(); + String querySQL = "SELECT a_id FROM " + TABLE_NAME; + + //Test actual cursor implementation + String cursorSQL = "DECLARE testCursor CURSOR FOR " + querySQL; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "OPEN testCursor"; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "FETCH NEXT FROM testCursor"; + ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + int rowID = 0; + while(rs.next()){ + assertEquals(rowID,rs.getInt(1)); + ++rowID; + rs = DriverManager.getConnection(getUrl()).createStatement().executeQuery(cursorSQL); + } + } finally{ + DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute(); + deleteTestTable(); + } + + } + + @Test + public void testCursorsOnRandomTableData() throws SQLException { + try{ + createAndInitializeTestTable(); + String querySQL = "SELECT a_id,a_data FROM " + TABLE_NAME + " ORDER BY a_data"; + String cursorSQL = "DECLARE testCursor CURSOR FOR " + querySQL; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "OPEN testCursor"; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "FETCH NEXT FROM testCursor"; + ResultSet cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(querySQL).executeQuery(); + int rowCount = 0; + while(rs.next() && cursorRS.next()){ + assertEquals(rs.getInt(2),cursorRS.getInt(2)); + ++rowCount; + cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + } + assertEquals(100, rowCount); + } finally{ + DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute(); + deleteTestTable(); + } + } + + @Test + public void testCursorsOnTestTablePKDesc() throws SQLException { + try{ + createAndInitializeTestTable(); + String dummySQL = "SELECT a_id FROM " + TABLE_NAME + " ORDER BY a_id DESC"; + + String cursorSQL = "DECLARE testCursor CURSOR FOR " + dummySQL; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "OPEN testCursor"; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "FETCH NEXT FROM testCursor"; + ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + int rowCount = 0; + while(rs.next()){ + assertEquals(99-rowCount, rs.getInt(1)); + rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + ++rowCount; + } + assertEquals(100, rowCount); + } finally{ + DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute(); + deleteTestTable(); + } + } + + @Test + public void testCursorsOnTestTableNonPKDesc() throws SQLException { + try{ + createAndInitializeTestTable(); + String dummySQL = "SELECT a_data FROM " + TABLE_NAME + " ORDER BY a_data DESC"; + + String cursorSQL = "DECLARE testCursor CURSOR FOR " + dummySQL; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "OPEN testCursor"; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "FETCH NEXT FROM testCursor"; + ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + int rowCount = 0; + while(rs.next()){ + rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + ++rowCount; + } + assertEquals(100, rowCount); + } finally{ + DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute(); + deleteTestTable(); + } + } + + @Test + public void testCursorsOnWildcardSelect() throws SQLException { + try{ + createAndInitializeTestTable(); + String querySQL = "SELECT * FROM " + TABLE_NAME; + ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(querySQL).executeQuery(); + + String cursorSQL = "DECLARE testCursor CURSOR FOR "+querySQL; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "OPEN testCursor"; + DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute(); + cursorSQL = "FETCH NEXT FROM testCursor"; + ResultSet cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + int rowCount = 0; + while(rs.next() && cursorRS.next()){ + assertEquals(rs.getInt(1),cursorRS.getInt(1)); + ++rowCount; + cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery(); + } + assertEquals(100, rowCount); + } finally{ + DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute(); + deleteTestTable(); + } + } + + @Test + public void testCursorsWithBindings() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String query = "SELECT a_integer, x_integer FROM aTable WHERE ?=organization_id AND (a_integer, x_integer) = (7, 5)"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + String cursor = "DECLARE testCursor CURSOR FOR "+query; + try { + PreparedStatement statement = conn.prepareStatement(cursor); + statement.setString(1, tenantId); + statement.execute(); + }catch(SQLException e){ + assertTrue(e.getMessage().equalsIgnoreCase("Cannot declare cursor, internal SELECT statement contains bindings!")); + assertTrue(!CursorUtil.cursorDeclared("testCursor")); + return; + } finally { + cursor = "CLOSE testCursor"; + conn.prepareStatement(cursor).execute(); + conn.close(); + } + fail(); + } + + @Test + public void testCursorsInWhereWithEqualsExpression() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) = (7, 5)"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + String cursor = "DECLARE testCursor CURSOR FOR "+query; + try { + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + assertTrue(rs.getInt(1) == 7); + assertTrue(rs.getInt(2) == 5); + count++; + rs = conn.prepareStatement(cursor).executeQuery(); + } + assertTrue(count == 1); + } finally { + cursor = "CLOSE testCursor"; + conn.prepareStatement(cursor).execute(); + conn.close(); + } + } + + @Test + public void testCursorsInWhereWithGreaterThanExpression() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) >= (4, 4)"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + String cursor = "DECLARE testCursor CURSOR FOR "+query; + try { + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + assertTrue(rs.getInt(1) >= 4); + assertTrue(rs.getInt(1) == 4 ? rs.getInt(2) >= 4 : rs.getInt(2) >= 0); + count++; + rs = conn.prepareStatement(cursor).executeQuery(); + } + assertTrue(count == 5); + } finally { + cursor = "CLOSE testCursor"; + conn.prepareStatement(cursor).execute(); + conn.close(); + } + } + + @Test + public void testCursorsInWhereWithUnEqualNumberArgs() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer, y_integer) >= (7, 5)"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + String cursor = "DECLARE testCursor CURSOR FOR "+query; + try { + double startTime = System.nanoTime(); + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + assertTrue(rs.getInt(1) >= 7); + assertTrue(rs.getInt(1) == 7 ? rs.getInt(2) >= 5 : rs.getInt(2) >= 0); + count++; + rs = conn.prepareStatement(cursor).executeQuery(); + } + // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records. + assertTrue(count == 3); + double endTime = System.nanoTime(); + System.out.println("Method Time in milliseconds: "+Double.toString((endTime-startTime)/1000000)); + } finally { + cursor = "CLOSE testCursor"; + conn.prepareStatement(cursor).execute(); + conn.close(); + } + } + + @Test + public void testCursorsOnLHSAndLiteralExpressionOnRHS() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) >= 7"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + String cursor = "DECLARE testCursor CURSOR FOR "+query; + try { + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + count++; + rs = conn.prepareStatement(cursor).executeQuery(); + } + // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records. + assertTrue(count == 3); + } finally { + cursor = "CLOSE testCursor"; + conn.prepareStatement(cursor).execute(); + conn.close(); + } + } + + @Test + public void testCursorsOnRHSLiteralExpressionOnLHS() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND 7 <= (a_integer, x_integer)"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + String cursor = "DECLARE testCursor CURSOR FOR "+query; + try { + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + count++; + rs = conn.prepareStatement(cursor).executeQuery(); + } + // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records. + assertTrue(count == 3); + } finally { + cursor = "CLOSE testCursor"; + conn.prepareStatement(cursor).execute(); + conn.close(); + } + } + + @Test + public void testCursorsOnBuiltInFunctionOperatingOnIntegerLiteral() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) >= to_number('7')"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + String cursor = "DECLARE testCursor CURSOR FOR "+query; + try { + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + count++; + rs = conn.prepareStatement(cursor).executeQuery(); + } + // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records. + assertEquals(3, count); + } finally { + cursor = "CLOSE testCursor"; + conn.prepareStatement(cursor).execute(); + conn.close(); + } + } + + @Test + /** + * Test for the precision of Date datatype when used as part of a filter within the internal Select statement. + */ + public void testCursorsWithDateDatatypeFilter() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + long currentTime = System.currentTimeMillis(); + java.sql.Date date = new java.sql.Date(currentTime); + String strCurrentDate = date.toString(); + + //Sets date to <yesterday's date> 23:59:59.999 + while(date.toString().equals(strCurrentDate)){ + currentTime -= 1; + date = new Date(currentTime); + } + //Sets date to <today's date> 00:00:00.001 + date = new Date(currentTime+2); + java.sql.Date midnight = new Date(currentTime+1); + + + initEntityHistoryTableValues(tenantId, getDefaultSplits(tenantId), date, ts); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); + Connection conn = DriverManager.getConnection(getUrl(), props); + + + String query = "select parent_id from " + ENTITY_HISTORY_TABLE_NAME + + " WHERE (organization_id, parent_id, created_date, entity_history_id) IN ((?,?,?,?),(?,?,?,?))"; + + query = query.replaceFirst("\\?", "'"+tenantId+"'"); + query = query.replaceFirst("\\?", "'"+PARENTID3+"'"); + query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(date)+"')"); + query = query.replaceFirst("\\?", "'"+ENTITYHISTID3+"'"); + query = query.replaceFirst("\\?", "'"+tenantId+"'"); + query = query.replaceFirst("\\?", "'"+PARENTID7+"'"); + query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(date)+"')"); + query = query.replaceFirst("\\?", "'"+ENTITYHISTID7+"'"); + String cursor = "DECLARE testCursor CURSOR FOR "+query; + + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + assertTrue(rs.next()); + assertEquals(PARENTID3, rs.getString(1)); + rs = conn.prepareStatement(cursor).executeQuery(); + assertTrue(rs.next()); + assertEquals(PARENTID7, rs.getString(1)); + assertFalse(rs.next()); + + //Test against the same table for the same records, but this time use the 'midnight' java.sql.Date instance. + //'midnight' is identical to 'date' to the tens of millisecond precision. + query = "select parent_id from " + ENTITY_HISTORY_TABLE_NAME + + " WHERE (organization_id, parent_id, created_date, entity_history_id) IN ((?,?,?,?),(?,?,?,?))"; + query = query.replaceFirst("\\?", "'"+tenantId+"'"); + query = query.replaceFirst("\\?", "'"+PARENTID3+"'"); + query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(midnight)+"')"); + query = query.replaceFirst("\\?", "'"+ENTITYHISTID3+"'"); + query = query.replaceFirst("\\?", "'"+tenantId+"'"); + query = query.replaceFirst("\\?", "'"+PARENTID7+"'"); + query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(midnight)+"')"); + query = query.replaceFirst("\\?", "'"+ENTITYHISTID7+"'"); + cursor = "DECLARE testCursor2 CURSOR FOR "+query; + + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor2"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor2"; + + rs = conn.prepareStatement(cursor).executeQuery(); + assertTrue(!rs.next()); + String sql = "CLOSE testCursor"; + conn.prepareStatement(sql).execute(); + sql = "CLOSE testCursor2"; + conn.prepareStatement(sql).execute(); + } + + @Test + public void testCursorsWithNonLeadingPkColsOfTypesTimeStampAndVarchar() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String updateStmt = + "upsert into " + + "ATABLE(" + + " ORGANIZATION_ID, " + + " ENTITY_ID, " + + " A_TIMESTAMP) " + + "VALUES (?, ?, ?)"; + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection upsertConn = DriverManager.getConnection(url, props); + upsertConn.setAutoCommit(true); + PreparedStatement stmt = upsertConn.prepareStatement(updateStmt); + stmt.setString(1, tenantId); + stmt.setString(2, ROW4); + Timestamp tsValue = new Timestamp(System.nanoTime()); + stmt.setTimestamp(3, tsValue); + stmt.execute(); + + String query = "SELECT a_timestamp, a_string FROM aTable WHERE ?=organization_id AND (a_timestamp, a_string) = (?, 'a')"; + query = query.replaceFirst("\\?", "'"+tenantId+"'"); + query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_TIMESTAMP_FORMAT).format(tsValue)+"')"); + + props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + String cursor = "DECLARE testCursor CURSOR FOR "+query; + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + assertTrue(rs.getTimestamp(1).equals(tsValue)); + assertTrue(rs.getString(2).compareTo("a") == 0); + count++; + rs = conn.prepareStatement(cursor).executeQuery(); + } + assertTrue(count == 1); + } finally { + String sql = "CLOSE testCursor"; + conn.prepareStatement(sql).execute(); + conn.close(); + } + } + + @Test + public void testCursorsQueryMoreWithInListClausePossibleNullValues() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String updateStmt = + "upsert into " + + "ATABLE(ORGANIZATION_ID, ENTITY_ID, Y_INTEGER, X_INTEGER) VALUES (?, ?, ?, ?)"; + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection upsertConn = DriverManager.getConnection(url, props); + upsertConn.setAutoCommit(true); + PreparedStatement stmt = upsertConn.prepareStatement(updateStmt); + stmt.setString(1, tenantId); + stmt.setString(2, ROW4); + stmt.setInt(3, 4); + stmt.setInt(4, 5); + stmt.execute(); + + //we have a row present in aTable where x_integer = 5 and y_integer = NULL which gets translated to 0 when retriving from HBase. + String query = "SELECT x_integer, y_integer FROM aTable WHERE ? = organization_id AND (x_integer) IN ((5))"; + + query = query.replaceFirst("\\?", "'"+tenantId+"'"); + + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + + try { + String cursor = "DECLARE testCursor CURSOR FOR "+query; + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + assertTrue(rs.next()); + assertEquals(5, rs.getInt(1)); + assertEquals(4, rs.getInt(2)); + rs = conn.prepareStatement(cursor).executeQuery(); + assertTrue(rs.next()); + assertEquals(5, rs.getInt(1)); + assertEquals(0, rs.getInt(2)); + } finally { + String sql = "CLOSE testCursor"; + conn.prepareStatement(sql).execute(); + conn.close(); + } + } + + @Test + public void testCursorsWithColsOfTypesDecimal() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + String query = "SELECT x_decimal FROM aTable WHERE ?=organization_id AND entity_id IN (?,?,?)"; + query = query.replaceFirst("\\?", "'"+tenantId+"'"); + query = query.replaceFirst("\\?", "'"+ROW7+"'"); + query = query.replaceFirst("\\?", "'"+ROW8+"'"); + query = query.replaceFirst("\\?", "'"+ROW9+"'"); + + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + String cursor = "DECLARE testCursor CURSOR FOR "+query; + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + assertTrue(BigDecimal.valueOf(0.1).equals(rs.getBigDecimal(1)) || BigDecimal.valueOf(3.9).equals(rs.getBigDecimal(1)) || BigDecimal.valueOf(3.3).equals(rs.getBigDecimal(1))); + count++; + if(count == 3) break; + rs = conn.prepareStatement(cursor).executeQuery(); + } + assertTrue(count == 3); + } finally { + String sql = "CLOSE testCursor"; + conn.prepareStatement(sql).execute(); + conn.close(); + } + } + + @Test + public void testCursorsWithColsOfTypesTinyintSmallintFloatDouble() throws Exception { + long ts = nextTimestamp(); + String tenantId = getOrganizationId(); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); + ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); + String query = "SELECT a_byte,a_short,a_float,a_double FROM aTable WHERE ?=organization_id AND entity_id IN (?,?,?)"; + query = query.replaceFirst("\\?", "'"+tenantId+"'"); + query = query.replaceFirst("\\?", "'"+ROW1+"'"); + query = query.replaceFirst("\\?", "'"+ROW2+"'"); + query = query.replaceFirst("\\?", "'"+ROW3+"'"); + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2 + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + String cursor = "DECLARE testCursor CURSOR FOR "+query; + conn.prepareStatement(cursor).execute(); + cursor = "OPEN testCursor"; + conn.prepareStatement(cursor).execute(); + cursor = "FETCH NEXT FROM testCursor"; + + ResultSet rs = conn.prepareStatement(cursor).executeQuery(); + int count = 0; + while(rs.next()) { + assertTrue((byte)1 == (rs.getByte(1)) || (byte)2 == (rs.getByte(1)) || (byte)3 == (rs.getByte(1))); + assertTrue((short)128 == (rs.getShort(2)) || (short)129 == (rs.getShort(2)) || (short)130 == (rs.getShort(2))); + assertTrue(0.01f == (rs.getFloat(3)) || 0.02f == (rs.getFloat(3)) || 0.03f == (rs.getFloat(3))); + assertTrue(0.0001 == (rs.getDouble(4)) || 0.0002 == (rs.getDouble(4)) || 0.0003 == (rs.getDouble(4))); + count++; + if(count == 3) break; + rs = conn.prepareStatement(cursor).executeQuery(); + } + assertTrue(count == 3); + } finally { + String sql = "CLOSE testCursor"; + conn.prepareStatement(sql).execute(); + conn.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index 07a51ce..66d50e9 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -130,6 +130,10 @@ tokens USE='use'; OFFSET ='offset'; FETCH = 'fetch'; + DECLARE = 'declare'; + CURSOR = 'cursor'; + OPEN = 'open'; + CLOSE = 'close'; ROW = 'row'; ROWS = 'rows'; ONLY = 'only'; @@ -409,6 +413,10 @@ oneStatement returns [BindableStatement ret] | s=create_schema_node | s=create_view_node | s=create_index_node + | s=cursor_open_node + | s=cursor_close_node + | s=cursor_fetch_node + | s=declare_cursor_node | s=drop_table_node | s=drop_index_node | s=alter_index_node @@ -744,6 +752,25 @@ upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret] (COMMA d=dyn_column_name_or_def { if (d.getDataType()!=null) { $ret.getFirst().add(d); } $ret.getSecond().add(d.getColumnDefName()); } )* ; + +// Parse a full declare cursor expression structure. +declare_cursor_node returns [DeclareCursorStatement ret] + : DECLARE c=cursor_name CURSOR FOR s=select_node + {ret = factory.declareCursor(c, s); } + ; + +cursor_open_node returns [OpenStatement ret] + : OPEN c=cursor_name {ret = factory.open(c);} + ; + +cursor_close_node returns [CloseStatement ret] + : CLOSE c=cursor_name {ret = factory.close(c);} + ; + +cursor_fetch_node returns [FetchStatement ret] + : FETCH NEXT (a=NUMBER)? (ROW|ROWS)? FROM c=cursor_name {ret = factory.fetch(c,true, a == null ? 1 : Integer.parseInt( a.getText() )); } + ; + // Parse a full delete expression structure. delete_node returns [DeleteStatement ret] : DELETE (hint=hintClause)? FROM t=from_table_name @@ -1033,6 +1060,10 @@ index_name returns [NamedNode ret] : name=identifier {$ret = factory.indexName(name); } ; +cursor_name returns [CursorName ret] + : name=identifier {$ret = factory.cursorName(name);} + ; + // TODO: figure out how not repeat this two times table_name returns [TableName ret] : t=identifier {$ret = factory.table(null, t); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java new file mode 100644 index 0000000..cc53a9d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.parse.CloseStatement; +import org.apache.phoenix.parse.OpenStatement; +import org.apache.phoenix.schema.MetaDataClient; + +import java.sql.SQLException; +import java.util.Collections; + +public class CloseStatementCompiler { + private final PhoenixStatement statement; + private final Operation operation; + + public CloseStatementCompiler(PhoenixStatement statement, Operation operation) { + this.statement = statement; + this.operation = operation; + } + + public MutationPlan compile(final CloseStatement close) throws SQLException { + final PhoenixConnection connection = statement.getConnection(); + final StatementContext context = new StatementContext(statement); + final MetaDataClient client = new MetaDataClient(connection); + + return new BaseMutationPlan(context, operation) { + @Override + public MutationState execute() throws SQLException { + return client.close(close); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("CLOSE CURSOR")); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java new file mode 100644 index 0000000..5280291 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixPreparedStatement; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.parse.CreateIndexStatement; +import org.apache.phoenix.parse.DeclareCursorStatement; +import org.apache.phoenix.parse.ParseNode; +import org.apache.phoenix.util.CursorUtil; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PTable.IndexType; + +import java.sql.ParameterMetaData; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; + +public class DeclareCursorCompiler { + private final PhoenixStatement statement; + private final Operation operation; + private QueryPlan queryPlan; + + public DeclareCursorCompiler(PhoenixStatement statement, Operation operation, QueryPlan queryPlan) { + this.statement = statement; + this.operation = operation; + this.queryPlan = queryPlan; + } + + public MutationPlan compile(final DeclareCursorStatement declare) throws SQLException { + if(declare.getBindCount() != 0){ + throw new SQLException("Cannot declare cursor, internal SELECT statement contains bindings!"); + } + + final PhoenixConnection connection = statement.getConnection(); + final StatementContext context = new StatementContext(statement); + final MetaDataClient client = new MetaDataClient(connection); + + return new BaseMutationPlan(context, operation) { + @Override + public MutationState execute() throws SQLException { + return client.declareCursor(declare, queryPlan); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("DECLARE CURSOR")); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java new file mode 100644 index 0000000..b6125fd --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.parse.DeclareCursorStatement; +import org.apache.phoenix.parse.OpenStatement; +import org.apache.phoenix.schema.MetaDataClient; + +import java.sql.SQLException; +import java.util.Collections; + +public class OpenStatementCompiler { + private final PhoenixStatement statement; + private final Operation operation; + + public OpenStatementCompiler(PhoenixStatement statement, Operation operation) { + this.statement = statement; + this.operation = operation; + } + + public MutationPlan compile(final OpenStatement open) throws SQLException { + final PhoenixConnection connection = statement.getConnection(); + final StatementContext context = new StatementContext(statement); + final MetaDataClient client = new MetaDataClient(connection); + + return new BaseMutationPlan(context, operation) { + @Override + public MutationState execute() throws SQLException { + return client.open(open); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("OPEN CURSOR")); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java new file mode 100644 index 0000000..aaea13e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java @@ -0,0 +1,53 @@ +package org.apache.phoenix.execute; + +import java.sql.SQLException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.iterate.CursorResultIterator; +import org.apache.phoenix.iterate.LookAheadResultIterator; +import org.apache.phoenix.iterate.ParallelScanGrouper; +import org.apache.phoenix.iterate.ResultIterator; + +public class CursorFetchPlan extends DelegateQueryPlan { + + private CursorResultIterator resultIterator; + private int fetchSize; + private boolean isAggregate; + private String cursorName; + + public CursorFetchPlan(QueryPlan cursorQueryPlan,String cursorName) { + super(cursorQueryPlan); + this.isAggregate = delegate.getStatement().isAggregate() || delegate.getStatement().isDistinct(); + this.cursorName = cursorName; + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + StatementContext context = delegate.getContext(); + if (resultIterator == null) { + context.getOverallQueryMetrics().startQuery(); + resultIterator = new CursorResultIterator(LookAheadResultIterator.wrap(delegate.iterator(scanGrouper, scan)),cursorName); + } + return resultIterator; + } + + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return delegate.getExplainPlan(); + } + + public void setFetchSize(int fetchSize){ + this.fetchSize = fetchSize; + } + + public int getFetchSize() { + return fetchSize; + } + + public boolean isAggregate(){ + return this.isAggregate; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java new file mode 100644 index 0000000..7ff2785 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.CursorUtil; + +import java.sql.SQLException; +import java.util.List; + +public class CursorResultIterator implements ResultIterator { + private String cursorName; + private PeekingResultIterator delegate; + //TODO Configure fetch size from FETCH call + private int fetchSize = 0; + private int rowsRead = 0; + public CursorResultIterator(PeekingResultIterator delegate, String cursorName) { + this.delegate = delegate; + this.cursorName = cursorName; + } + + @Override + public Tuple next() throws SQLException { + if(!CursorUtil.moreValues(cursorName)){ + return null; + } else if (fetchSize == rowsRead) { + return null; + } + + Tuple next = delegate.next(); + CursorUtil.updateCursor(cursorName,next, delegate.peek()); + rowsRead++; + return next; + } + + @Override + public void explain(List<String> planSteps) { + delegate.explain(planSteps); + planSteps.add("CLIENT CURSOR " + cursorName); + } + + @Override + public String toString() { + return "CursorResultIterator [cursor=" + cursorName + "]"; + } + + @Override + public void close() throws SQLException { + //NOP + } + + public void closeCursor() throws SQLException { + delegate.close(); + } + + public void setFetchSize(int fetchSize){ + this.fetchSize = fetchSize; + this.rowsRead = 0; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index f3c6d30..6981a30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.compile.BaseMutationPlan; +import org.apache.phoenix.compile.CloseStatementCompiler; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.CreateFunctionCompiler; @@ -55,6 +56,7 @@ import org.apache.phoenix.compile.CreateIndexCompiler; import org.apache.phoenix.compile.CreateSchemaCompiler; import org.apache.phoenix.compile.CreateSequenceCompiler; import org.apache.phoenix.compile.CreateTableCompiler; +import org.apache.phoenix.compile.DeclareCursorCompiler; import org.apache.phoenix.compile.DeleteCompiler; import org.apache.phoenix.compile.DropSequenceCompiler; import org.apache.phoenix.compile.ExplainPlan; @@ -63,6 +65,7 @@ import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.ListJarsQueryPlan; import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.compile.OpenStatementCompiler; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.QueryCompiler; import org.apache.phoenix.compile.QueryPlan; @@ -91,13 +94,16 @@ import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.AlterIndexStatement; import org.apache.phoenix.parse.AlterSessionStatement; import org.apache.phoenix.parse.BindableStatement; +import org.apache.phoenix.parse.CloseStatement; import org.apache.phoenix.parse.ColumnDef; import org.apache.phoenix.parse.ColumnName; +import org.apache.phoenix.parse.CursorName; import org.apache.phoenix.parse.CreateFunctionStatement; import org.apache.phoenix.parse.CreateIndexStatement; import org.apache.phoenix.parse.CreateSchemaStatement; import org.apache.phoenix.parse.CreateSequenceStatement; import org.apache.phoenix.parse.CreateTableStatement; +import org.apache.phoenix.parse.DeclareCursorStatement; import org.apache.phoenix.parse.DeleteJarStatement; import org.apache.phoenix.parse.DeleteStatement; import org.apache.phoenix.parse.DropColumnStatement; @@ -107,6 +113,7 @@ import org.apache.phoenix.parse.DropSchemaStatement; import org.apache.phoenix.parse.DropSequenceStatement; import org.apache.phoenix.parse.DropTableStatement; import org.apache.phoenix.parse.ExecuteUpgradeStatement; +import org.apache.phoenix.parse.FetchStatement; import org.apache.phoenix.parse.ExplainStatement; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode; @@ -117,6 +124,7 @@ import org.apache.phoenix.parse.LiteralParseNode; import org.apache.phoenix.parse.NamedNode; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.OffsetNode; +import org.apache.phoenix.parse.OpenStatement; import org.apache.phoenix.parse.OrderByNode; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.ParseNode; @@ -155,6 +163,7 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.CursorUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixContextExecutor; @@ -294,7 +303,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { } StatementContext context = plan.getContext(); context.getOverallQueryMetrics().startQuery(); - PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context); + PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext()); resultSets.add(rs); setLastQueryPlan(plan); setLastResultSet(rs); @@ -403,6 +412,13 @@ public class PhoenixStatement implements Statement, SQLCloseable { super(from, hint, isDistinct, select, where, groupBy, having, orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects, udfParseNodes); } + private ExecutableSelectStatement(ExecutableSelectStatement select) { + this(select.getFrom(), select.getHint(), select.isDistinct(), select.getSelect(), select.getWhere(), + select.getGroupBy(), select.getHaving(), select.getOrderBy(), select.getLimit(), select.getOffset(), select.getBindCount(), + select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes()); + } + + @SuppressWarnings("unchecked") @Override public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { @@ -417,6 +433,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { resolver = FromCompiler.getResolverForQuery(transformedSelect, stmt.getConnection()); select = StatementNormalizer.normalize(transformedSelect, resolver); } + QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile(); plan.getContext().getSequenceManager().validateSequences(seqAction); return plan; @@ -765,6 +782,56 @@ public class PhoenixStatement implements Statement, SQLCloseable { } } + private static class ExecutableDeclareCursorStatement extends DeclareCursorStatement implements CompilableStatement { + public ExecutableDeclareCursorStatement(CursorName cursor, SelectStatement select){ + super(cursor, select); + } + + @Override + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + ExecutableSelectStatement wrappedSelect = new ExecutableSelectStatement( + (ExecutableSelectStatement) stmt.parseStatement(this.getQuerySQL())); + DeclareCursorCompiler compiler = new DeclareCursorCompiler(stmt, this.getOperation(),wrappedSelect.compilePlan(stmt, seqAction)); + return compiler.compile(this); + } + } + + private static class ExecutableOpenStatement extends OpenStatement implements CompilableStatement { + public ExecutableOpenStatement(CursorName cursor){ + super(cursor); + } + + @Override + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + OpenStatementCompiler compiler = new OpenStatementCompiler(stmt, this.getOperation()); + return compiler.compile(this); + } + } + + private static class ExecutableCloseStatement extends CloseStatement implements CompilableStatement { + public ExecutableCloseStatement(CursorName cursor){ + super(cursor); + } + + @Override + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + CloseStatementCompiler compiler = new CloseStatementCompiler(stmt, this.getOperation()); + return compiler.compile(this); + } + } + + private static class ExecutableFetchStatement extends FetchStatement implements CompilableStatement { + public ExecutableFetchStatement(CursorName cursor, boolean isNext, int fetchLimit){ + super(cursor, isNext, fetchLimit); + } + + @Override + public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + return CursorUtil.getFetchPlan(this.getCursorName().getName(), this.isNext(), this.getFetchSize()); + } + + } + private static class ExecutableDeleteJarStatement extends DeleteJarStatement implements CompilableStatement { public ExecutableDeleteJarStatement(LiteralParseNode jarPath) { @@ -1209,7 +1276,27 @@ public class PhoenixStatement implements Statement, SQLCloseable { Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) { return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs); } - + + @Override + public ExecutableDeclareCursorStatement declareCursor(CursorName cursor, SelectStatement select){ + return new ExecutableDeclareCursorStatement(cursor, select); + } + + @Override + public ExecutableFetchStatement fetch(CursorName cursor, boolean isNext, int fetchLimit){ + return new ExecutableFetchStatement(cursor, isNext, fetchLimit); + } + + @Override + public ExecutableOpenStatement open(CursorName cursor){ + return new ExecutableOpenStatement(cursor); + } + + @Override + public ExecutableCloseStatement close(CursorName cursor){ + return new ExecutableCloseStatement(cursor); + } + @Override public ExecutableDeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) { return new ExecutableDeleteStatement(table, hint, whereNode, orderBy, limit, bindCount, udfParseNodes); http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java new file mode 100644 index 0000000..5d7af34 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; + +public class CloseStatement implements BindableStatement { + private final CursorName cursorName; + + public CloseStatement(CursorName cursorName){ + this.cursorName = cursorName; + } + + public String getCursorName(){ + return cursorName.getName(); + } + + public int getBindCount(){ + return 0; + } + + public Operation getOperation(){ + return Operation.UPSERT; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java new file mode 100644 index 0000000..5b9de76 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java @@ -0,0 +1,26 @@ +package org.apache.phoenix.parse; + +import org.apache.phoenix.util.SchemaUtil; + +public class CursorName { + private final String name; + private final boolean isCaseSensitive; + + public CursorName(String name, boolean isCaseSensitive){ + this.name = name; + this.isCaseSensitive = isCaseSensitive; + } + + public CursorName(String name){ + this.name = name; + this.isCaseSensitive = name == null ? false: SchemaUtil.isCaseSensitive(name); + } + + public String getName() { + return name; + } + + public boolean isCaseSensitive() { + return isCaseSensitive; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java new file mode 100644 index 0000000..68129ec --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import java.util.*; + +public class DeclareCursorStatement implements BindableStatement { + private final CursorName cursorName; + private final SelectStatement select; + + public DeclareCursorStatement(CursorName cursorName, SelectStatement select){ + this.cursorName = cursorName; + this.select = select; + } + + public String getCursorName(){ + return cursorName.getName(); + } + + public String getQuerySQL(){ + //Check if there are parameters to bind. + if(select.getBindCount() > 0){ + + } + //TODO: Test if this works + return select.toString(); + } + + public SelectStatement getSelect(){ + return select; + } + + public List<OrderByNode> getSelectOrderBy() { + return select.getOrderBy(); + } + + public int getBindCount(){ + return select.getBindCount(); + } + + public Operation getOperation(){ + return Operation.UPSERT; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java new file mode 100644 index 0000000..08e9724 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; + +public class FetchStatement implements BindableStatement { + private final CursorName cursorName; + private final boolean isNext; + private final int fetchSize; + + public FetchStatement(CursorName cursorName, boolean isNext, int fetchSize){ + this.cursorName = cursorName; + this.isNext = isNext; + this.fetchSize = fetchSize; + } + + public CursorName getCursorName(){ + return cursorName; + } + + public boolean isNext(){ + return isNext; + } + + public int getBindCount(){ + return 0; + } + + public Operation getOperation(){ + return Operation.QUERY; + } + + public int getFetchSize(){ + return fetchSize; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java new file mode 100644 index 0000000..ad905b0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; + +public class OpenStatement implements BindableStatement { + private final CursorName cursorName; + + public OpenStatement(CursorName cursorName){ + this.cursorName = cursorName; + } + + public String getCursorName(){ + return cursorName.getName(); + } + + public int getBindCount(){ + return 0; + } + + public Operation getOperation(){ + return Operation.UPSERT; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 4b65c6a..4628d51 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -731,6 +731,26 @@ public class ParseNodeFactory { return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs); } + public CursorName cursorName(String name){ + return new CursorName(name); + } + + public DeclareCursorStatement declareCursor(CursorName cursor, SelectStatement select){ + return new DeclareCursorStatement(cursor, select); + } + + public FetchStatement fetch(CursorName cursor, boolean isNext, int fetchLimit){ + return new FetchStatement(cursor, isNext, fetchLimit); + } + + public OpenStatement open(CursorName cursor){ + return new OpenStatement(cursor); + } + + public CloseStatement close(CursorName cursor){ + return new CloseStatement(cursor); + } + public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) { return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java index 1a80991..b6b7de2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java @@ -139,6 +139,82 @@ public class SQLParser { } /** + * Parses the input as a SQL declare cursor statement. + * Used only in tests + * @throws SQLException + */ + public DeclareCursorStatement parseDeclareCursor() throws SQLException { + try { + DeclareCursorStatement statement = parser.declare_cursor_node(); + return statement; + } catch (RecognitionException e) { + throw PhoenixParserException.newException(e, parser.getTokenNames()); + } catch (RuntimeException e) { + if (e.getCause() instanceof SQLException) { + throw (SQLException) e.getCause(); + } + throw PhoenixParserException.newException(e, parser.getTokenNames()); + } + } + + /** + * Parses the input as a SQL cursor open statement. + * Used only in tests + * @throws SQLException + */ + public OpenStatement parseOpen() throws SQLException { + try { + OpenStatement statement = parser.cursor_open_node(); + return statement; + } catch (RecognitionException e) { + throw PhoenixParserException.newException(e, parser.getTokenNames()); + } catch (RuntimeException e) { + if (e.getCause() instanceof SQLException) { + throw (SQLException) e.getCause(); + } + throw PhoenixParserException.newException(e, parser.getTokenNames()); + } + } + + /** + * Parses the input as a SQL cursor close statement. + * Used only in tests + * @throws SQLException + */ + public CloseStatement parseClose() throws SQLException { + try { + CloseStatement statement = parser.cursor_close_node(); + return statement; + } catch (RecognitionException e) { + throw PhoenixParserException.newException(e, parser.getTokenNames()); + } catch (RuntimeException e) { + if (e.getCause() instanceof SQLException) { + throw (SQLException) e.getCause(); + } + throw PhoenixParserException.newException(e, parser.getTokenNames()); + } + } + + /** + * Parses the input as a SQL cursor fetch statement. + * Used only in tests + * @throws SQLException + */ + public FetchStatement parseFetch() throws SQLException { + try { + FetchStatement statement = parser.cursor_fetch_node(); + return statement; + } catch (RecognitionException e) { + throw PhoenixParserException.newException(e, parser.getTokenNames()); + } catch (RuntimeException e) { + if (e.getCause() instanceof SQLException) { + throw (SQLException) e.getCause(); + } + throw PhoenixParserException.newException(e, parser.getTokenNames()); + } + } + + /** * Parses the input as a SQL select statement. * Used only in tests * @throws SQLException http://git-wip-us.apache.org/repos/asf/phoenix/blob/53da5ceb/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 8005e4a..1254d79 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -144,6 +144,7 @@ import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.compile.PostDDLCompiler; import org.apache.phoenix.compile.PostIndexDDLCompiler; import org.apache.phoenix.compile.PostLocalIndexDDLCompiler; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.StatementNormalizer; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -165,6 +166,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.AddColumnStatement; import org.apache.phoenix.parse.AlterIndexStatement; +import org.apache.phoenix.parse.CloseStatement; import org.apache.phoenix.parse.ColumnDef; import org.apache.phoenix.parse.ColumnDefInPkConstraint; import org.apache.phoenix.parse.ColumnName; @@ -173,6 +175,7 @@ import org.apache.phoenix.parse.CreateIndexStatement; import org.apache.phoenix.parse.CreateSchemaStatement; import org.apache.phoenix.parse.CreateSequenceStatement; import org.apache.phoenix.parse.CreateTableStatement; +import org.apache.phoenix.parse.DeclareCursorStatement; import org.apache.phoenix.parse.DropColumnStatement; import org.apache.phoenix.parse.DropFunctionStatement; import org.apache.phoenix.parse.DropIndexStatement; @@ -181,6 +184,7 @@ import org.apache.phoenix.parse.DropSequenceStatement; import org.apache.phoenix.parse.DropTableStatement; import org.apache.phoenix.parse.IndexKeyConstraint; import org.apache.phoenix.parse.NamedTableNode; +import org.apache.phoenix.parse.OpenStatement; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PFunction.FunctionArgument; import org.apache.phoenix.parse.PSchema; @@ -214,6 +218,7 @@ import org.apache.phoenix.schema.types.PUnsignedLong; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.CursorUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; @@ -1364,6 +1369,21 @@ public class MetaDataClient { return fullName; } + public MutationState declareCursor(DeclareCursorStatement statement, QueryPlan queryPlan) throws SQLException { + CursorUtil.declareCursor(statement, queryPlan); + return new MutationState(0,connection); + } + + public MutationState open(OpenStatement statement) throws SQLException { + CursorUtil.openCursor(statement, connection); + return new MutationState(0,connection); + } + + public MutationState close(CloseStatement statement) throws SQLException { + CursorUtil.closeCursor(statement); + return new MutationState(0,connection); + } + /** * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling * MetaDataClient.createTable. In doing so, we perform the following translations:
