http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java deleted file mode 100644 index e1dd06d..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.proto.Common.WireMessage; -import org.apache.calcite.avatica.proto.Requests.CatalogsRequest; -import org.apache.calcite.avatica.proto.Requests.CloseConnectionRequest; -import org.apache.calcite.avatica.proto.Requests.CloseStatementRequest; -import org.apache.calcite.avatica.proto.Requests.ColumnsRequest; -import org.apache.calcite.avatica.proto.Requests.CommitRequest; -import org.apache.calcite.avatica.proto.Requests.ConnectionSyncRequest; -import org.apache.calcite.avatica.proto.Requests.CreateStatementRequest; -import org.apache.calcite.avatica.proto.Requests.DatabasePropertyRequest; -import org.apache.calcite.avatica.proto.Requests.ExecuteBatchRequest; -import org.apache.calcite.avatica.proto.Requests.ExecuteRequest; -import org.apache.calcite.avatica.proto.Requests.FetchRequest; -import org.apache.calcite.avatica.proto.Requests.OpenConnectionRequest; -import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteBatchRequest; -import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteRequest; -import org.apache.calcite.avatica.proto.Requests.PrepareRequest; -import org.apache.calcite.avatica.proto.Requests.RollbackRequest; -import org.apache.calcite.avatica.proto.Requests.SchemasRequest; -import org.apache.calcite.avatica.proto.Requests.SyncResultsRequest; -import org.apache.calcite.avatica.proto.Requests.TableTypesRequest; -import org.apache.calcite.avatica.proto.Requests.TablesRequest; -import org.apache.calcite.avatica.proto.Requests.TypeInfoRequest; -import org.apache.calcite.avatica.proto.Responses.CloseConnectionResponse; -import org.apache.calcite.avatica.proto.Responses.CloseStatementResponse; -import org.apache.calcite.avatica.proto.Responses.CommitResponse; -import org.apache.calcite.avatica.proto.Responses.ConnectionSyncResponse; -import org.apache.calcite.avatica.proto.Responses.CreateStatementResponse; -import org.apache.calcite.avatica.proto.Responses.DatabasePropertyResponse; -import org.apache.calcite.avatica.proto.Responses.ErrorResponse; -import org.apache.calcite.avatica.proto.Responses.ExecuteBatchResponse; -import org.apache.calcite.avatica.proto.Responses.ExecuteResponse; -import org.apache.calcite.avatica.proto.Responses.FetchResponse; -import org.apache.calcite.avatica.proto.Responses.OpenConnectionResponse; -import org.apache.calcite.avatica.proto.Responses.PrepareResponse; -import org.apache.calcite.avatica.proto.Responses.ResultSetResponse; -import org.apache.calcite.avatica.proto.Responses.RollbackResponse; -import org.apache.calcite.avatica.proto.Responses.RpcMetadata; -import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse; -import org.apache.calcite.avatica.remote.Service.Request; -import org.apache.calcite.avatica.remote.Service.Response; -import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; -import org.apache.calcite.avatica.util.UnsynchronizedBuffer; - -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.Message; -import com.google.protobuf.TextFormat; -import com.google.protobuf.UnsafeByteOperations; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import static java.nio.charset.StandardCharsets.UTF_8; - -/** - * Implementation of {@link ProtobufTranslationImpl} that translates - * protobuf requests to POJO requests. - */ -public class ProtobufTranslationImpl implements ProtobufTranslation { - private static final Logger LOG = LoggerFactory.getLogger(ProtobufTranslationImpl.class); - - // Extremely ugly mapping of PB class name into a means to convert it to the POJO - private static final Map<String, RequestTranslator> REQUEST_PARSERS; - private static final Map<String, ResponseTranslator> RESPONSE_PARSERS; - private static final Map<Class<?>, ByteString> MESSAGE_CLASSES; - - static { - Map<String, RequestTranslator> reqParsers = new ConcurrentHashMap<>(); - reqParsers.put(CatalogsRequest.class.getName(), - new RequestTranslator(CatalogsRequest.parser(), new Service.CatalogsRequest())); - reqParsers.put(OpenConnectionRequest.class.getName(), - new RequestTranslator(OpenConnectionRequest.parser(), new Service.OpenConnectionRequest())); - reqParsers.put(CloseConnectionRequest.class.getName(), - new RequestTranslator(CloseConnectionRequest.parser(), - new Service.CloseConnectionRequest())); - reqParsers.put(CloseStatementRequest.class.getName(), - new RequestTranslator(CloseStatementRequest.parser(), new Service.CloseStatementRequest())); - reqParsers.put(ColumnsRequest.class.getName(), - new RequestTranslator(ColumnsRequest.parser(), new Service.ColumnsRequest())); - reqParsers.put(ConnectionSyncRequest.class.getName(), - new RequestTranslator(ConnectionSyncRequest.parser(), new Service.ConnectionSyncRequest())); - reqParsers.put(CreateStatementRequest.class.getName(), - new RequestTranslator(CreateStatementRequest.parser(), - new Service.CreateStatementRequest())); - reqParsers.put(DatabasePropertyRequest.class.getName(), - new RequestTranslator(DatabasePropertyRequest.parser(), - new Service.DatabasePropertyRequest())); - reqParsers.put(FetchRequest.class.getName(), - new RequestTranslator(FetchRequest.parser(), new Service.FetchRequest())); - reqParsers.put(PrepareAndExecuteRequest.class.getName(), - new RequestTranslator(PrepareAndExecuteRequest.parser(), - new Service.PrepareAndExecuteRequest())); - reqParsers.put(PrepareRequest.class.getName(), - new RequestTranslator(PrepareRequest.parser(), new Service.PrepareRequest())); - reqParsers.put(SchemasRequest.class.getName(), - new RequestTranslator(SchemasRequest.parser(), new Service.SchemasRequest())); - reqParsers.put(TablesRequest.class.getName(), - new RequestTranslator(TablesRequest.parser(), new Service.TablesRequest())); - reqParsers.put(TableTypesRequest.class.getName(), - new RequestTranslator(TableTypesRequest.parser(), new Service.TableTypesRequest())); - reqParsers.put(TypeInfoRequest.class.getName(), - new RequestTranslator(TypeInfoRequest.parser(), new Service.TypeInfoRequest())); - reqParsers.put(ExecuteRequest.class.getName(), - new RequestTranslator(ExecuteRequest.parser(), new Service.ExecuteRequest())); - reqParsers.put(SyncResultsRequest.class.getName(), - new RequestTranslator(SyncResultsRequest.parser(), new Service.SyncResultsRequest())); - reqParsers.put(CommitRequest.class.getName(), - new RequestTranslator(CommitRequest.parser(), new Service.CommitRequest())); - reqParsers.put(RollbackRequest.class.getName(), - new RequestTranslator(RollbackRequest.parser(), new Service.RollbackRequest())); - reqParsers.put(PrepareAndExecuteBatchRequest.class.getName(), - new RequestTranslator(PrepareAndExecuteBatchRequest.parser(), - new Service.PrepareAndExecuteBatchRequest())); - reqParsers.put(ExecuteBatchRequest.class.getName(), - new RequestTranslator(ExecuteBatchRequest.parser(), - new Service.ExecuteBatchRequest())); - - REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers); - - Map<String, ResponseTranslator> respParsers = new ConcurrentHashMap<>(); - respParsers.put(OpenConnectionResponse.class.getName(), - new ResponseTranslator(OpenConnectionResponse.parser(), - new Service.OpenConnectionResponse())); - respParsers.put(CloseConnectionResponse.class.getName(), - new ResponseTranslator(CloseConnectionResponse.parser(), - new Service.CloseConnectionResponse())); - respParsers.put(CloseStatementResponse.class.getName(), - new ResponseTranslator(CloseStatementResponse.parser(), - new Service.CloseStatementResponse())); - respParsers.put(ConnectionSyncResponse.class.getName(), - new ResponseTranslator(ConnectionSyncResponse.parser(), - new Service.ConnectionSyncResponse())); - respParsers.put(CreateStatementResponse.class.getName(), - new ResponseTranslator(CreateStatementResponse.parser(), - new Service.CreateStatementResponse())); - respParsers.put(DatabasePropertyResponse.class.getName(), - new ResponseTranslator(DatabasePropertyResponse.parser(), - new Service.DatabasePropertyResponse())); - respParsers.put(ExecuteResponse.class.getName(), - new ResponseTranslator(ExecuteResponse.parser(), new Service.ExecuteResponse())); - respParsers.put(FetchResponse.class.getName(), - new ResponseTranslator(FetchResponse.parser(), new Service.FetchResponse())); - respParsers.put(PrepareResponse.class.getName(), - new ResponseTranslator(PrepareResponse.parser(), new Service.PrepareResponse())); - respParsers.put(ResultSetResponse.class.getName(), - new ResponseTranslator(ResultSetResponse.parser(), new Service.ResultSetResponse())); - respParsers.put(ErrorResponse.class.getName(), - new ResponseTranslator(ErrorResponse.parser(), new Service.ErrorResponse())); - respParsers.put(SyncResultsResponse.class.getName(), - new ResponseTranslator(SyncResultsResponse.parser(), new Service.SyncResultsResponse())); - respParsers.put(RpcMetadata.class.getName(), - new ResponseTranslator(RpcMetadata.parser(), new RpcMetadataResponse())); - respParsers.put(CommitResponse.class.getName(), - new ResponseTranslator(CommitResponse.parser(), new Service.CommitResponse())); - respParsers.put(RollbackResponse.class.getName(), - new ResponseTranslator(RollbackResponse.parser(), new Service.RollbackResponse())); - respParsers.put(ExecuteBatchResponse.class.getName(), - new ResponseTranslator(ExecuteBatchResponse.parser(), new Service.ExecuteBatchResponse())); - - RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers); - - Map<Class<?>, ByteString> messageClassNames = new ConcurrentHashMap<>(); - for (Class<?> msgClz : getAllMessageClasses()) { - messageClassNames.put(msgClz, wrapClassName(msgClz)); - } - MESSAGE_CLASSES = Collections.unmodifiableMap(messageClassNames); - } - - private static List<Class<?>> getAllMessageClasses() { - List<Class<?>> messageClasses = new ArrayList<>(); - messageClasses.add(CatalogsRequest.class); - messageClasses.add(CloseConnectionRequest.class); - messageClasses.add(CloseStatementRequest.class); - messageClasses.add(ColumnsRequest.class); - messageClasses.add(CommitRequest.class); - messageClasses.add(ConnectionSyncRequest.class); - messageClasses.add(CreateStatementRequest.class); - messageClasses.add(DatabasePropertyRequest.class); - messageClasses.add(ExecuteRequest.class); - messageClasses.add(FetchRequest.class); - messageClasses.add(OpenConnectionRequest.class); - messageClasses.add(PrepareAndExecuteRequest.class); - messageClasses.add(PrepareRequest.class); - messageClasses.add(RollbackRequest.class); - messageClasses.add(SchemasRequest.class); - messageClasses.add(SyncResultsRequest.class); - messageClasses.add(TableTypesRequest.class); - messageClasses.add(TablesRequest.class); - messageClasses.add(TypeInfoRequest.class); - messageClasses.add(PrepareAndExecuteBatchRequest.class); - messageClasses.add(ExecuteBatchRequest.class); - - messageClasses.add(CloseConnectionResponse.class); - messageClasses.add(CloseStatementResponse.class); - messageClasses.add(CommitResponse.class); - messageClasses.add(ConnectionSyncResponse.class); - messageClasses.add(CreateStatementResponse.class); - messageClasses.add(DatabasePropertyResponse.class); - messageClasses.add(ErrorResponse.class); - messageClasses.add(ExecuteResponse.class); - messageClasses.add(FetchResponse.class); - messageClasses.add(OpenConnectionResponse.class); - messageClasses.add(PrepareResponse.class); - messageClasses.add(ResultSetResponse.class); - messageClasses.add(RollbackResponse.class); - messageClasses.add(RpcMetadata.class); - messageClasses.add(SyncResultsResponse.class); - messageClasses.add(ExecuteBatchResponse.class); - - return messageClasses; - } - - private static ByteString wrapClassName(Class<?> clz) { - return UnsafeByteOperations.unsafeWrap(clz.getName().getBytes(UTF_8)); - } - - private final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer = - new ThreadLocal<UnsynchronizedBuffer>() { - @Override protected UnsynchronizedBuffer initialValue() { - return new UnsynchronizedBuffer(); - } - }; - - /** - * Fetches the concrete message's Parser implementation. - * - * @param className The protocol buffer class name - * @return The Parser for the class - * @throws IllegalArgumentException If the argument is null or if a Parser for the given - * class name is not found. - */ - public static RequestTranslator getParserForRequest(String className) { - if (null == className || className.isEmpty()) { - throw new IllegalArgumentException("Cannot fetch parser for Request with " - + (null == className ? "null" : "missing") + " class name"); - } - - RequestTranslator translator = REQUEST_PARSERS.get(className); - if (null == translator) { - throw new IllegalArgumentException("Cannot find request parser for " + className); - } - - return translator; - } - - /** - * Fetches the concrete message's Parser implementation. - * - * @param className The protocol buffer class name - * @return The Parser for the class - * @throws IllegalArgumentException If the argument is null or if a Parser for the given - * class name is not found. - */ - public static ResponseTranslator getParserForResponse(String className) { - if (null == className || className.isEmpty()) { - throw new IllegalArgumentException("Cannot fetch parser for Response with " - + (null == className ? "null" : "missing") + " class name"); - } - - ResponseTranslator translator = RESPONSE_PARSERS.get(className); - if (null == translator) { - throw new IllegalArgumentException("Cannot find response parser for " + className); - } - - return translator; - } - - @Override public byte[] serializeResponse(Response response) throws IOException { - // Avoid BAOS for its synchronized write methods, we don't need that concurrency control - UnsynchronizedBuffer out = threadLocalBuffer.get(); - try { - Message responseMsg = response.serialize(); - // Serialization of the response may be large - if (LOG.isTraceEnabled()) { - LOG.trace("Serializing response '{}'", TextFormat.shortDebugString(responseMsg)); - } - serializeMessage(out, responseMsg); - return out.toArray(); - } finally { - out.reset(); - } - } - - @Override public byte[] serializeRequest(Request request) throws IOException { - // Avoid BAOS for its synchronized write methods, we don't need that concurrency control - UnsynchronizedBuffer out = threadLocalBuffer.get(); - try { - Message requestMsg = request.serialize(); - // Serialization of the request may be large - if (LOG.isTraceEnabled()) { - LOG.trace("Serializing request '{}'", TextFormat.shortDebugString(requestMsg)); - } - serializeMessage(out, requestMsg); - return out.toArray(); - } finally { - out.reset(); - } - } - - void serializeMessage(OutputStream out, Message msg) throws IOException { - // Serialize the protobuf message - UnsynchronizedBuffer buffer = threadLocalBuffer.get(); - ByteString serializedMsg; - try { - msg.writeTo(buffer); - // Make a bytestring from it - serializedMsg = UnsafeByteOperations.unsafeWrap(buffer.toArray()); - } finally { - buffer.reset(); - } - - // Wrap the serialized message in a WireMessage - WireMessage wireMsg = WireMessage.newBuilder().setNameBytes(getClassNameBytes(msg.getClass())) - .setWrappedMessage(serializedMsg).build(); - - // Write the WireMessage to the provided OutputStream - wireMsg.writeTo(out); - } - - ByteString getClassNameBytes(Class<?> clz) { - ByteString byteString = MESSAGE_CLASSES.get(clz); - if (null == byteString) { - throw new IllegalArgumentException("Missing ByteString for " + clz.getName()); - } - return byteString; - } - - @Override public Request parseRequest(byte[] bytes) throws IOException { - ByteString byteString = UnsafeByteOperations.unsafeWrap(bytes); - CodedInputStream inputStream = byteString.newCodedInput(); - // Enable aliasing to avoid an extra copy to get at the serialized Request inside of the - // WireMessage. - inputStream.enableAliasing(true); - WireMessage wireMsg = WireMessage.parseFrom(inputStream); - - String serializedMessageClassName = wireMsg.getName(); - - try { - RequestTranslator translator = getParserForRequest(serializedMessageClassName); - - // The ByteString should be logical offsets into the original byte array - return translator.transform(wireMsg.getWrappedMessage()); - } catch (RuntimeException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to parse request message '{}'", TextFormat.shortDebugString(wireMsg)); - } - throw e; - } - } - - @Override public Response parseResponse(byte[] bytes) throws IOException { - ByteString byteString = UnsafeByteOperations.unsafeWrap(bytes); - CodedInputStream inputStream = byteString.newCodedInput(); - // Enable aliasing to avoid an extra copy to get at the serialized Response inside of the - // WireMessage. - inputStream.enableAliasing(true); - WireMessage wireMsg = WireMessage.parseFrom(inputStream); - - String serializedMessageClassName = wireMsg.getName(); - try { - ResponseTranslator translator = getParserForResponse(serializedMessageClassName); - - return translator.transform(wireMsg.getWrappedMessage()); - } catch (RuntimeException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to parse response message '{}'", TextFormat.shortDebugString(wireMsg)); - } - throw e; - } - } -} - -// End ProtobufTranslationImpl.java
http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java deleted file mode 100644 index 75b9d58..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.AvaticaConnection; -import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException; -import org.apache.calcite.avatica.AvaticaParameter; -import org.apache.calcite.avatica.AvaticaUtils; -import org.apache.calcite.avatica.ColumnMetaData; -import org.apache.calcite.avatica.ConnectionPropertiesImpl; -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.MetaImpl; -import org.apache.calcite.avatica.MissingResultsException; -import org.apache.calcite.avatica.NoSuchStatementException; -import org.apache.calcite.avatica.QueryState; - -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Implementation of {@link org.apache.calcite.avatica.Meta} for the remote - * driver. - */ -class RemoteMeta extends MetaImpl { - final Service service; - final Map<String, ConnectionPropertiesImpl> propsMap = new HashMap<>(); - private Map<DatabaseProperty, Object> databaseProperties; - - public RemoteMeta(AvaticaConnection connection, Service service) { - super(connection); - this.service = service; - } - - private MetaResultSet toResultSet(Class clazz, - Service.ResultSetResponse response) { - if (response.updateCount != -1) { - return MetaResultSet.count(response.connectionId, response.statementId, - response.updateCount); - } - Signature signature0 = response.signature; - if (signature0 == null) { - final List<ColumnMetaData> columns = - clazz == null - ? Collections.<ColumnMetaData>emptyList() - : fieldMetaData(clazz).columns; - signature0 = Signature.create(columns, - "?", Collections.<AvaticaParameter>emptyList(), CursorFactory.ARRAY, - Meta.StatementType.SELECT); - } - return MetaResultSet.create(response.connectionId, response.statementId, - response.ownStatement, signature0, response.firstFrame); - } - - @Override public Map<DatabaseProperty, Object> getDatabaseProperties(ConnectionHandle ch) { - synchronized (this) { - // Compute map on first use, and cache - if (databaseProperties == null) { - databaseProperties = - service.apply(new Service.DatabasePropertyRequest(ch.id)).map; - } - return databaseProperties; - } - } - - @Override public StatementHandle createStatement(final ConnectionHandle ch) { - return connection.invokeWithRetries( - new CallableWithoutException<StatementHandle>() { - public StatementHandle call() { - // sync connection state if necessary - connectionSync(ch, new ConnectionPropertiesImpl()); - final Service.CreateStatementResponse response = - service.apply(new Service.CreateStatementRequest(ch.id)); - return new StatementHandle(response.connectionId, response.statementId, null); - } - }); - } - - @Override public void closeStatement(final StatementHandle h) { - connection.invokeWithRetries( - new CallableWithoutException<Void>() { - public Void call() { - final Service.CloseStatementResponse response = - service.apply( - new Service.CloseStatementRequest(h.connectionId, h.id)); - return null; - } - }); - } - - @Override public void openConnection(final ConnectionHandle ch, final Map<String, String> info) { - connection.invokeWithRetries( - new CallableWithoutException<Void>() { - public Void call() { - final Service.OpenConnectionResponse response = - service.apply(new Service.OpenConnectionRequest(ch.id, info)); - return null; - } - }); - } - - @Override public void closeConnection(final ConnectionHandle ch) { - connection.invokeWithRetries( - new CallableWithoutException<Void>() { - public Void call() { - final Service.CloseConnectionResponse response = - service.apply(new Service.CloseConnectionRequest(ch.id)); - propsMap.remove(ch.id); - return null; - } - }); - } - - @Override public ConnectionProperties connectionSync(final ConnectionHandle ch, - final ConnectionProperties connProps) { - return connection.invokeWithRetries( - new CallableWithoutException<ConnectionProperties>() { - public ConnectionProperties call() { - ConnectionPropertiesImpl localProps = propsMap.get(ch.id); - if (localProps == null) { - localProps = new ConnectionPropertiesImpl(); - localProps.setDirty(true); - propsMap.put(ch.id, localProps); - } - - // Only make an RPC if necessary. RPC is necessary when we have local changes that need - // flushed to the server (be sure to introduce any new changes from connProps before - // checking AND when connProps.isEmpty() (meaning, this was a request for a value, not - // overriding a value). Otherwise, accumulate the change locally and return immediately. - if (localProps.merge(connProps).isDirty() && connProps.isEmpty()) { - final Service.ConnectionSyncResponse response = service.apply( - new Service.ConnectionSyncRequest(ch.id, localProps)); - propsMap.put(ch.id, (ConnectionPropertiesImpl) response.connProps); - return response.connProps; - } else { - return localProps; - } - } - }); - } - - @Override public MetaResultSet getCatalogs(final ConnectionHandle ch) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply(new Service.CatalogsRequest(ch.id)); - return toResultSet(MetaCatalog.class, response); - } - }); - } - - @Override public MetaResultSet getSchemas(final ConnectionHandle ch, final String catalog, - final Pat schemaPattern) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply( - new Service.SchemasRequest(ch.id, catalog, schemaPattern.s)); - return toResultSet(MetaSchema.class, response); - } - }); - } - - @Override public MetaResultSet getTables(final ConnectionHandle ch, final String catalog, - final Pat schemaPattern, final Pat tableNamePattern, final List<String> typeList) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply( - new Service.TablesRequest(ch.id, catalog, schemaPattern.s, - tableNamePattern.s, typeList)); - return toResultSet(MetaTable.class, response); - } - }); - } - - @Override public MetaResultSet getTableTypes(final ConnectionHandle ch) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply(new Service.TableTypesRequest(ch.id)); - return toResultSet(MetaTableType.class, response); - } - }); - } - - @Override public MetaResultSet getTypeInfo(final ConnectionHandle ch) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply(new Service.TypeInfoRequest(ch.id)); - return toResultSet(MetaTypeInfo.class, response); - } - }); - } - - @Override public MetaResultSet getColumns(final ConnectionHandle ch, final String catalog, - final Pat schemaPattern, final Pat tableNamePattern, final Pat columnNamePattern) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply( - new Service.ColumnsRequest(ch.id, catalog, schemaPattern.s, - tableNamePattern.s, columnNamePattern.s)); - return toResultSet(MetaColumn.class, response); - } - }); - } - - @Override public StatementHandle prepare(final ConnectionHandle ch, final String sql, - final long maxRowCount) { - return connection.invokeWithRetries( - new CallableWithoutException<StatementHandle>() { - public StatementHandle call() { - connectionSync(ch, - new ConnectionPropertiesImpl()); // sync connection state if necessary - final Service.PrepareResponse response = service.apply( - new Service.PrepareRequest(ch.id, sql, maxRowCount)); - return response.statement; - } - }); - } - - @SuppressWarnings("deprecation") - @Override public ExecuteResult prepareAndExecute(StatementHandle h, String sql, long maxRowCount, - PrepareCallback callback) throws NoSuchStatementException { - // The old semantics were that maxRowCount was also treated as the maximum number of - // elements in the first Frame of results. A value of -1 would also preserve this, but an - // explicit (positive) number is easier to follow, IMO. - return prepareAndExecute(h, sql, maxRowCount, AvaticaUtils.toSaturatedInt(maxRowCount), - callback); - } - - @Override public ExecuteResult prepareAndExecute(final StatementHandle h, final String sql, - final long maxRowCount, int maxRowsInFirstFrame, final PrepareCallback callback) - throws NoSuchStatementException { - try { - return connection.invokeWithRetries( - new CallableWithoutException<ExecuteResult>() { - public ExecuteResult call() { - // sync connection state if necessary - connectionSync(new ConnectionHandle(h.connectionId), new ConnectionPropertiesImpl()); - final Service.ExecuteResponse response; - try { - synchronized (callback.getMonitor()) { - callback.clear(); - response = service.apply( - new Service.PrepareAndExecuteRequest(h.connectionId, - h.id, sql, maxRowCount)); - if (response.missingStatement) { - throw new RuntimeException(new NoSuchStatementException(h)); - } - if (response.results.size() > 0) { - final Service.ResultSetResponse result = response.results.get(0); - callback.assign(result.signature, result.firstFrame, - result.updateCount); - } - } - callback.execute(); - List<MetaResultSet> metaResultSets = new ArrayList<>(); - for (Service.ResultSetResponse result : response.results) { - metaResultSets.add(toResultSet(null, result)); - } - return new ExecuteResult(metaResultSets); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - }); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof NoSuchStatementException) { - throw (NoSuchStatementException) cause; - } - throw e; - } - } - - @Override public Frame fetch(final StatementHandle h, final long offset, - final int fetchMaxRowCount) throws NoSuchStatementException, MissingResultsException { - try { - return connection.invokeWithRetries( - new CallableWithoutException<Frame>() { - public Frame call() { - final Service.FetchResponse response = - service.apply( - new Service.FetchRequest(h.connectionId, h.id, offset, fetchMaxRowCount)); - if (response.missingStatement) { - throw new RuntimeException(new NoSuchStatementException(h)); - } - if (response.missingResults) { - throw new RuntimeException(new MissingResultsException(h)); - } - return response.frame; - } - }); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof NoSuchStatementException) { - throw (NoSuchStatementException) cause; - } else if (cause instanceof MissingResultsException) { - throw (MissingResultsException) cause; - } - throw e; - } - } - - @SuppressWarnings("deprecation") - @Override public ExecuteResult execute(StatementHandle h, List<TypedValue> parameterValues, - long maxRowCount) throws NoSuchStatementException { - return execute(h, parameterValues, AvaticaUtils.toSaturatedInt(maxRowCount)); - } - - @Override public ExecuteResult execute(final StatementHandle h, - final List<TypedValue> parameterValues, final int maxRowsInFirstFrame) - throws NoSuchStatementException { - try { - return connection.invokeWithRetries( - new CallableWithoutException<ExecuteResult>() { - public ExecuteResult call() { - final Service.ExecuteResponse response = service.apply( - new Service.ExecuteRequest(h, parameterValues, maxRowsInFirstFrame)); - - if (response.missingStatement) { - throw new RuntimeException(new NoSuchStatementException(h)); - } - - List<MetaResultSet> metaResultSets = new ArrayList<>(); - for (Service.ResultSetResponse result : response.results) { - metaResultSets.add(toResultSet(null, result)); - } - - return new ExecuteResult(metaResultSets); - } - }); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof NoSuchStatementException) { - throw (NoSuchStatementException) cause; - } - throw e; - } - } - - @Override public boolean syncResults(final StatementHandle h, final QueryState state, - final long offset) throws NoSuchStatementException { - try { - return connection.invokeWithRetries( - new CallableWithoutException<Boolean>() { - public Boolean call() { - final Service.SyncResultsResponse response = - service.apply( - new Service.SyncResultsRequest(h.connectionId, h.id, state, offset)); - if (response.missingStatement) { - throw new RuntimeException(new NoSuchStatementException(h)); - } - return response.moreResults; - } - }); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof NoSuchStatementException) { - throw (NoSuchStatementException) cause; - } - throw e; - } - } - - @Override public void commit(final ConnectionHandle ch) { - connection.invokeWithRetries(new CallableWithoutException<Void>() { - public Void call() { - final Service.CommitResponse response = - service.apply(new Service.CommitRequest(ch.id)); - return null; - } - }); - } - - @Override public void rollback(final ConnectionHandle ch) { - connection.invokeWithRetries(new CallableWithoutException<Void>() { - public Void call() { - final Service.RollbackResponse response = - service.apply(new Service.RollbackRequest(ch.id)); - return null; - } - }); - } - - @Override public ExecuteBatchResult prepareAndExecuteBatch(final StatementHandle h, - final List<String> sqlCommands) throws NoSuchStatementException { - return connection.invokeWithRetries(new CallableWithoutException<ExecuteBatchResult>() { - @Override public ExecuteBatchResult call() { - Service.ExecuteBatchResponse response = - service.apply( - new Service.PrepareAndExecuteBatchRequest(h.connectionId, h.id, sqlCommands)); - return new ExecuteBatchResult(response.updateCounts); - } - }); - } - - @Override public ExecuteBatchResult executeBatch(final StatementHandle h, - final List<List<TypedValue>> parameterValues) throws NoSuchStatementException { - return connection.invokeWithRetries(new CallableWithoutException<ExecuteBatchResult>() { - @Override public ExecuteBatchResult call() { - Service.ExecuteBatchResponse response = - service.apply(new Service.ExecuteBatchRequest(h.connectionId, h.id, parameterValues)); - return new ExecuteBatchResult(response.updateCounts); - } - }); - } -} - -// End RemoteMeta.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java deleted file mode 100644 index 0f4cbfb..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - - -/** - * ProtobufService implementation that queries against a remote implementation, using - * protocol buffers as the serialized form. - */ -public class RemoteProtobufService extends ProtobufService { - private static final Logger LOG = LoggerFactory.getLogger(RemoteProtobufService.class); - - private final AvaticaHttpClient client; - private final ProtobufTranslation translation; - - public RemoteProtobufService(AvaticaHttpClient client, ProtobufTranslation translation) { - this.client = client; - this.translation = translation; - } - - @Override public Response _apply(Request request) { - final Response resp; - byte[] response = null; - try { - response = client.send(translation.serializeRequest(request)); - } catch (IOException e) { - LOG.debug("Failed to execute remote request: {}", request); - // Failed to get a response from the server for the request. - throw new RuntimeException(e); - } - - try { - resp = translation.parseResponse(response); - } catch (IOException e) { - LOG.debug("Failed to deserialize reponse to {}. '{}'", request, - new String(response, StandardCharsets.UTF_8)); - // Not a protobuf that we could parse. - throw new RuntimeException(e); - } - - // The server had an error, throw an Exception for that. - if (resp instanceof ErrorResponse) { - throw ((ErrorResponse) resp).toException(); - } - - return resp; - } -} - -// End RemoteProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java deleted file mode 100644 index d4828b5..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import java.nio.charset.StandardCharsets; - -/** - * Implementation of {@link org.apache.calcite.avatica.remote.Service} - * that translates requests into JSON and sends them to a remote server, - * usually an HTTP server. - */ -public class RemoteService extends JsonService { - private final AvaticaHttpClient client; - - public RemoteService(AvaticaHttpClient client) { - this.client = client; - } - - @Override public String apply(String request) { - byte[] response = client.send(request.getBytes(StandardCharsets.UTF_8)); - return new String(response, StandardCharsets.UTF_8); - } -} - -// End RemoteService.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java deleted file mode 100644 index 417c6ed..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -/** - * Encapsulate the logic of transforming a protobuf Request message into the Avatica POJO request. - */ -public class RequestTranslator { - - private final Parser<? extends Message> parser; - private final Service.Request impl; - - public RequestTranslator(Parser<? extends Message> parser, Service.Request impl) { - this.parser = parser; - this.impl = impl; - } - - public Service.Request transform(ByteString serializedMessage) throws - InvalidProtocolBufferException { - // This should already be an aliased CodedInputStream from the WireMessage parsing. - Message msg = parser.parseFrom(serializedMessage.newCodedInput()); - return impl.deserialize(msg); - } -} - -// End RequestTranslator.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java deleted file mode 100644 index 0311e13..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -/** - * Encapsulate the logic of transforming a protobuf Response message into the Avatica POJO Response. - */ -public class ResponseTranslator { - - private final Parser<? extends Message> parser; - private final Service.Response impl; - - public ResponseTranslator(Parser<? extends Message> parser, Service.Response impl) { - this.parser = parser; - this.impl = impl; - } - - public Service.Response transform(ByteString serializedMessage) throws - InvalidProtocolBufferException { - Message msg = parser.parseFrom(serializedMessage); - return impl.deserialize(msg); - } -} - -// End ResponseTranslator.java
