http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java index b573eab..f410dc8 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java @@ -84,10 +84,10 @@ class AvaticaJdbc41Factory implements AvaticaFactory { } public AvaticaResultSet newResultSet(AvaticaStatement statement, - Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame) { + QueryState state, Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame) { final ResultSetMetaData metaData = newResultSetMetaData(statement, signature); - return new AvaticaResultSet(statement, signature, metaData, timeZone, + return new AvaticaResultSet(statement, state, signature, metaData, timeZone, firstFrame); }
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java index 9144ed3..4f1c726 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.avatica; +import org.apache.calcite.avatica.Meta.Signature; import org.apache.calcite.avatica.remote.TypedValue; import java.io.InputStream; @@ -106,7 +107,9 @@ public abstract class AvaticaPreparedStatement public ResultSet executeQuery() throws SQLException { this.updateCount = -1; - return getConnection().executeQueryInternal(this, getSignature(), null); + final Signature sig = getSignature(); + return getConnection().executeQueryInternal(this, sig, null, + new QueryState(sig.sql)); } public ParameterMetaData getParameterMetaData() throws SQLException { @@ -118,7 +121,8 @@ public abstract class AvaticaPreparedStatement } public long executeLargeUpdate() throws SQLException { - getConnection().executeQueryInternal(this, getSignature(), null); + getConnection().executeQueryInternal(this, getSignature(), null, + new QueryState(getSignature().sql)); return updateCount; } @@ -199,7 +203,8 @@ public abstract class AvaticaPreparedStatement public boolean execute() throws SQLException { this.updateCount = -1; - getConnection().executeQueryInternal(this, getSignature(), null); + getConnection().executeQueryInternal(this, getSignature(), null, + new QueryState(getSignature().sql)); // Result set is null for DML or DDL. // Result set is closed if user cancelled the query. return openResultSet != null && !openResultSet.isClosed(); http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java index f7b7ccd..478723f 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java @@ -49,6 +49,7 @@ import java.util.TimeZone; */ public class AvaticaResultSet implements ResultSet, ArrayImpl.Factory { protected final AvaticaStatement statement; + protected final QueryState state; protected final Meta.Signature signature; protected final Meta.Frame firstFrame; protected final List<ColumnMetaData> columnMetaDataList; @@ -69,11 +70,13 @@ public class AvaticaResultSet implements ResultSet, ArrayImpl.Factory { private Cursor timeoutCursor; public AvaticaResultSet(AvaticaStatement statement, + QueryState state, Meta.Signature signature, ResultSetMetaData resultSetMetaData, TimeZone timeZone, Meta.Frame firstFrame) { this.statement = statement; + this.state = state; this.signature = signature; this.firstFrame = firstFrame; this.columnMetaDataList = signature.columns; @@ -182,7 +185,7 @@ public class AvaticaResultSet implements ResultSet, ArrayImpl.Factory { protected AvaticaResultSet execute() throws SQLException { final List<TypedValue> parameterValues = statement.getBoundParameterValues(); final Iterable<Object> iterable1 = - statement.connection.meta.createIterable(statement.handle, signature, + statement.connection.meta.createIterable(statement.handle, state, signature, parameterValues, firstFrame); this.cursor = MetaImpl.createCursor(signature.cursorFactory, iterable1); this.accessorList = http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java index 6bcf8eb..ca73f43 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java @@ -104,7 +104,7 @@ public abstract class AvaticaStatement this.signature = signature; this.closed = false; if (h == null) { - final Meta.ConnectionHandle ch = new Meta.ConnectionHandle(connection.id); + final Meta.ConnectionHandle ch = connection.handle; h = connection.meta.createStatement(ch); } connection.statementMap.put(h.id, this); @@ -130,12 +130,44 @@ public abstract class AvaticaStatement try { // In JDBC, maxRowCount = 0 means no limit; in prepare it means LIMIT 0 final long maxRowCount1 = maxRowCount <= 0 ? -1 : maxRowCount; - Meta.ExecuteResult x = - connection.prepareAndExecuteInternal(this, sql, maxRowCount1); + for (int i = 0; i < connection.maxRetriesPerExecute; i++) { + try { + Meta.ExecuteResult x = + connection.prepareAndExecuteInternal(this, sql, maxRowCount1); + return; + } catch (NoSuchStatementException e) { + resetStatement(); + } + } } catch (RuntimeException e) { throw connection.helper.createException("Error while executing SQL \"" + sql + "\": " + e.getMessage(), e); } + + throw new RuntimeException("Failed to successfully execute query after " + + connection.maxRetriesPerExecute + " attempts."); + } + + protected void resetStatement() { + // Invalidate the old statement + connection.statementMap.remove(handle.id); + // Get a new one + final Meta.ConnectionHandle ch = new Meta.ConnectionHandle(connection.id); + Meta.StatementHandle h = connection.meta.createStatement(ch); + // Cache it in the connection + connection.statementMap.put(h.id, this); + // Update the local state and try again + this.handle = h; + } + + /** + * Re-initialize the ResultSet on the server with the given state. + * @param state The ResultSet's state. + * @param offset Offset into the desired ResultSet + * @return True if the ResultSet has more results, false if there are no more results. + */ + protected boolean syncResults(QueryState state, long offset) throws NoSuchStatementException { + return connection.meta.syncResults(handle, state, offset); } // implement Statement @@ -447,7 +479,7 @@ public abstract class AvaticaStatement */ protected ResultSet executeQueryInternal(Meta.Signature signature) throws SQLException { - return connection.executeQueryInternal(this, signature, null); + return connection.executeQueryInternal(this, signature, null, null); } /** http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/Meta.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java index 6c51b1c..5ddacdb 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java @@ -34,6 +34,7 @@ import java.lang.reflect.Method; import java.math.BigDecimal; import java.sql.Connection; import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -204,7 +205,7 @@ public interface Meta { * <p>The default implementation just returns {@code iterable}, which it * requires to be not null; derived classes may instead choose to execute the * relational expression in {@code signature}. */ - Iterable<Object> createIterable(StatementHandle handle, Signature signature, + Iterable<Object> createIterable(StatementHandle stmt, QueryState state, Signature signature, List<TypedValue> parameterValues, Frame firstFrame); /** Prepares a statement. @@ -227,7 +228,7 @@ public interface Meta { * first frame of data */ ExecuteResult prepareAndExecute(StatementHandle h, String sql, - long maxRowCount, PrepareCallback callback); + long maxRowCount, PrepareCallback callback) throws NoSuchStatementException; /** Returns a frame of rows. * @@ -243,7 +244,8 @@ public interface Meta { * no limit * @return Frame, or null if there are no more */ - Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount); + Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) throws + NoSuchStatementException, MissingResultsException; /** Executes a prepared statement. * @@ -254,7 +256,7 @@ public interface Meta { * @return Frame, or null if there are no more */ ExecuteResult execute(StatementHandle h, List<TypedValue> parameterValues, - long maxRowCount); + long maxRowCount) throws NoSuchStatementException; /** Called during the creation of a statement to allocate a new handle. * @@ -280,6 +282,13 @@ public interface Meta { /** Closes a connection */ void closeConnection(ConnectionHandle ch); + /** + * Re-set the {@link ResultSet} on a Statement. Not a JDBC method. + * @return True if there are results to fetch after resetting to the given offset. False otherwise + */ + boolean syncResults(StatementHandle sh, QueryState state, long offset) + throws NoSuchStatementException; + /** Sync client and server view of connection properties. * * <p>Note: this interface is considered "experimental" and may undergo further changes as this http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java index b34b325..f4764a1 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java @@ -751,15 +751,23 @@ public abstract class MetaImpl implements Meta { return createEmptyResultSet(MetaPseudoColumn.class); } - public Iterable<Object> createIterable(StatementHandle handle, + @Override public Iterable<Object> createIterable(StatementHandle handle, QueryState state, Signature signature, List<TypedValue> parameterValues, Frame firstFrame) { if (firstFrame != null && firstFrame.done) { return firstFrame.rows; } - return new FetchIterable(handle, firstFrame, parameterValues); + AvaticaStatement stmt; + try { + stmt = connection.lookupStatement(handle); + } catch (SQLException e) { + throw new RuntimeException(e); + } + return new FetchIterable(stmt, state, + firstFrame, parameterValues); } - public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) { + public Frame fetch(AvaticaStatement stmt, List<TypedValue> parameterValues, + long offset, int fetchMaxRowCount) throws NoSuchStatementException, MissingResultsException { return null; } @@ -824,33 +832,40 @@ public abstract class MetaImpl implements Meta { /** Iterable that yields an iterator over rows coming from a sequence of * {@link Frame}s. */ private class FetchIterable implements Iterable<Object> { - private final StatementHandle handle; + private final AvaticaStatement stmt; + private final QueryState state; private final Frame firstFrame; private final List<TypedValue> parameterValues; - public FetchIterable(StatementHandle handle, Frame firstFrame, + public FetchIterable(AvaticaStatement stmt, QueryState state, Frame firstFrame, List<TypedValue> parameterValues) { - this.handle = handle; + this.stmt = stmt; + this.state = state; this.firstFrame = firstFrame; this.parameterValues = parameterValues; } public Iterator<Object> iterator() { - return new FetchIterator(handle, firstFrame, parameterValues); + return new FetchIterator(stmt, state, firstFrame, parameterValues); } } /** Iterator over rows coming from a sequence of {@link Frame}s. */ private class FetchIterator implements Iterator<Object> { - private final StatementHandle handle; + private final AvaticaStatement stmt; + private final QueryState state; private Frame frame; private Iterator<Object> rows; private List<TypedValue> parameterValues; + private List<TypedValue> originalParameterValues; + private long currentOffset = 0; - public FetchIterator(StatementHandle handle, Frame firstFrame, + public FetchIterator(AvaticaStatement stmt, QueryState state, Frame firstFrame, List<TypedValue> parameterValues) { - this.handle = handle; + this.stmt = stmt; + this.state = state; this.parameterValues = parameterValues; + this.originalParameterValues = parameterValues; if (firstFrame == null) { frame = Frame.MORE; rows = EmptyIterator.INSTANCE; @@ -874,6 +889,7 @@ public abstract class MetaImpl implements Meta { throw new NoSuchElementException(); } final Object o = rows.next(); + currentOffset++; moveNext(); return o; } @@ -887,7 +903,31 @@ public abstract class MetaImpl implements Meta { rows = null; break; } - frame = fetch(handle, frame.offset, AvaticaStatement.DEFAULT_FETCH_SIZE); + try { + // currentOffset updated after element is read from `rows` iterator + frame = fetch(stmt.handle, currentOffset, AvaticaStatement.DEFAULT_FETCH_SIZE); + } catch (NoSuchStatementException e) { + resetStatement(); + // re-fetch the batch where we left off + continue; + } catch (MissingResultsException e) { + try { + // We saw the statement, but it didnt' have a resultset initialized. So, reset it. + if (!stmt.syncResults(state, currentOffset)) { + // This returned false, so there aren't actually any more results to iterate over + frame = null; + rows = null; + break; + } + // syncResults returning true means we need to fetch those results + } catch (NoSuchStatementException e1) { + // Tried to reset the result set, but lost the statement, save a loop before retrying. + resetStatement(); + // Will just loop back around to a MissingResultsException, but w/e recursion + } + // Kick back to the top to try to fetch again (in both branches) + continue; + } parameterValues = null; // don't execute next time if (frame == null) { rows = null; @@ -898,6 +938,13 @@ public abstract class MetaImpl implements Meta { rows = frame.rows.iterator(); } } + + private void resetStatement() { + // If we have to reset the statement, we need to reset the parameterValues too + parameterValues = originalParameterValues; + // Defer to the statement to reset itself + stmt.resetStatement(); + } } /** Returns whether a list of parameter values has any null elements. */ http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/MissingResultsException.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MissingResultsException.java b/avatica/src/main/java/org/apache/calcite/avatica/MissingResultsException.java new file mode 100644 index 0000000..7746769 --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/MissingResultsException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +import org.apache.calcite.avatica.Meta.StatementHandle; + +import java.sql.ResultSet; + +/** + * An Exception which denotes that a cached Statement is present but has no {@link ResultSet}. + */ +public class MissingResultsException extends Exception { + + private static final long serialVersionUID = 1L; + + private final StatementHandle handle; + + public MissingResultsException(StatementHandle handle) { + this.handle = handle; + } + + public StatementHandle getHandle() { + return handle; + } +} + +// End MissingResultsException.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/NoSuchConnectionException.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/NoSuchConnectionException.java b/avatica/src/main/java/org/apache/calcite/avatica/NoSuchConnectionException.java new file mode 100644 index 0000000..b5a940d --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/NoSuchConnectionException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +/** + * An Exception that denotes that the given Connection is not cached. + */ +public class NoSuchConnectionException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private final String connectionId; + + public NoSuchConnectionException(String connectionId) { + this.connectionId = connectionId; + } + + public String getConnectionId() { + return connectionId; + } +} + +// End NoSuchConnectionException.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/NoSuchStatementException.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/NoSuchStatementException.java b/avatica/src/main/java/org/apache/calcite/avatica/NoSuchStatementException.java new file mode 100644 index 0000000..321011b --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/NoSuchStatementException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +import org.apache.calcite.avatica.Meta.StatementHandle; + +/** + * An Exception that denotes that the given Statement is not cached. + */ +public class NoSuchStatementException extends Exception { + + private static final long serialVersionUID = 1L; + + private final StatementHandle stmtHandle; + + public NoSuchStatementException(StatementHandle stmtHandle) { + this.stmtHandle = stmtHandle; + } + + public StatementHandle getStatementHandle() { + return stmtHandle; + } +} + +// End NoSuchStatementException.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/QueryState.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/QueryState.java b/avatica/src/main/java/org/apache/calcite/avatica/QueryState.java new file mode 100644 index 0000000..c702f3c --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/QueryState.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +import org.apache.calcite.avatica.proto.Common; +import org.apache.calcite.avatica.proto.Common.MetaDataOperationArgument; +import org.apache.calcite.avatica.proto.Common.MetaDataOperationArgument.ArgumentType; +import org.apache.calcite.avatica.remote.MetaDataOperation; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Objects; + +/** + * A struct used to encapsulate the necessary information to reconstitute a ResultSet in the + * Avatica server. + */ +public class QueryState { + + /** + * An enumeration that represents how a ResultSet was created. + */ + public static enum StateType { + SQL, + METADATA; + + public Common.StateType toProto() { + switch (this) { + case SQL: + return Common.StateType.SQL; + case METADATA: + return Common.StateType.METADATA; + default: + return Common.StateType.UNRECOGNIZED; + } + } + + public static StateType fromProto(Common.StateType protoType) { + switch (protoType) { + case SQL: + return StateType.SQL; + case METADATA: + return StateType.METADATA; + default: + throw new IllegalArgumentException("Unhandled StateType " + protoType); + } + } + } + + @JsonProperty("type") + public final StateType type; + + @JsonProperty("sql") + public final String sql; + + @JsonProperty("metaDataOperation") + public final MetaDataOperation metaDataOperation; + @JsonProperty("operationArgs") + public final Object[] operationArgs; + + /** + * Constructor encapsulating a SQL query used to create a result set. + * + * @param sql The SQL query. + */ + public QueryState(String sql) { + // This doesn't to be non-null + this.sql = sql; + this.type = StateType.SQL; + + // Null out the members we don't use. + this.metaDataOperation = null; + this.operationArgs = null; + } + + /** + * Constructor encapsulating a metadata operation's result set. + * + * @param op A pointer to the {@link DatabaseMetaData} operation being invoked. + * @param args The arguments to the method being invoked. + */ + public QueryState(MetaDataOperation op, Object... args) { + this.metaDataOperation = Objects.requireNonNull(op); + this.operationArgs = Arrays.copyOf(Objects.requireNonNull(args), args.length); + this.type = StateType.METADATA; + + // Null out the members we won't use + this.sql = null; + } + + /** + * Not intended for external use. For Jackson-databind only. + */ + public QueryState(StateType type, String sql, MetaDataOperation op, Object... args) { + this.type = Objects.requireNonNull(type); + switch (type) { + case SQL: + this.sql = Objects.requireNonNull(sql); + if (null != op) { + throw new IllegalArgumentException("Expected null MetaDataOperation, but got " + op); + } + this.metaDataOperation = null; + if (null != args) { + throw new IllegalArgumentException("Expected null arguments, but got " + + Arrays.toString(args)); + } + this.operationArgs = null; + break; + case METADATA: + this.metaDataOperation = Objects.requireNonNull(op); + this.operationArgs = Objects.requireNonNull(args); + if (null != sql) { + throw new IllegalArgumentException("Expected null SQl but got " + sql); + } + this.sql = null; + break; + default: + throw new IllegalArgumentException("Unable to handle StateType " + type); + } + } + + /** + * Not intended for external use. For Jackson-databind only. + */ + public QueryState() { + this.sql = null; + this.metaDataOperation = null; + this.type = null; + this.operationArgs = null; + } + + /** + * @return The {@link StateType} for this encapsulated state. + */ + public StateType getType() { + return type; + } + + /** + * @return The SQL expression to invoke. + */ + public String getSql() { + assert type == StateType.SQL; + return sql; + } + + /** + * @return The metadata operation to invoke. + */ + public MetaDataOperation getMetaDataOperation() { + assert type == StateType.METADATA; + return metaDataOperation; + } + + /** + * @return The Arguments for the given metadata operation. + */ + public Object[] getOperationArgs() { + assert type == StateType.METADATA; + return operationArgs; + } + + public ResultSet invoke(Connection conn, Statement statement) throws SQLException { + switch (type) { + case SQL: + boolean ret = Objects.requireNonNull(statement).execute(sql); + ResultSet results = statement.getResultSet(); + + // Either execute(sql) returned true or the resultSet was null + assert ret || null == results; + + return results; + case METADATA: + DatabaseMetaData metadata = Objects.requireNonNull(conn).getMetaData(); + switch (metaDataOperation) { + case GET_ATTRIBUTES: + verifyOpArgs(4); + return metadata.getAttributes((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (String) operationArgs[3]); + case GET_BEST_ROW_IDENTIFIER: + verifyOpArgs(5); + return metadata.getBestRowIdentifier((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (int) operationArgs[3], + (boolean) operationArgs[4]); + case GET_CATALOGS: + verifyOpArgs(0); + return metadata.getCatalogs(); + case GET_COLUMNS: + verifyOpArgs(4); + return metadata.getColumns((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (String) operationArgs[3]); + case GET_COLUMN_PRIVILEGES: + verifyOpArgs(4); + return metadata.getColumnPrivileges((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (String) operationArgs[3]); + case GET_CROSS_REFERENCE: + verifyOpArgs(6); + return metadata.getCrossReference((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (String) operationArgs[3], + (String) operationArgs[4], + (String) operationArgs[5]); + case GET_EXPORTED_KEYS: + verifyOpArgs(3); + return metadata.getExportedKeys((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + case GET_FUNCTIONS: + verifyOpArgs(3); + return metadata.getFunctions((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + case GET_FUNCTION_COLUMNS: + verifyOpArgs(4); + return metadata.getFunctionColumns((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (String) operationArgs[3]); + case GET_IMPORTED_KEYS: + verifyOpArgs(3); + return metadata.getImportedKeys((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + case GET_INDEX_INFO: + verifyOpArgs(5); + return metadata.getIndexInfo((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (boolean) operationArgs[3], + (boolean) operationArgs[4]); + case GET_PRIMARY_KEYS: + verifyOpArgs(3); + return metadata.getPrimaryKeys((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + case GET_PROCEDURES: + verifyOpArgs(3); + return metadata.getProcedures((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + case GET_PROCEDURE_COLUMNS: + verifyOpArgs(4); + return metadata.getProcedureColumns((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (String) operationArgs[3]); + case GET_PSEUDO_COLUMNS: + verifyOpArgs(4); + return metadata.getPseudoColumns((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (String) operationArgs[3]); + case GET_SCHEMAS: + verifyOpArgs(0); + return metadata.getSchemas(); + case GET_SCHEMAS_WITH_ARGS: + verifyOpArgs(2); + return metadata.getSchemas((String) operationArgs[0], + (String) operationArgs[1]); + case GET_SUPER_TABLES: + verifyOpArgs(3); + return metadata.getSuperTables((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + case GET_SUPER_TYPES: + verifyOpArgs(3); + return metadata.getSuperTypes((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + case GET_TABLES: + verifyOpArgs(4); + return metadata.getTables((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (String[]) operationArgs[3]); + case GET_TABLE_PRIVILEGES: + verifyOpArgs(3); + return metadata.getTablePrivileges((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + case GET_TABLE_TYPES: + verifyOpArgs(0); + return metadata.getTableTypes(); + case GET_TYPE_INFO: + verifyOpArgs(0); + return metadata.getTypeInfo(); + case GET_UDTS: + verifyOpArgs(4); + return metadata.getUDTs((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2], + (int[]) operationArgs[3]); + case GET_VERSION_COLUMNS: + verifyOpArgs(3); + return metadata.getVersionColumns((String) operationArgs[0], + (String) operationArgs[1], + (String) operationArgs[2]); + default: + throw new IllegalArgumentException("Unhandled Metadata operation: " + metaDataOperation); + } + default: + throw new IllegalArgumentException("Unable to process QueryState of type " + type); + } + } + + private void verifyOpArgs(int expectedArgs) { + if (expectedArgs != operationArgs.length) { + throw new RuntimeException("Expected " + expectedArgs + " arguments, but got " + + Arrays.toString(operationArgs)); + } + } + + public Common.QueryState toProto() { + Common.QueryState.Builder builder = Common.QueryState.newBuilder(); + + // Required + switch (type) { + case SQL: + builder.setType(Common.StateType.SQL); + break; + case METADATA: + builder.setType(Common.StateType.METADATA); + break; + default: + throw new IllegalStateException("Unhandled type: " + type); + } + + // Optional SQL + if (null != sql) { + builder.setSql(sql).setHasSql(true); + } + + // Optional metaDataOperation + if (null != metaDataOperation) { + builder.setOp(metaDataOperation.toProto()).setHasOp(true); + } + + // Optional operationArgs + if (null != operationArgs) { + builder.setHasArgs(true); + for (Object arg : operationArgs) { + MetaDataOperationArgument.Builder argBuilder = MetaDataOperationArgument.newBuilder(); + + if (null == arg) { + builder.addArgs(argBuilder.setType(ArgumentType.NULL).build()); + } else if (arg instanceof String) { + builder.addArgs(argBuilder.setType(ArgumentType.STRING) + .setStringValue((String) arg).build()); + } else if (arg instanceof Integer) { + builder.addArgs(argBuilder.setType(ArgumentType.INT).setIntValue((int) arg).build()); + } else if (arg instanceof Boolean) { + builder.addArgs( + argBuilder.setType(ArgumentType.BOOL).setBoolValue((boolean) arg).build()); + } else if (arg instanceof String[]) { + argBuilder.setType(ArgumentType.REPEATED_STRING); + for (String strArg : (String[]) arg) { + argBuilder.addStringArrayValues(strArg); + } + builder.addArgs(argBuilder.build()); + } else if (arg instanceof int[]) { + argBuilder.setType(ArgumentType.REPEATED_INT); + for (int intArg : (int[]) arg) { + argBuilder.addIntArrayValues(intArg); + } + builder.addArgs(argBuilder.build()); + } else { + throw new RuntimeException("Unexpected operation argument: " + arg.getClass()); + } + } + } else { + builder.setHasArgs(false); + } + + return builder.build(); + } + + public static QueryState fromProto(Common.QueryState protoState) { + StateType type = StateType.fromProto(protoState.getType()); + String sql = protoState.getHasSql() ? protoState.getSql() : null; + MetaDataOperation op = protoState.getHasOp() + ? MetaDataOperation.fromProto(protoState.getOp()) : null; + Object[] opArgs = null; + if (protoState.getHasArgs()) { + opArgs = new Object[protoState.getArgsCount()]; + int i = 0; + for (Common.MetaDataOperationArgument arg : protoState.getArgsList()) { + switch (arg.getType()) { + case STRING: + opArgs[i] = arg.getStringValue(); + break; + case BOOL: + opArgs[i] = arg.getBoolValue(); + break; + case INT: + opArgs[i] = arg.getIntValue(); + break; + case REPEATED_STRING: + opArgs[i] = arg.getStringArrayValuesList().toArray( + new String[arg.getStringArrayValuesCount()]); + break; + case REPEATED_INT: + int[] arr = new int[arg.getIntArrayValuesCount()]; + int offset = 0; + for (Integer val : arg.getIntArrayValuesList()) { + arr[offset] = val; + offset++; + } + opArgs[i] = arr; + break; + case NULL: + opArgs[i] = null; + break; + default: + throw new RuntimeException("Could not interpret " + arg.getType()); + } + + i++; + } + } + + return new QueryState(type, sql, op, opArgs); + } + + @Override public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((metaDataOperation == null) ? 0 : metaDataOperation.hashCode()); + result = prime * result + Arrays.hashCode(operationArgs); + result = prime * result + ((sql == null) ? 0 : sql.hashCode()); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof QueryState)) { + return false; + } + + QueryState other = (QueryState) obj; + if (metaDataOperation != other.metaDataOperation) { + return false; + } + if (!Arrays.deepEquals(operationArgs, other.operationArgs)) { + return false; + } + if (sql == null) { + if (other.sql != null) { + return false; + } + } else if (!sql.equals(other.sql)) { + return false; + } + + return true; + } +} + +// End QueryState.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java index 96d2459..b1fe787 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java @@ -43,7 +43,7 @@ import java.util.logging.Logger; * <p>The provider must implement:</p> * <ul> * <li>{@link Meta#prepare(Meta.ConnectionHandle, String, long)} - * <li>{@link Meta#createIterable(org.apache.calcite.avatica.Meta.StatementHandle, org.apache.calcite.avatica.Meta.Signature, java.util.List, Meta.Frame)} + * <li>{@link Meta#createIterable(org.apache.calcite.avatica.Meta.StatementHandle, org.apache.calcite.avatica.QueryState, org.apache.calcite.avatica.Meta.Signature, java.util.List, Meta.Frame)} * </ul> */ public abstract class UnregisteredDriver implements java.sql.Driver {
