http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java new file mode 100644 index 0000000..12a5b59 --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.proto.Common; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; + +/** + * Identifies an operation from {@link DatabaseMetaData} which returns a {@link ResultSet}. This + * enum is used to allow clients to request the server to re-instantiate a {@link ResultSet} for + * these operations which do not have a SQL string associated with them as a normal query does. + */ +public enum MetaDataOperation { + GET_ATTRIBUTES, + GET_BEST_ROW_IDENTIFIER, + GET_CATALOGS, + GET_CLIENT_INFO_PROPERTIES, + GET_COLUMN_PRIVILEGES, + GET_COLUMNS, + GET_CROSS_REFERENCE, + GET_EXPORTED_KEYS, + GET_FUNCTION_COLUMNS, + GET_FUNCTIONS, + GET_IMPORTED_KEYS, + GET_INDEX_INFO, + GET_PRIMARY_KEYS, + GET_PROCEDURE_COLUMNS, + GET_PROCEDURES, + GET_PSEUDO_COLUMNS, + GET_SCHEMAS, + GET_SCHEMAS_WITH_ARGS, + GET_SUPER_TABLES, + GET_SUPER_TYPES, + GET_TABLE_PRIVILEGES, + GET_TABLES, + GET_TABLE_TYPES, + GET_TYPE_INFO, + GET_UDTS, + GET_VERSION_COLUMNS; + + public Common.MetaDataOperation toProto() { + switch (this) { + case GET_ATTRIBUTES: + return Common.MetaDataOperation.GET_ATTRIBUTES; + case GET_BEST_ROW_IDENTIFIER: + return Common.MetaDataOperation.GET_BEST_ROW_IDENTIFIER; + case GET_CATALOGS: + return Common.MetaDataOperation.GET_CATALOGS; + case GET_CLIENT_INFO_PROPERTIES: + return Common.MetaDataOperation.GET_CLIENT_INFO_PROPERTIES; + case GET_COLUMNS: + return Common.MetaDataOperation.GET_COLUMNS; + case GET_COLUMN_PRIVILEGES: + return Common.MetaDataOperation.GET_COLUMN_PRIVILEGES; + case GET_CROSS_REFERENCE: + return Common.MetaDataOperation.GET_CROSS_REFERENCE; + case GET_EXPORTED_KEYS: + return Common.MetaDataOperation.GET_EXPORTED_KEYS; + case GET_FUNCTIONS: + return Common.MetaDataOperation.GET_FUNCTIONS; + case GET_FUNCTION_COLUMNS: + return Common.MetaDataOperation.GET_FUNCTION_COLUMNS; + case GET_IMPORTED_KEYS: + return Common.MetaDataOperation.GET_IMPORTED_KEYS; + case GET_INDEX_INFO: + return Common.MetaDataOperation.GET_INDEX_INFO; + case GET_PRIMARY_KEYS: + return Common.MetaDataOperation.GET_PRIMARY_KEYS; + case GET_PROCEDURES: + return Common.MetaDataOperation.GET_PROCEDURES; + case GET_PROCEDURE_COLUMNS: + return Common.MetaDataOperation.GET_PROCEDURE_COLUMNS; + case GET_PSEUDO_COLUMNS: + return Common.MetaDataOperation.GET_PSEUDO_COLUMNS; + case GET_SCHEMAS: + return Common.MetaDataOperation.GET_SCHEMAS; + case GET_SCHEMAS_WITH_ARGS: + return Common.MetaDataOperation.GET_SCHEMAS_WITH_ARGS; + case GET_SUPER_TABLES: + return Common.MetaDataOperation.GET_SUPER_TABLES; + case GET_SUPER_TYPES: + return Common.MetaDataOperation.GET_SUPER_TYPES; + case GET_TABLES: + return Common.MetaDataOperation.GET_TABLES; + case GET_TABLE_PRIVILEGES: + return Common.MetaDataOperation.GET_TABLE_PRIVILEGES; + case GET_TABLE_TYPES: + return Common.MetaDataOperation.GET_TABLE_TYPES; + case GET_TYPE_INFO: + return Common.MetaDataOperation.GET_TYPE_INFO; + case GET_UDTS: + return Common.MetaDataOperation.GET_UDTS; + case GET_VERSION_COLUMNS: + return Common.MetaDataOperation.GET_VERSION_COLUMNS; + default: + throw new RuntimeException("Unknown type: " + this); + } + } + + public static MetaDataOperation fromProto(Common.MetaDataOperation protoOp) { + // Null is acceptable + if (null == protoOp) { + return null; + } + + switch (protoOp) { + case GET_ATTRIBUTES: + return MetaDataOperation.GET_ATTRIBUTES; + case GET_BEST_ROW_IDENTIFIER: + return MetaDataOperation.GET_BEST_ROW_IDENTIFIER; + case GET_CATALOGS: + return MetaDataOperation.GET_CATALOGS; + case GET_CLIENT_INFO_PROPERTIES: + return MetaDataOperation.GET_CLIENT_INFO_PROPERTIES; + case GET_COLUMNS: + return MetaDataOperation.GET_COLUMNS; + case GET_COLUMN_PRIVILEGES: + return MetaDataOperation.GET_COLUMN_PRIVILEGES; + case GET_CROSS_REFERENCE: + return MetaDataOperation.GET_CROSS_REFERENCE; + case GET_EXPORTED_KEYS: + return MetaDataOperation.GET_EXPORTED_KEYS; + case GET_FUNCTIONS: + return MetaDataOperation.GET_FUNCTIONS; + case GET_FUNCTION_COLUMNS: + return MetaDataOperation.GET_FUNCTION_COLUMNS; + case GET_IMPORTED_KEYS: + return MetaDataOperation.GET_IMPORTED_KEYS; + case GET_INDEX_INFO: + return MetaDataOperation.GET_INDEX_INFO; + case GET_PRIMARY_KEYS: + return MetaDataOperation.GET_PRIMARY_KEYS; + case GET_PROCEDURES: + return MetaDataOperation.GET_PROCEDURES; + case GET_PROCEDURE_COLUMNS: + return MetaDataOperation.GET_PROCEDURE_COLUMNS; + case GET_PSEUDO_COLUMNS: + return MetaDataOperation.GET_PSEUDO_COLUMNS; + case GET_SCHEMAS: + return MetaDataOperation.GET_SCHEMAS; + case GET_SCHEMAS_WITH_ARGS: + return MetaDataOperation.GET_SCHEMAS_WITH_ARGS; + case GET_SUPER_TABLES: + return MetaDataOperation.GET_SUPER_TABLES; + case GET_SUPER_TYPES: + return MetaDataOperation.GET_SUPER_TYPES; + case GET_TABLES: + return MetaDataOperation.GET_TABLES; + case GET_TABLE_PRIVILEGES: + return MetaDataOperation.GET_TABLE_PRIVILEGES; + case GET_TABLE_TYPES: + return MetaDataOperation.GET_TABLE_TYPES; + case GET_TYPE_INFO: + return MetaDataOperation.GET_TYPE_INFO; + case GET_UDTS: + return MetaDataOperation.GET_UDTS; + case GET_VERSION_COLUMNS: + return MetaDataOperation.GET_VERSION_COLUMNS; + default: + throw new RuntimeException("Unknown type: " + protoOp); + } + } +} + +// End MetaDataOperation.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java index 2c2c7b2..11a6104 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java @@ -46,7 +46,7 @@ public class MockJsonService extends JsonService { /** Factory that creates a {@code MockJsonService}. */ public static class Factory implements Service.Factory { public Service create(AvaticaConnection connection) { - final String connectionId = connection.handle.id; + final String connectionId = connection.id; final Map<String, String> map1 = new HashMap<>(); try { map1.put( http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java index 3ca21f7..b04980b 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java @@ -136,7 +136,7 @@ public class MockProtobufService extends ProtobufService { */ public static class MockProtobufServiceFactory implements Service.Factory { @Override public Service create(AvaticaConnection connection) { - return new MockProtobufService(connection.handle.id); + return new MockProtobufService(connection.id); } } } http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java index 8414708..8a79fc5 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java @@ -99,6 +99,10 @@ public abstract class ProtobufService extends AbstractService { return finagle((ExecuteResponse) _apply(request)); } + @Override public SyncResultsResponse apply(SyncResultsRequest request) { + return (SyncResultsResponse) _apply(request); + } + /** * Determines whether the given message has the field, denoted by the provided number, set. * http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java index 31d522d..9c68beb 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java @@ -30,6 +30,7 @@ import org.apache.calcite.avatica.proto.Requests.OpenConnectionRequest; import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteRequest; import org.apache.calcite.avatica.proto.Requests.PrepareRequest; import org.apache.calcite.avatica.proto.Requests.SchemasRequest; +import org.apache.calcite.avatica.proto.Requests.SyncResultsRequest; import org.apache.calcite.avatica.proto.Requests.TableTypesRequest; import org.apache.calcite.avatica.proto.Requests.TablesRequest; import org.apache.calcite.avatica.proto.Requests.TypeInfoRequest; @@ -44,6 +45,7 @@ import org.apache.calcite.avatica.proto.Responses.FetchResponse; import org.apache.calcite.avatica.proto.Responses.OpenConnectionResponse; import org.apache.calcite.avatica.proto.Responses.PrepareResponse; import org.apache.calcite.avatica.proto.Responses.ResultSetResponse; +import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse; import org.apache.calcite.avatica.remote.Service.Request; import org.apache.calcite.avatica.remote.Service.Response; @@ -106,6 +108,8 @@ public class ProtobufTranslationImpl implements ProtobufTranslation { new RequestTranslator(TypeInfoRequest.parser(), new Service.TypeInfoRequest())); reqParsers.put(ExecuteRequest.class.getName(), new RequestTranslator(ExecuteRequest.parser(), new Service.ExecuteRequest())); + reqParsers.put(SyncResultsRequest.class.getName(), + new RequestTranslator(SyncResultsRequest.parser(), new Service.SyncResultsRequest())); REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers); @@ -138,6 +142,8 @@ public class ProtobufTranslationImpl implements ProtobufTranslation { new ResponseTranslator(ResultSetResponse.parser(), new Service.ResultSetResponse())); respParsers.put(ErrorResponse.class.getName(), new ResponseTranslator(ErrorResponse.parser(), new Service.ErrorResponse())); + respParsers.put(SyncResultsResponse.class.getName(), + new ResponseTranslator(SyncResultsResponse.parser(), new Service.SyncResultsResponse())); RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers); } http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java index e82b7ef..e741ffb 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java @@ -17,11 +17,15 @@ package org.apache.calcite.avatica.remote; import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException; import org.apache.calcite.avatica.AvaticaParameter; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.ConnectionPropertiesImpl; import org.apache.calcite.avatica.Meta; import org.apache.calcite.avatica.MetaImpl; +import org.apache.calcite.avatica.MissingResultsException; +import org.apache.calcite.avatica.NoSuchStatementException; +import org.apache.calcite.avatica.QueryState; import java.sql.SQLException; import java.util.ArrayList; @@ -75,150 +79,296 @@ class RemoteMeta extends MetaImpl { } } - @Override public StatementHandle createStatement(ConnectionHandle ch) { - connectionSync(ch, new ConnectionPropertiesImpl()); // sync connection state if necessary - final Service.CreateStatementResponse response = - service.apply(new Service.CreateStatementRequest(ch.id)); - return new StatementHandle(response.connectionId, response.statementId, - null); + @Override public StatementHandle createStatement(final ConnectionHandle ch) { + return connection.invokeWithRetries( + new CallableWithoutException<StatementHandle>() { + public StatementHandle call() { + // sync connection state if necessary + connectionSync(ch, new ConnectionPropertiesImpl()); + final Service.CreateStatementResponse response = + service.apply(new Service.CreateStatementRequest(ch.id)); + return new StatementHandle(response.connectionId, response.statementId, null); + } + }); } - @Override public void closeStatement(StatementHandle h) { - final Service.CloseStatementResponse response = - service.apply(new Service.CloseStatementRequest(h.connectionId, h.id)); + @Override public void closeStatement(final StatementHandle h) { + connection.invokeWithRetries( + new CallableWithoutException<Void>() { + public Void call() { + final Service.CloseStatementResponse response = + service.apply( + new Service.CloseStatementRequest(h.connectionId, h.id)); + return null; + } + }); } - @Override public void openConnection(ConnectionHandle ch, Map<String, String> info) { - final Service.OpenConnectionResponse response = - service.apply(new Service.OpenConnectionRequest(ch.id, info)); + @Override public void openConnection(final ConnectionHandle ch, final Map<String, String> info) { + connection.invokeWithRetries( + new CallableWithoutException<Void>() { + public Void call() { + final Service.OpenConnectionResponse response = + service.apply(new Service.OpenConnectionRequest(ch.id, info)); + return null; + } + }); } - @Override public void closeConnection(ConnectionHandle ch) { - final Service.CloseConnectionResponse response = - service.apply(new Service.CloseConnectionRequest(ch.id)); - propsMap.remove(ch.id); + @Override public void closeConnection(final ConnectionHandle ch) { + connection.invokeWithRetries( + new CallableWithoutException<Void>() { + public Void call() { + final Service.CloseConnectionResponse response = + service.apply(new Service.CloseConnectionRequest(ch.id)); + propsMap.remove(ch.id); + return null; + } + }); } - @Override public ConnectionProperties connectionSync(ConnectionHandle ch, - ConnectionProperties connProps) { - ConnectionPropertiesImpl localProps = propsMap.get(ch.id); - if (localProps == null) { - localProps = new ConnectionPropertiesImpl(); - localProps.setDirty(true); - propsMap.put(ch.id, localProps); - } + @Override public ConnectionProperties connectionSync(final ConnectionHandle ch, + final ConnectionProperties connProps) { + return connection.invokeWithRetries( + new CallableWithoutException<ConnectionProperties>() { + public ConnectionProperties call() { + ConnectionPropertiesImpl localProps = propsMap.get(ch.id); + if (localProps == null) { + localProps = new ConnectionPropertiesImpl(); + localProps.setDirty(true); + propsMap.put(ch.id, localProps); + } - // Only make an RPC if necessary. RPC is necessary when we have local changes that need - // flushed to the server (be sure to introduce any new changes from connProps before checking - // AND when connProps.isEmpty() (meaning, this was a request for a value, not overriding a - // value). Otherwise, accumulate the change locally and return immediately. - if (localProps.merge(connProps).isDirty() && connProps.isEmpty()) { - final Service.ConnectionSyncResponse response = service.apply( - new Service.ConnectionSyncRequest(ch.id, localProps)); - propsMap.put(ch.id, (ConnectionPropertiesImpl) response.connProps); - return response.connProps; - } else { - return localProps; - } + // Only make an RPC if necessary. RPC is necessary when we have local changes that need + // flushed to the server (be sure to introduce any new changes from connProps before + // checking AND when connProps.isEmpty() (meaning, this was a request for a value, not + // overriding a value). Otherwise, accumulate the change locally and return immediately. + if (localProps.merge(connProps).isDirty() && connProps.isEmpty()) { + final Service.ConnectionSyncResponse response = service.apply( + new Service.ConnectionSyncRequest(ch.id, localProps)); + propsMap.put(ch.id, (ConnectionPropertiesImpl) response.connProps); + return response.connProps; + } else { + return localProps; + } + } + }); } - @Override public MetaResultSet getCatalogs(ConnectionHandle ch) { - final Service.ResultSetResponse response = - service.apply(new Service.CatalogsRequest(ch.id)); - return toResultSet(MetaCatalog.class, response); + @Override public MetaResultSet getCatalogs(final ConnectionHandle ch) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply(new Service.CatalogsRequest(ch.id)); + return toResultSet(MetaCatalog.class, response); + } + }); } - @Override public MetaResultSet getSchemas(ConnectionHandle ch, String catalog, - Pat schemaPattern) { - final Service.ResultSetResponse response = - service.apply(new Service.SchemasRequest(ch.id, catalog, schemaPattern.s)); - return toResultSet(MetaSchema.class, response); + @Override public MetaResultSet getSchemas(final ConnectionHandle ch, final String catalog, + final Pat schemaPattern) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply( + new Service.SchemasRequest(ch.id, catalog, schemaPattern.s)); + return toResultSet(MetaSchema.class, response); + } + }); } - @Override public MetaResultSet getTables(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat tableNamePattern, List<String> typeList) { - final Service.ResultSetResponse response = - service.apply( - new Service.TablesRequest(ch.id, catalog, schemaPattern.s, - tableNamePattern.s, typeList)); - return toResultSet(MetaTable.class, response); + @Override public MetaResultSet getTables(final ConnectionHandle ch, final String catalog, + final Pat schemaPattern, final Pat tableNamePattern, final List<String> typeList) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply( + new Service.TablesRequest(ch.id, catalog, schemaPattern.s, + tableNamePattern.s, typeList)); + return toResultSet(MetaTable.class, response); + } + }); } - @Override public MetaResultSet getTableTypes(ConnectionHandle ch) { - final Service.ResultSetResponse response = - service.apply(new Service.TableTypesRequest(ch.id)); - return toResultSet(MetaTableType.class, response); + @Override public MetaResultSet getTableTypes(final ConnectionHandle ch) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply(new Service.TableTypesRequest(ch.id)); + return toResultSet(MetaTableType.class, response); + } + }); } - @Override public MetaResultSet getTypeInfo(ConnectionHandle ch) { - final Service.ResultSetResponse response = - service.apply(new Service.TypeInfoRequest(ch.id)); - return toResultSet(MetaTypeInfo.class, response); + @Override public MetaResultSet getTypeInfo(final ConnectionHandle ch) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply(new Service.TypeInfoRequest(ch.id)); + return toResultSet(MetaTypeInfo.class, response); + } + }); } - @Override public MetaResultSet getColumns(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat tableNamePattern, Pat columnNamePattern) { - final Service.ResultSetResponse response = - service.apply( - new Service.ColumnsRequest(ch.id, catalog, schemaPattern.s, - tableNamePattern.s, columnNamePattern.s)); - return toResultSet(MetaColumn.class, response); + @Override public MetaResultSet getColumns(final ConnectionHandle ch, final String catalog, + final Pat schemaPattern, final Pat tableNamePattern, final Pat columnNamePattern) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply( + new Service.ColumnsRequest(ch.id, catalog, schemaPattern.s, + tableNamePattern.s, columnNamePattern.s)); + return toResultSet(MetaColumn.class, response); + } + }); } - @Override public StatementHandle prepare(ConnectionHandle ch, String sql, - long maxRowCount) { - connectionSync(ch, new ConnectionPropertiesImpl()); // sync connection state if necessary - final Service.PrepareResponse response = service.apply( - new Service.PrepareRequest(ch.id, sql, maxRowCount)); - return response.statement; + @Override public StatementHandle prepare(final ConnectionHandle ch, final String sql, + final long maxRowCount) { + return connection.invokeWithRetries( + new CallableWithoutException<StatementHandle>() { + public StatementHandle call() { + connectionSync(ch, + new ConnectionPropertiesImpl()); // sync connection state if necessary + final Service.PrepareResponse response = service.apply( + new Service.PrepareRequest(ch.id, sql, maxRowCount)); + return response.statement; + } + }); } - @Override public ExecuteResult prepareAndExecute(StatementHandle h, - String sql, long maxRowCount, PrepareCallback callback) { - // sync connection state if necessary - connectionSync(new ConnectionHandle(h.connectionId), new ConnectionPropertiesImpl()); - final Service.ExecuteResponse response; + @Override public ExecuteResult prepareAndExecute(final StatementHandle h, final String sql, + final long maxRowCount, final PrepareCallback callback) throws NoSuchStatementException { try { - synchronized (callback.getMonitor()) { - callback.clear(); - response = service.apply( - new Service.PrepareAndExecuteRequest(h.connectionId, - h.id, sql, maxRowCount)); - if (response.results.size() > 0) { - final Service.ResultSetResponse result = response.results.get(0); - callback.assign(result.signature, result.firstFrame, - result.updateCount); - } + return connection.invokeWithRetries( + new CallableWithoutException<ExecuteResult>() { + public ExecuteResult call() { + // sync connection state if necessary + connectionSync(new ConnectionHandle(h.connectionId), new ConnectionPropertiesImpl()); + final Service.ExecuteResponse response; + try { + synchronized (callback.getMonitor()) { + callback.clear(); + response = service.apply( + new Service.PrepareAndExecuteRequest(h.connectionId, + h.id, sql, maxRowCount)); + if (response.missingStatement) { + throw new RuntimeException(new NoSuchStatementException(h)); + } + if (response.results.size() > 0) { + final Service.ResultSetResponse result = response.results.get(0); + callback.assign(result.signature, result.firstFrame, + result.updateCount); + } + } + callback.execute(); + List<MetaResultSet> metaResultSets = new ArrayList<>(); + for (Service.ResultSetResponse result : response.results) { + metaResultSets.add(toResultSet(null, result)); + } + return new ExecuteResult(metaResultSets); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof NoSuchStatementException) { + throw (NoSuchStatementException) cause; } - callback.execute(); - List<MetaResultSet> metaResultSets = new ArrayList<>(); - for (Service.ResultSetResponse result : response.results) { - metaResultSets.add(toResultSet(null, result)); - } - return new ExecuteResult(metaResultSets); - } catch (SQLException e) { - throw new RuntimeException(e); + throw e; } } - @Override public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) { - final Service.FetchResponse response = - service.apply( - new Service.FetchRequest(h.connectionId, h.id, offset, fetchMaxRowCount)); - return response.frame; + @Override public Frame fetch(final StatementHandle h, final long offset, + final int fetchMaxRowCount) throws NoSuchStatementException, MissingResultsException { + try { + return connection.invokeWithRetries( + new CallableWithoutException<Frame>() { + public Frame call() { + final Service.FetchResponse response = + service.apply( + new Service.FetchRequest(h.connectionId, h.id, offset, fetchMaxRowCount)); + if (response.missingStatement) { + throw new RuntimeException(new NoSuchStatementException(h)); + } + if (response.missingResults) { + throw new RuntimeException(new MissingResultsException(h)); + } + return response.frame; + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof NoSuchStatementException) { + throw (NoSuchStatementException) cause; + } else if (cause instanceof MissingResultsException) { + throw (MissingResultsException) cause; + } + throw e; + } } - @Override public ExecuteResult execute(StatementHandle h, - List<TypedValue> parameterValues, long maxRowCount) { - final Service.ExecuteResponse response = service.apply( - new Service.ExecuteRequest(h, parameterValues, maxRowCount)); + @Override public ExecuteResult execute(final StatementHandle h, + final List<TypedValue> parameterValues, final long maxRowCount) + throws NoSuchStatementException { + try { + return connection.invokeWithRetries( + new CallableWithoutException<ExecuteResult>() { + public ExecuteResult call() { + final Service.ExecuteResponse response = service.apply( + new Service.ExecuteRequest(h, parameterValues, maxRowCount)); + + if (response.missingStatement) { + throw new RuntimeException(new NoSuchStatementException(h)); + } + + List<MetaResultSet> metaResultSets = new ArrayList<>(); + for (Service.ResultSetResponse result : response.results) { + metaResultSets.add(toResultSet(null, result)); + } - List<MetaResultSet> metaResultSets = new ArrayList<>(); - for (Service.ResultSetResponse result : response.results) { - metaResultSets.add(toResultSet(null, result)); + return new ExecuteResult(metaResultSets); + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof NoSuchStatementException) { + throw (NoSuchStatementException) cause; + } + throw e; } + } - return new ExecuteResult(metaResultSets); + @Override public boolean syncResults(final StatementHandle h, final QueryState state, + final long offset) throws NoSuchStatementException { + try { + return connection.invokeWithRetries( + new CallableWithoutException<Boolean>() { + public Boolean call() { + final Service.SyncResultsResponse response = + service.apply( + new Service.SyncResultsRequest(h.connectionId, h.id, state, offset)); + if (response.missingStatement) { + throw new RuntimeException(new NoSuchStatementException(h)); + } + return response.moreResults; + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof NoSuchStatementException) { + throw (NoSuchStatementException) cause; + } + throw e; + } } } http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java index fa447de..828513a 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java @@ -16,56 +16,26 @@ */ package org.apache.calcite.avatica.remote; -import org.apache.calcite.avatica.AvaticaUtils; - -import java.io.DataOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.URL; /** * ProtobufService implementation that queries against a remote implementation, using * protocol buffers as the serialized form. */ public class RemoteProtobufService extends ProtobufService { - private final URL url; + private final AvaticaHttpClient client; private final ProtobufTranslation translation; - public RemoteProtobufService(URL url, ProtobufTranslation translation) { - this.url = url; + public RemoteProtobufService(AvaticaHttpClient client, ProtobufTranslation translation) { + this.client = client; this.translation = translation; } @Override public Response _apply(Request request) { - final InputStream inputStream; - - try { - final HttpURLConnection connection = - (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setDoInput(true); - connection.setDoOutput(true); - try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) { - // Convert the Request to a protobuf and send it over the wire - wr.write(translation.serializeRequest(request)); - wr.flush(); - wr.close(); - } - final int responseCode = connection.getResponseCode(); - if (responseCode != HttpURLConnection.HTTP_OK) { - inputStream = connection.getErrorStream(); - } else { - inputStream = connection.getInputStream(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - - Response resp; + final Response resp; try { - // Read the (serialized protobuf) response off the wire and convert it back to a Response - resp = translation.parseResponse(AvaticaUtils.readFullyToBytes(inputStream)); + byte[] response = client.send(translation.serializeRequest(request)); + resp = translation.parseResponse(response); } catch (IOException e) { // Not a protobuf that we could parse. throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java index f661cbd..d4828b5 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java @@ -16,13 +16,7 @@ */ package org.apache.calcite.avatica.remote; -import org.apache.calcite.avatica.AvaticaUtils; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.URL; +import java.nio.charset.StandardCharsets; /** * Implementation of {@link org.apache.calcite.avatica.remote.Service} @@ -30,42 +24,15 @@ import java.net.URL; * usually an HTTP server. */ public class RemoteService extends JsonService { - private final URL url; + private final AvaticaHttpClient client; - public RemoteService(URL url) { - this.url = url; + public RemoteService(AvaticaHttpClient client) { + this.client = client; } @Override public String apply(String request) { - try { - final HttpURLConnection connection = - (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setDoInput(true); - connection.setDoOutput(true); - if (request.length() < 256) { - connection.setRequestProperty("request", request); - } else { - try (DataOutputStream wr - = new DataOutputStream(connection.getOutputStream())) { - wr.writeBytes(request); - wr.flush(); - wr.close(); - } - } - final int responseCode = connection.getResponseCode(); - final InputStream inputStream; - if (responseCode != HttpURLConnection.HTTP_OK) { - inputStream = connection.getErrorStream(); - } else { - inputStream = connection.getInputStream(); - } - - return AvaticaUtils.readFully(inputStream); - } catch (IOException e) { - throw new RuntimeException(e); - } - + byte[] response = client.send(request.getBytes(StandardCharsets.UTF_8)); + return new String(response, StandardCharsets.UTF_8); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java index 2309661..473e96c 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java @@ -19,8 +19,10 @@ package org.apache.calcite.avatica.remote; import org.apache.calcite.avatica.AvaticaClientRuntimeException; import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaSeverity; +import org.apache.calcite.avatica.BuiltInConnectionProperty; import org.apache.calcite.avatica.ConnectionPropertiesImpl; import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.QueryState; import org.apache.calcite.avatica.proto.Common; import org.apache.calcite.avatica.proto.Requests; import org.apache.calcite.avatica.proto.Responses; @@ -29,6 +31,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; + import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Message; @@ -42,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Properties; /** * API for request-response calls to an Avatica server. @@ -56,6 +60,7 @@ public interface Service { PrepareResponse apply(PrepareRequest request); ExecuteResponse apply(ExecuteRequest request); ExecuteResponse apply(PrepareAndExecuteRequest request); + SyncResultsResponse apply(SyncResultsRequest request); FetchResponse apply(FetchRequest request); CreateStatementResponse apply(CreateStatementRequest request); CloseStatementResponse apply(CloseStatementRequest request); @@ -95,7 +100,8 @@ public interface Service { @JsonSubTypes.Type(value = CloseConnectionRequest.class, name = "closeConnection"), @JsonSubTypes.Type(value = ConnectionSyncRequest.class, name = "connectionSync"), - @JsonSubTypes.Type(value = DatabasePropertyRequest.class, name = "databaseProperties") }) + @JsonSubTypes.Type(value = DatabasePropertyRequest.class, name = "databaseProperties"), + @JsonSubTypes.Type(value = SyncResultsRequest.class, name = "syncResults") }) abstract class Request { abstract Response accept(Service service); abstract Request deserialize(Message genericMsg); @@ -121,7 +127,8 @@ public interface Service { @JsonSubTypes.Type(value = ConnectionSyncResponse.class, name = "connectionSync"), @JsonSubTypes.Type(value = DatabasePropertyResponse.class, name = "databaseProperties"), @JsonSubTypes.Type(value = ExecuteResponse.class, name = "executeResults"), - @JsonSubTypes.Type(value = ErrorResponse.class, name = "error") }) + @JsonSubTypes.Type(value = ErrorResponse.class, name = "error"), + @JsonSubTypes.Type(value = SyncResultsResponse.class, name = "syncResults") }) abstract class Response { abstract Response deserialize(Message genericMsg); abstract Message serialize(); @@ -1242,15 +1249,17 @@ public interface Service { * {@link org.apache.calcite.avatica.remote.Service.PrepareAndExecuteRequest}. */ class ExecuteResponse extends Response { public final List<ResultSetResponse> results; + public boolean missingStatement = false; ExecuteResponse() { results = null; } @JsonCreator - public ExecuteResponse( - @JsonProperty("resultSets") List<ResultSetResponse> results) { + public ExecuteResponse(@JsonProperty("resultSets") List<ResultSetResponse> results, + @JsonProperty("missingStatement") boolean missingStatement) { this.results = results; + this.missingStatement = missingStatement; } @Override ExecuteResponse deserialize(Message genericMsg) { @@ -1268,7 +1277,7 @@ public interface Service { copiedResults.add(ResultSetResponse.fromProto(msgResult)); } - return new ExecuteResponse(copiedResults); + return new ExecuteResponse(copiedResults, msg.getMissingStatement()); } @Override Responses.ExecuteResponse serialize() { @@ -1278,7 +1287,7 @@ public interface Service { builder.addResults(result.serialize()); } - return builder.build(); + return builder.setMissingStatement(missingStatement).build(); } @Override public int hashCode() { @@ -1581,14 +1590,20 @@ public interface Service { * {@link org.apache.calcite.avatica.remote.Service.FetchRequest}. */ class FetchResponse extends Response { public final Meta.Frame frame; + public boolean missingStatement = false; + public boolean missingResults = false; FetchResponse() { frame = null; } @JsonCreator - public FetchResponse(@JsonProperty("frame") Meta.Frame frame) { + public FetchResponse(@JsonProperty("frame") Meta.Frame frame, + @JsonProperty("missingStatement") boolean missingStatement, + @JsonProperty("missingResults") boolean missingResults) { this.frame = frame; + this.missingStatement = missingStatement; + this.missingResults = missingResults; } @Override FetchResponse deserialize(Message genericMsg) { @@ -1599,7 +1614,8 @@ public interface Service { Responses.FetchResponse msg = (Responses.FetchResponse) genericMsg; - return new FetchResponse(Meta.Frame.fromProto(msg.getFrame())); + return new FetchResponse(Meta.Frame.fromProto(msg.getFrame()), msg.getMissingStatement(), + msg.getMissingResults()); } @Override Responses.FetchResponse serialize() { @@ -1609,7 +1625,8 @@ public interface Service { builder.setFrame(frame.toProto()); } - return builder.build(); + return builder.setMissingStatement(missingStatement) + .setMissingResults(missingResults).build(); } @Override public int hashCode() { @@ -1634,7 +1651,7 @@ public interface Service { return false; } - return true; + return missingStatement == other.missingStatement; } return false; @@ -1932,6 +1949,31 @@ public interface Service { return service.apply(this); } + /** + * Serializes the necessary properties into a Map. + * + * @param props The properties to serialize. + * @return A representation of the Properties as a Map. + */ + public static Map<String, String> serializeProperties(Properties props) { + Map<String, String> infoAsString = new HashMap<>(); + for (Map.Entry<Object, Object> entry : props.entrySet()) { + // Determine if this is a property we want to forward to the server + boolean localProperty = false; + for (BuiltInConnectionProperty prop : BuiltInConnectionProperty.values()) { + if (prop.camelName().equals(entry.getKey())) { + localProperty = true; + break; + } + } + + if (!localProperty) { + infoAsString.put(entry.getKey().toString(), entry.getValue().toString()); + } + } + return infoAsString; + } + @Override Request deserialize(Message genericMsg) { if (!(genericMsg instanceof Requests.OpenConnectionRequest)) { throw new IllegalArgumentException( @@ -2452,6 +2494,8 @@ public interface Service { */ public class ErrorResponse extends Response { public static final int UNKNOWN_ERROR_CODE = -1; + public static final int MISSING_CONNECTION_ERROR_CODE = 1; + public static final String UNKNOWN_SQL_STATE = "00000"; public final List<String> exceptions; @@ -2590,6 +2634,169 @@ public interface Service { sqlState, severity, exceptions); } } + + /** + * Request for {@link Service#apply(SyncResultsRequest)} + */ + class SyncResultsRequest extends Request { + public final String connectionId; + public final int statementId; + public final QueryState state; + public final long offset; + + SyncResultsRequest() { + this.connectionId = null; + this.statementId = 0; + this.state = null; + this.offset = 0L; + } + + public SyncResultsRequest(@JsonProperty("connectionId") String connectionId, + @JsonProperty("statementId") int statementId, @JsonProperty("state") QueryState state, + @JsonProperty("offset") long offset) { + this.connectionId = connectionId; + this.statementId = statementId; + this.state = state; + this.offset = offset; + } + + SyncResultsResponse accept(Service service) { + return service.apply(this); + } + + Request deserialize(Message genericMsg) { + if (!(genericMsg instanceof Requests.SyncResultsRequest)) { + throw new IllegalArgumentException( + "Expected SyncResultsRequest, but got " + genericMsg.getClass().getName()); + } + + Requests.SyncResultsRequest msg = (Requests.SyncResultsRequest) genericMsg; + + return new SyncResultsRequest(msg.getConnectionId(), msg.getStatementId(), + QueryState.fromProto(msg.getState()), msg.getOffset()); + } + + Requests.SyncResultsRequest serialize() { + Requests.SyncResultsRequest.Builder builder = Requests.SyncResultsRequest.newBuilder(); + + if (null != connectionId) { + builder.setConnectionId(connectionId); + } + + if (null != state) { + builder.setState(state.toProto()); + } + + builder.setStatementId(statementId); + builder.setOffset(offset); + + return builder.build(); + } + + @Override public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((connectionId == null) ? 0 : connectionId.hashCode()); + result = prime * result + (int) (offset ^ (offset >>> 32)); + result = prime * result + ((state == null) ? 0 : state.hashCode()); + result = prime * result + statementId; + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (null == obj || !(obj instanceof SyncResultsRequest)) { + return false; + } + + SyncResultsRequest other = (SyncResultsRequest) obj; + + if (connectionId == null) { + if (other.connectionId != null) { + return false; + } + } else if (!connectionId.equals(other.connectionId)) { + return false; + } + + if (offset != other.offset) { + return false; + } + + if (state == null) { + if (other.state != null) { + return false; + } + } else if (!state.equals(other.state)) { + return false; + } + + if (statementId != other.statementId) { + return false; + } + + return true; + } + } + + /** + * Response for {@link Service#apply(SyncResultsRequest)}. + */ + class SyncResultsResponse extends Response { + public boolean missingStatement = false; + public final boolean moreResults; + + SyncResultsResponse() { + this.moreResults = false; + } + + public SyncResultsResponse(@JsonProperty("moreResults") boolean moreResults, + @JsonProperty("missingStatement") boolean missingStatement) { + this.moreResults = moreResults; + this.missingStatement = missingStatement; + } + + SyncResultsResponse deserialize(Message genericMsg) { + if (!(genericMsg instanceof Responses.SyncResultsResponse)) { + throw new IllegalArgumentException( + "Expected SyncResultsResponse, but got " + genericMsg.getClass().getName()); + } + + Responses.SyncResultsResponse msg = (Responses.SyncResultsResponse) genericMsg; + + return new SyncResultsResponse(msg.getMoreResults(), msg.getMissingStatement()); + } + + Responses.SyncResultsResponse serialize() { + Responses.SyncResultsResponse.Builder builder = Responses.SyncResultsResponse.newBuilder(); + + return builder.setMoreResults(moreResults).setMissingStatement(missingStatement).build(); + } + + @Override public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (missingStatement ? 1231 : 1237); + result = prime * result + (moreResults ? 1231 : 1237); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || !(obj instanceof SyncResultsResponse)) { + return false; + } + + SyncResultsResponse other = (SyncResultsResponse) obj; + + return missingStatement == other.missingStatement && moreResults == other.moreResults; + } + } } // End Service.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/protobuf/common.proto ---------------------------------------------------------------------- diff --git a/avatica/src/main/protobuf/common.proto b/avatica/src/main/protobuf/common.proto index f4de1f9..5421ffc 100644 --- a/avatica/src/main/protobuf/common.proto +++ b/avatica/src/main/protobuf/common.proto @@ -206,3 +206,67 @@ enum Severity { ERROR_SEVERITY = 2; WARNING_SEVERITY = 3; } + +// Enumeration corresponding to DatabaseMetaData operations +enum MetaDataOperation { + GET_ATTRIBUTES = 0; + GET_BEST_ROW_IDENTIFIER = 1; + GET_CATALOGS = 2; + GET_CLIENT_INFO_PROPERTIES = 3; + GET_COLUMN_PRIVILEGES = 4; + GET_COLUMNS = 5; + GET_CROSS_REFERENCE = 6; + GET_EXPORTED_KEYS = 7; + GET_FUNCTION_COLUMNS = 8; + GET_FUNCTIONS = 9; + GET_IMPORTED_KEYS = 10; + GET_INDEX_INFO = 11; + GET_PRIMARY_KEYS = 12; + GET_PROCEDURE_COLUMNS = 13; + GET_PROCEDURES = 14; + GET_PSEUDO_COLUMNS = 15; + GET_SCHEMAS = 16; + GET_SCHEMAS_WITH_ARGS = 17; + GET_SUPER_TABLES = 18; + GET_SUPER_TYPES = 19; + GET_TABLE_PRIVILEGES = 20; + GET_TABLES = 21; + GET_TABLE_TYPES = 22; + GET_TYPE_INFO = 23; + GET_UDTS = 24; + GET_VERSION_COLUMNS = 25; +} + +// Represents the breadth of arguments to DatabaseMetaData functions +message MetaDataOperationArgument { + enum ArgumentType { + STRING = 0; + BOOL = 1; + INT = 2; + REPEATED_STRING = 3; + REPEATED_INT = 4; + NULL = 5; + } + + string string_value = 1; + bool bool_value = 2; + sint32 int_value = 3; + repeated string string_array_values = 4; + repeated sint32 int_array_values = 5; + ArgumentType type = 6; +} + +enum StateType { + SQL = 0; + METADATA = 1; +} + +message QueryState { + StateType type = 1; + string sql = 2; + MetaDataOperation op = 3; + repeated MetaDataOperationArgument args = 4; + bool has_args = 5; + bool has_sql = 6; + bool has_op = 7; +} http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/protobuf/requests.proto ---------------------------------------------------------------------- diff --git a/avatica/src/main/protobuf/requests.proto b/avatica/src/main/protobuf/requests.proto index 02451da..4201e3b 100644 --- a/avatica/src/main/protobuf/requests.proto +++ b/avatica/src/main/protobuf/requests.proto @@ -127,3 +127,10 @@ message ExecuteRequest { } +message SyncResultsRequest { + string connection_id = 1; + uint32 statement_id = 2; + QueryState state = 3; + uint64 offset = 4; +} + http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/protobuf/responses.proto ---------------------------------------------------------------------- diff --git a/avatica/src/main/protobuf/responses.proto b/avatica/src/main/protobuf/responses.proto index 7c52be7..a899513 100644 --- a/avatica/src/main/protobuf/responses.proto +++ b/avatica/src/main/protobuf/responses.proto @@ -34,6 +34,7 @@ message ResultSetResponse { // Response to PrepareAndExecuteRequest message ExecuteResponse { repeated ResultSetResponse results = 1; + bool missing_statement = 2; // Did the request fail because of no-cached statement } // Response to PrepareRequest @@ -44,6 +45,8 @@ message PrepareResponse { // Response to FetchRequest message FetchResponse { Frame frame = 1; + bool missing_statement = 2; // Did the request fail because of no-cached statement + bool missing_results = 3; // Did the request fail because of a cached-statement w/o ResultSet } // Response to CreateStatementRequest @@ -90,3 +93,8 @@ message ErrorResponse { uint32 error_code = 4; // numeric identifier for error string sql_state = 5; // five-character standard-defined value } + +message SyncResultsResponse { + bool missing_statement = 1; // Server doesn't have the statement with the ID from the request + bool more_results = 2; // Should the client fetch() to get more results +} http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/AvaticaConnectionTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/AvaticaConnectionTest.java b/avatica/src/test/java/org/apache/calcite/avatica/AvaticaConnectionTest.java new file mode 100644 index 0000000..a1414c3 --- /dev/null +++ b/avatica/src/test/java/org/apache/calcite/avatica/AvaticaConnectionTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Properties; + +/** + * Tests for AvaticaConnection + */ +public class AvaticaConnectionTest { + + @Test + public void testNumExecuteRetries() { + AvaticaConnection statement = Mockito.mock(AvaticaConnection.class); + + Mockito.when(statement.getNumStatementRetries(Mockito.any(Properties.class))) + .thenCallRealMethod(); + + // Bad argument should throw an exception + try { + statement.getNumStatementRetries(null); + Assert.fail("Calling getNumStatementRetries with a null object should throw an exception"); + } catch (NullPointerException e) { + // Pass + } + + Properties props = new Properties(); + + // Verify the default value + Assert.assertEquals(Long.valueOf(AvaticaConnection.NUM_EXECUTE_RETRIES_DEFAULT).longValue(), + statement.getNumStatementRetries(props)); + + // Set a non-default value + props.setProperty(AvaticaConnection.NUM_EXECUTE_RETRIES_KEY, "10"); + + // Verify that we observe that value + Assert.assertEquals(10, statement.getNumStatementRetries(props)); + } + +} + +// End AvaticaConnectionTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/QueryStateTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/QueryStateTest.java b/avatica/src/test/java/org/apache/calcite/avatica/QueryStateTest.java new file mode 100644 index 0000000..d97bfa2 --- /dev/null +++ b/avatica/src/test/java/org/apache/calcite/avatica/QueryStateTest.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +import org.apache.calcite.avatica.remote.MetaDataOperation; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.Statement; + +import static org.junit.Assert.assertEquals; + +/** + * Tests that {@link QueryState} properly retains the necessary state to recreate + * a {@link ResultSet}. + */ +public class QueryStateTest { + + private Connection conn; + private DatabaseMetaData metadata; + private Statement statement; + + + @Before + public void setup() throws Exception { + conn = Mockito.mock(Connection.class); + metadata = Mockito.mock(DatabaseMetaData.class); + statement = Mockito.mock(Statement.class); + + Mockito.when(conn.getMetaData()).thenReturn(metadata); + } + + @Test + public void testMetadataGetAttributes() throws Exception { + final String catalog = "catalog"; + final String schemaPattern = null; + final String typeNamePattern = "%"; + final String attributeNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_ATTRIBUTES, catalog, schemaPattern, + typeNamePattern, attributeNamePattern); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getAttributes(catalog, schemaPattern, typeNamePattern, + attributeNamePattern); + } + + @Test + public void testMetadataGetBestRowIdentifier() throws Exception { + final String catalog = "catalog"; + final String schema = null; + final String table = "table"; + final int scope = 1; + final boolean nullable = true; + + QueryState state = new QueryState(MetaDataOperation.GET_BEST_ROW_IDENTIFIER, new Object[] { + catalog, + schema, + table, + scope, + nullable + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getBestRowIdentifier(catalog, schema, table, scope, nullable); + } + + @Test + public void testMetadataGetCatalogs() throws Exception { + QueryState state = new QueryState(MetaDataOperation.GET_CATALOGS, new Object[0]); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getCatalogs(); + } + + @Test + public void testMetadataGetColumnPrivileges() throws Exception { + final String catalog = null; + final String schema = "schema"; + final String table = "table"; + final String columnNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_COLUMN_PRIVILEGES, new Object[] { + catalog, + schema, + table, + columnNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getColumnPrivileges(catalog, schema, table, columnNamePattern); + } + + @Test + public void testMetadataGetColumns() throws Exception { + final String catalog = null; + final String schemaPattern = "%"; + final String tableNamePattern = "%"; + final String columnNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_COLUMNS, new Object[] { + catalog, + schemaPattern, + tableNamePattern, + columnNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getColumns(catalog, schemaPattern, tableNamePattern, + columnNamePattern); + } + + @Test + public void testMetadataGetCrossReference() throws Exception { + final String parentCatalog = null; + final String parentSchema = null; + final String parentTable = "%"; + final String foreignCatalog = null; + final String foreignSchema = null; + final String foreignTable = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_CROSS_REFERENCE, new Object[] { + parentCatalog, + parentSchema, + parentTable, + foreignCatalog, + foreignSchema, + foreignTable + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getCrossReference(parentCatalog, parentSchema, parentTable, + foreignCatalog, foreignSchema, foreignTable); + } + + @Test + public void testMetadataGetExportedKeys() throws Exception { + final String catalog = ""; + final String schema = null; + final String table = "mytable"; + + QueryState state = new QueryState(MetaDataOperation.GET_EXPORTED_KEYS, new Object[] { + catalog, + schema, + table + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getExportedKeys(catalog, schema, table); + } + + @Test + public void testMetadataGetFunctionColumns() throws Exception { + final String catalog = null; + final String schemaPattern = "%"; + final String functionNamePattern = "%"; + final String columnNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_FUNCTION_COLUMNS, new Object[] { + catalog, + schemaPattern, + functionNamePattern, + columnNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getFunctionColumns(catalog, schemaPattern, functionNamePattern, + columnNamePattern); + } + + @Test + public void testMetadataGetFunctions() throws Exception { + final String catalog = null; + final String schemaPattern = "%"; + final String functionNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_FUNCTIONS, new Object[] { + catalog, + schemaPattern, + functionNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getFunctions(catalog, schemaPattern, functionNamePattern); + } + + @Test + public void testMetadataGetImportedKeys() throws Exception { + final String catalog = ""; + final String schema = null; + final String table = "my_table"; + + QueryState state = new QueryState(MetaDataOperation.GET_IMPORTED_KEYS, new Object[] { + catalog, + schema, + table + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getImportedKeys(catalog, schema, table); + } + + @Test + public void testMetadataGetIndexInfo() throws Exception { + final String catalog = ""; + final String schema = null; + final String table = "my_table"; + final boolean unique = true; + final boolean approximate = true; + + QueryState state = new QueryState(MetaDataOperation.GET_INDEX_INFO, new Object[] { + catalog, + schema, + table, + unique, + approximate + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getIndexInfo(catalog, schema, table, unique, approximate); + } + + @Test + public void testMetadataGetPrimaryKeys() throws Exception { + final String catalog = ""; + final String schema = null; + final String table = "my_table"; + + QueryState state = new QueryState(MetaDataOperation.GET_PRIMARY_KEYS, new Object[] { + catalog, + schema, + table + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getPrimaryKeys(catalog, schema, table); + } + + @Test + public void testMetadataGetProcedureColumns() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String procedureNamePattern = "%"; + final String columnNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_PROCEDURE_COLUMNS, new Object[] { + catalog, + schemaPattern, + procedureNamePattern, + columnNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getProcedureColumns(catalog, schemaPattern, procedureNamePattern, + columnNamePattern); + } + + @Test + public void testMetadataGetProcedures() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String procedureNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_PROCEDURES, new Object[] { + catalog, + schemaPattern, + procedureNamePattern, + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getProcedures(catalog, schemaPattern, procedureNamePattern); + } + + @Test + public void testMetadataGetPseudoColumns() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String tableNamePattern = "%"; + final String columnNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_PSEUDO_COLUMNS, new Object[] { + catalog, + schemaPattern, + tableNamePattern, + columnNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getPseudoColumns(catalog, schemaPattern, tableNamePattern, + columnNamePattern); + } + + @Test + public void testMetadataGetSchemas() throws Exception { + QueryState state = new QueryState(MetaDataOperation.GET_SCHEMAS, new Object[0]); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getSchemas(); + } + + @Test + public void testMetadataGetSchemasWithArgs() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + + QueryState state = new QueryState(MetaDataOperation.GET_SCHEMAS_WITH_ARGS, new Object[] { + catalog, + schemaPattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getSchemas(catalog, schemaPattern); + } + + @Test + public void testMetadataGetSuperTables() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String tableNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_SUPER_TABLES, new Object[] { + catalog, + schemaPattern, + tableNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getSuperTables(catalog, schemaPattern, tableNamePattern); + } + + @Test + public void testMetadataGetSuperTypes() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String tableNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_SUPER_TYPES, new Object[] { + catalog, + schemaPattern, + tableNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getSuperTypes(catalog, schemaPattern, tableNamePattern); + } + + @Test + public void testMetadataGetTablePrivileges() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String tableNamePattern = "%"; + + QueryState state = new QueryState(MetaDataOperation.GET_TABLE_PRIVILEGES, new Object[] { + catalog, + schemaPattern, + tableNamePattern + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getTablePrivileges(catalog, schemaPattern, tableNamePattern); + } + + @Test + public void testMetadataGetTables() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String tableNamePattern = "%"; + final String[] types = new String[] {"VIEW", "TABLE"}; + + QueryState state = new QueryState(MetaDataOperation.GET_TABLES, new Object[] { + catalog, + schemaPattern, + tableNamePattern, + types + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getTables(catalog, schemaPattern, tableNamePattern, types); + } + + @Test + public void testMetadataGetTableTypes() throws Exception { + QueryState state = new QueryState(MetaDataOperation.GET_TABLE_TYPES, new Object[0]); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getTableTypes(); + } + + @Test + public void testMetadataGetTypeInfo() throws Exception { + QueryState state = new QueryState(MetaDataOperation.GET_TYPE_INFO, new Object[0]); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getTypeInfo(); + } + + @Test + public void testMetadataGetUDTs() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String typeNamePattern = "%"; + final int[] types = new int[] {1, 2}; + + QueryState state = new QueryState(MetaDataOperation.GET_UDTS, new Object[] { + catalog, + schemaPattern, + typeNamePattern, + types + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getUDTs(catalog, schemaPattern, typeNamePattern, types); + } + + @Test + public void testMetadataGetVersionColumns() throws Exception { + final String catalog = ""; + final String schemaPattern = null; + final String table = "my_table"; + + QueryState state = new QueryState(MetaDataOperation.GET_VERSION_COLUMNS, new Object[] { + catalog, + schemaPattern, + table + }); + + state.invoke(conn, statement); + + Mockito.verify(metadata).getVersionColumns(catalog, schemaPattern, table); + } + + @Test + public void testSerialization() throws Exception { + final String catalog = "catalog"; + final String schema = null; + final String table = "table"; + final int scope = 1; + final boolean nullable = true; + + QueryState state = new QueryState(MetaDataOperation.GET_BEST_ROW_IDENTIFIER, new Object[] { + catalog, + schema, + table, + scope, + nullable + }); + + assertEquals(state, QueryState.fromProto(state.toProto())); + + final String schemaPattern = null; + final String typeNamePattern = "%"; + final int[] types = new int[] {1, 2}; + + state = new QueryState(MetaDataOperation.GET_UDTS, new Object[] { + catalog, + schemaPattern, + typeNamePattern, + types + }); + + assertEquals(state, QueryState.fromProto(state.toProto())); + + state = new QueryState("SELECT * FROM foo"); + + assertEquals(state, QueryState.fromProto(state.toProto())); + } + +} + +// End QueryStateTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientTest.java new file mode 100644 index 0000000..33369d2 --- /dev/null +++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertArrayEquals; + +/** + * Tests for the HTTP transport. + */ +public class AvaticaHttpClientTest { + private static final String REQUEST = + "{\"request\":\"createStatement\",\"connectionId\":\"8f3f28ee-d0bb-4cdb-a4b1-8f6e8476c534\"}"; + private static final String RESPONSE = + "{\"response\":\"createStatement\",\"connectionId\":" + + "\"8f3f28ee-d0bb-4cdb-a4b1-8f6e8476c534\",\"statementId\":1608176856}"; + + @Test + public void testRetryOnUnavailable() throws Exception { + // HTTP-503, try again + URL url = new URL("http://127.0.0.1:8765"); + final HttpURLConnection cnxn = Mockito.mock(HttpURLConnection.class); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteArrayInputStream bais = new ByteArrayInputStream(RESPONSE.getBytes(StandardCharsets.UTF_8)); + + // Create the HTTP client + AvaticaHttpClientImpl client = new AvaticaHttpClientImpl(url) { + @Override HttpURLConnection openConnection() throws IOException { + return cnxn; + } + }; + + // HTTP 503 then 200 + Mockito.when(cnxn.getResponseCode()).thenReturn(HttpURLConnection.HTTP_UNAVAILABLE, + HttpURLConnection.HTTP_OK); + + Mockito.when(cnxn.getOutputStream()).thenReturn(baos); + Mockito.when(cnxn.getInputStream()).thenReturn(bais); + + byte[] response = client.send(REQUEST.getBytes(StandardCharsets.UTF_8)); + + assertArrayEquals(RESPONSE.getBytes(StandardCharsets.UTF_8), response); + } + + @Test(expected = RuntimeException.class) + public void testServerError() throws Exception { + // HTTP 500 should error out + URL url = new URL("http://127.0.0.1:8765"); + final HttpURLConnection cnxn = Mockito.mock(HttpURLConnection.class); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + // Create the HTTP client + AvaticaHttpClientImpl client = new AvaticaHttpClientImpl(url) { + @Override HttpURLConnection openConnection() throws IOException { + return cnxn; + } + }; + + // HTTP 500 + Mockito.when(cnxn.getResponseCode()).thenReturn(HttpURLConnection.HTTP_INTERNAL_ERROR); + + Mockito.when(cnxn.getOutputStream()).thenReturn(baos); + + // Should throw an RTE + client.send(REQUEST.getBytes(StandardCharsets.UTF_8)); + } + +} + +// End AvaticaHttpClientTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/remote/MetaDataOperationTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/MetaDataOperationTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/MetaDataOperationTest.java new file mode 100644 index 0000000..c64b32c --- /dev/null +++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/MetaDataOperationTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link MetaDataOperation} + */ +public class MetaDataOperationTest { + + @Test + public void testProtobufSerialization() { + for (MetaDataOperation metadataOp : MetaDataOperation.values()) { + assertEquals(metadataOp, MetaDataOperation.fromProto(metadataOp.toProto())); + } + } + +} + +// End MetaDataOperationTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java index f91f597..3c25e13 100644 --- a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java +++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java @@ -84,7 +84,7 @@ public class ProtobufHandlerTest { frameRows.add(new Object[] {true, "my_string"}); Meta.Frame frame = Frame.create(0, true, frameRows); - FetchResponse response = new FetchResponse(frame); + FetchResponse response = new FetchResponse(frame, false, false); when(translation.parseRequest(serializedRequest)).thenReturn(request); when(service.apply(request)).thenReturn(response); http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java index 0db48ab..3ca44e9 100644 --- a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java +++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java @@ -26,6 +26,7 @@ import org.apache.calcite.avatica.Meta.Frame; import org.apache.calcite.avatica.Meta.Signature; import org.apache.calcite.avatica.Meta.Style; import org.apache.calcite.avatica.MetaImpl; +import org.apache.calcite.avatica.QueryState; import org.apache.calcite.avatica.remote.Service.CatalogsRequest; import org.apache.calcite.avatica.remote.Service.CloseConnectionRequest; import org.apache.calcite.avatica.remote.Service.CloseConnectionResponse; @@ -51,6 +52,8 @@ import org.apache.calcite.avatica.remote.Service.Request; import org.apache.calcite.avatica.remote.Service.Response; import org.apache.calcite.avatica.remote.Service.ResultSetResponse; import org.apache.calcite.avatica.remote.Service.SchemasRequest; +import org.apache.calcite.avatica.remote.Service.SyncResultsRequest; +import org.apache.calcite.avatica.remote.Service.SyncResultsResponse; import org.apache.calcite.avatica.remote.Service.TableTypesRequest; import org.apache.calcite.avatica.remote.Service.TablesRequest; import org.apache.calcite.avatica.remote.Service.TypeInfoRequest; @@ -198,9 +201,30 @@ public class ProtobufTranslationImplTest<T> { new ConnectionPropertiesImpl(Boolean.FALSE, Boolean.FALSE, Integer.MAX_VALUE, "catalog", "schema"))); + requests.add(new SyncResultsRequest("connectionId", 12345, getSqlQueryState(), 150)); + requests.add(new SyncResultsRequest("connectionId2", 54321, getMetadataQueryState1(), 0)); + requests.add(new SyncResultsRequest("connectionId3", 5, getMetadataQueryState2(), 10)); + return requests; } + private static QueryState getSqlQueryState() { + return new QueryState("SELECT * from TABLE"); + } + + private static QueryState getMetadataQueryState1() { + return new QueryState(MetaDataOperation.GET_COLUMNS, new Object[] { + "", + null, + "%", + "%" + }); + } + + private static QueryState getMetadataQueryState2() { + return new QueryState(MetaDataOperation.GET_CATALOGS, new Object[0]); + } + private static List<Request> getRequestsWithNulls() { LinkedList<Request> requests = new LinkedList<>(); @@ -269,8 +293,10 @@ public class ProtobufTranslationImplTest<T> { } responses.add(new DatabasePropertyResponse(propertyMap)); - responses.add(new ExecuteResponse(Arrays.asList(results1, results1, results1))); - responses.add(new FetchResponse(frame)); + responses.add(new ExecuteResponse(Arrays.asList(results1, results1, results1), false)); + responses.add(new FetchResponse(frame, false, false)); + responses.add(new FetchResponse(frame, true, true)); + responses.add(new FetchResponse(frame, false, true)); responses.add( new PrepareResponse( new Meta.StatementHandle("connectionId", Integer.MAX_VALUE, @@ -283,6 +309,13 @@ public class ProtobufTranslationImplTest<T> { ErrorResponse.UNKNOWN_ERROR_CODE, ErrorResponse.UNKNOWN_SQL_STATE, AvaticaSeverity.WARNING)); + // No more results, statement not missing + responses.add(new SyncResultsResponse(false, false)); + // Missing statement, no results + responses.add(new SyncResultsResponse(false, true)); + // More results, no missing statement + responses.add(new SyncResultsResponse(true, false)); + return responses; } http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java index 18d66e9..dccf889 100644 --- a/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java +++ b/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java @@ -111,6 +111,10 @@ public class JsonHandlerTest { return null; } + @Override public SyncResultsResponse apply(SyncResultsRequest request) { + return null; + } + @Override public ExecuteResponse apply(ExecuteRequest request) { return null; } @@ -145,7 +149,7 @@ public class JsonHandlerTest { RANDOM.nextInt(), false, signature, Meta.Frame.EMPTY, -1L); return new Service.ExecuteResponse( - Collections.singletonList(resultSetResponse)); + Collections.singletonList(resultSetResponse), false); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java index f44e871..5f677c1 100644 --- a/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java +++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java @@ -24,6 +24,7 @@ import org.apache.calcite.avatica.AvaticaPreparedStatement; import org.apache.calcite.avatica.AvaticaResultSetMetaData; import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.QueryState; import org.apache.calcite.avatica.UnregisteredDriver; import java.io.InputStream; @@ -90,7 +91,7 @@ public class CalciteJdbc41Factory extends CalciteFactory { resultSetConcurrency, resultSetHoldability); } - public CalciteResultSet newResultSet(AvaticaStatement statement, + public CalciteResultSet newResultSet(AvaticaStatement statement, QueryState state, Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame) { final ResultSetMetaData metaData = newResultSetMetaData(statement, signature); http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java index 73c0880..e4f0ae5 100644 --- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java +++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java @@ -25,6 +25,8 @@ import org.apache.calcite.avatica.AvaticaUtils; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.Meta; import org.apache.calcite.avatica.MetaImpl; +import org.apache.calcite.avatica.NoSuchStatementException; +import org.apache.calcite.avatica.QueryState; import org.apache.calcite.avatica.remote.TypedValue; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; @@ -528,7 +530,13 @@ public class CalciteMetaImpl extends MetaImpl { "TABLE_TYPE"); } - @Override public Iterable<Object> createIterable(StatementHandle handle, + @Override public Iterable<Object> createIterable(StatementHandle handle, QueryState state, + Signature signature, List<TypedValue> parameterValues, Frame firstFrame) { + // Drop QueryState + return _createIterable(handle, signature, parameterValues, firstFrame); + } + + Iterable<Object> _createIterable(StatementHandle handle, Signature signature, List<TypedValue> parameterValues, Frame firstFrame) { try { //noinspection unchecked @@ -584,7 +592,7 @@ public class CalciteMetaImpl extends MetaImpl { final Iterator<Object> iterator; if (stmt.getResultSet() == null) { final Iterable<Object> iterable = - createIterable(h, signature, null, null); + _createIterable(h, signature, null, null); iterator = iterable.iterator(); stmt.setResultSet(iterator); } else { @@ -606,7 +614,7 @@ public class CalciteMetaImpl extends MetaImpl { final Iterator<Object> iterator; final Iterable<Object> iterable = - createIterable(h, signature, parameterValues, null); + _createIterable(h, signature, parameterValues, null); iterator = iterable.iterator(); stmt.setResultSet(iterator); @@ -644,6 +652,12 @@ public class CalciteMetaImpl extends MetaImpl { return DRIVER.connect(schema, typeFactory); } + public boolean syncResults(StatementHandle h, QueryState state, long offset) + throws NoSuchStatementException { + // Doesn't have application in Calcite itself. + throw new UnsupportedOperationException(); + } + /** Metadata describing a Calcite table. */ private static class CalciteMetaTable extends MetaTable { private final Table calciteTable; http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java index be68916..c45b245 100644 --- a/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java +++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java @@ -46,7 +46,7 @@ public class CalciteResultSet extends AvaticaResultSet { CalcitePrepare.CalciteSignature calciteSignature, ResultSetMetaData resultSetMetaData, TimeZone timeZone, Meta.Frame firstFrame) { - super(statement, calciteSignature, resultSetMetaData, timeZone, firstFrame); + super(statement, null, calciteSignature, resultSetMetaData, timeZone, firstFrame); } @Override protected CalciteResultSet execute() throws SQLException {
