http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java deleted file mode 100644 index 11a6104..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.AvaticaConnection; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * Mock implementation of {@link Service} - * that encodes its requests and responses as JSON - * and looks up responses from a pre-defined map. 
- */ -public class MockJsonService extends JsonService { - private final Map<String, String> map; - - public MockJsonService(Map<String, String> map) { - this.map = map; - } - - @Override public String apply(String request) { - String response = map.get(request); - if (response == null) { - throw new RuntimeException("No response for " + request); - } - return response; - } - - /** Factory that creates a {@code MockJsonService}. */ - public static class Factory implements Service.Factory { - public Service create(AvaticaConnection connection) { - final String connectionId = connection.id; - final Map<String, String> map1 = new HashMap<>(); - try { - map1.put( - "{\"request\":\"openConnection\",\"connectionId\":\"" + connectionId + "\",\"info\":{}}", - "{\"response\":\"openConnection\"}"); - map1.put( - "{\"request\":\"closeConnection\",\"connectionId\":\"" + connectionId + "\"}", - "{\"response\":\"closeConnection\"}"); - map1.put( - "{\"request\":\"getSchemas\",\"catalog\":null,\"schemaPattern\":{\"s\":null}}", - "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); - map1.put( - JsonService.encode(new SchemasRequest(connectionId, null, null)), - "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); - map1.put( - JsonService.encode( - new TablesRequest(connectionId, null, null, null, Arrays.<String>asList())), - "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); - map1.put( - "{\"request\":\"createStatement\",\"connectionId\":\"" + connectionId + "\"}", - "{\"response\":\"createStatement\",\"id\":0}"); - map1.put( - "{\"request\":\"prepareAndExecute\",\"statementId\":0," - + "\"sql\":\"select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}", - "{\"response\":\"resultSet\", updateCount: -1, \"signature\": {\n" - + " \"columns\": [\n" - + " {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 
4, rep: \"INTEGER\"}},\n" - + " {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n" - + " ], \"cursorFactory\": {\"style\": \"ARRAY\"}\n" - + "}, \"rows\": [[1, \"a\"], [null, \"b\"], [3, \"c\"]]}"); - map1.put( - "{\"request\":\"prepare\",\"statementId\":0," - + "\"sql\":\"select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}", - "{\"response\":\"prepare\",\"signature\": {\n" - + " \"columns\": [\n" - + " {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 4, rep: \"INTEGER\"}},\n" - + " {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n" - + " ],\n" - + " \"parameters\": [],\n" - + " \"cursorFactory\": {\"style\": \"ARRAY\"}\n" - + "}}"); - map1.put( - "{\"request\":\"getColumns\",\"connectionId\":\"" + connectionId + "\",\"catalog\":null,\"schemaPattern\":null," - + "\"tableNamePattern\":\"my_table\",\"columnNamePattern\":null}", - "{\"response\":\"resultSet\",\"connectionId\":\"00000000-0000-0000-0000-000000000000\",\"statementId\":-1,\"ownStatement\":true," - + "\"signature\":{\"columns\":[" - + "{\"ordinal\":0,\"autoIncrement\":false,\"caseSensitive\":false,\"searchable\":true,\"currency\":false,\"nullable\":1,\"signed\":false," - + "\"displaySize\":40,\"label\":\"TABLE_NAME\",\"columnName\":\"TABLE_NAME\",\"schemaName\":\"\",\"precision\":0,\"scale\":0,\"tableName\":\"SYSTEM.TABLE\"," - + "\"catalogName\":\"\",\"type\":{\"type\":\"scalar\",\"id\":12,\"name\":\"VARCHAR\",\"rep\":\"STRING\"},\"readOnly\":true,\"writable\":false," - + "\"definitelyWritable\":false,\"columnClassName\":\"java.lang.String\"}," - + "{\"ordinal\":1,\"autoIncrement\":false,\"caseSensitive\":false,\"searchable\":true,\"currency\":false,\"nullable\":1,\"signed\":true," - + "\"displaySize\":40,\"label\":\"ORDINAL_POSITION\",\"columnName\":\"ORDINAL_POSITION\",\"schemaName\":\"\",\"precision\":0,\"scale\":0," - + 
"\"tableName\":\"SYSTEM.TABLE\",\"catalogName\":\"\",\"type\":{\"type\":\"scalar\",\"id\":-5,\"name\":\"BIGINT\",\"rep\":\"PRIMITIVE_LONG\"}," - + "\"readOnly\":true,\"writable\":false,\"definitelyWritable\":false,\"columnClassName\":\"java.lang.Long\"}" - + "],\"sql\":null," - + "\"parameters\":[]," - + "\"cursorFactory\":{\"style\":\"LIST\",\"clazz\":null,\"fieldNames\":null},\"statementType\":null}," - + "\"firstFrame\":{\"offset\":0,\"done\":true," - + "\"rows\":[[\"my_table\",10]]" - + "},\"updateCount\":-1}"); - } catch (IOException e) { - throw new RuntimeException(e); - } - return new MockJsonService(map1); - } - } -} - -// End MockJsonService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java deleted file mode 100644 index 2fcb19a..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.AvaticaConnection; -import org.apache.calcite.avatica.AvaticaParameter; -import org.apache.calcite.avatica.ColumnMetaData; -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.MetaImpl; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * A mock implementation of ProtobufService for testing. - * - * <p>It performs no serialization of requests and responses. 
- */ -public class MockProtobufService extends ProtobufService { - - private final String connectionId; - private final Map<Request, Response> mapping; - - public MockProtobufService(String connectionId) { - this.connectionId = connectionId; - this.mapping = createMapping(); - } - - private Map<Request, Response> createMapping() { - HashMap<Request, Response> mappings = new HashMap<>(); - - // Add in mappings - - mappings.put( - new OpenConnectionRequest(connectionId, new HashMap<String, String>()), - new OpenConnectionResponse()); - - // Get the schema, no.. schema..? - mappings.put( - new SchemasRequest(connectionId, null, null), - // ownStatement=false just to avoid the extra close statement call. - new ResultSetResponse(null, 1, false, null, Meta.Frame.EMPTY, -1, null)); - - // Get the tables, no tables exist - mappings.put(new TablesRequest(connectionId, null, null, null, Collections.<String>emptyList()), - // ownStatement=false just to avoid the extra close statement call. - new ResultSetResponse(null, 150, false, null, Meta.Frame.EMPTY, -1, null)); - - // Create a statement, get back an id - mappings.put(new CreateStatementRequest("0"), new CreateStatementResponse("0", 1, null)); - - // Prepare and execute a query. Values and schema are returned - mappings.put( - new PrepareAndExecuteRequest(connectionId, 1, - "select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)", -1), - new ResultSetResponse("0", 1, true, - Meta.Signature.create( - Arrays.<ColumnMetaData>asList( - MetaImpl.columnMetaData("C1", 0, Integer.class), - MetaImpl.columnMetaData("C2", 1, String.class)), - null, null, Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT), - Meta.Frame.create(0, true, - Arrays.<Object>asList(new Object[] {1, "a"}, - new Object[] {null, "b"}, new Object[] {3, "c"})), -1, null)); - - // Prepare a query. 
Schema for results are returned, but no values - mappings.put( - new PrepareRequest(connectionId, - "select * from (\\n values(1, 'a'), (null, 'b'), (3, 'c')), as t (c1, c2)", -1), - new ResultSetResponse("0", 1, true, - Meta.Signature.create( - Arrays.<ColumnMetaData>asList( - MetaImpl.columnMetaData("C1", 0, Integer.class), - MetaImpl.columnMetaData("C2", 1, String.class)), - null, Collections.<AvaticaParameter>emptyList(), - Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT), - null, -1, null)); - - mappings.put( - new ColumnsRequest(connectionId, null, null, "my_table", null), - new ResultSetResponse("00000000-0000-0000-0000-000000000000", -1, true, - Meta.Signature.create( - Arrays.<ColumnMetaData>asList( - MetaImpl.columnMetaData("TABLE_NAME", 0, String.class), - MetaImpl.columnMetaData("ORDINAL_POSITION", 1, Long.class)), null, - Collections.<AvaticaParameter>emptyList(), Meta.CursorFactory.ARRAY, null), - Meta.Frame.create(0, true, - Arrays.<Object>asList(new Object[] {new Object[]{"my_table", 10}})), -1, null)); - - return Collections.unmodifiableMap(mappings); - } - - @Override public Response _apply(Request request) { - if (request instanceof CloseConnectionRequest) { - return new CloseConnectionResponse(); - } - - return dispatch(request); - } - - /** - * Fetches the static response for the given request. - * - * @param request the client's request - * @return the appropriate response - * @throws RuntimeException if no mapping is found for the request - */ - private Response dispatch(Request request) { - Response response = mapping.get(request); - - if (null == response) { - throw new RuntimeException("Had no response mapping for " + request); - } - - return response; - } - - /** - * A factory that instantiates the mock protobuf service. 
- */ - public static class MockProtobufServiceFactory implements Service.Factory { - @Override public Service create(AvaticaConnection connection) { - return new MockProtobufService(connection.id); - } - } -} - -// End MockProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java deleted file mode 100644 index 89e380e..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.metrics.MetricsSystem; -import org.apache.calcite.avatica.metrics.Timer; -import org.apache.calcite.avatica.metrics.Timer.Context; -import org.apache.calcite.avatica.remote.Service.Response; - -import java.io.IOException; - -/** - * Dispatches serialized protocol buffer messages to the provided {@link Service} - * by converting them to the POJO Request. Returns back the serialized protocol - * buffer response. - */ -public class ProtobufHandler extends AbstractHandler<byte[]> { - - private final ProtobufTranslation translation; - private final MetricsSystem metrics; - private final Timer serializationTimer; - - public ProtobufHandler(Service service, ProtobufTranslation translation, MetricsSystem metrics) { - super(service); - this.translation = translation; - this.metrics = metrics; - this.serializationTimer = this.metrics.getTimer( - MetricsHelper.concat(ProtobufHandler.class, HANDLER_SERIALIZATION_METRICS_NAME)); - } - - @Override public HandlerResponse<byte[]> apply(byte[] requestBytes) { - return super.apply(requestBytes); - } - - @Override Service.Request decode(byte[] serializedRequest) throws IOException { - try (final Context ctx = serializationTimer.start()) { - return translation.parseRequest(serializedRequest); - } - } - - @Override byte[] encode(Response response) throws IOException { - try (final Context ctx = serializationTimer.start()) { - return translation.serializeResponse(response); - } - } -} - -// End ProtobufHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java deleted file mode 100644 index 56ba125..0000000 --- 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import com.google.protobuf.Message; - -/** - * Service implementation that encodes requests and responses as protocol buffers. - */ -public abstract class ProtobufService extends AbstractService { - - /** - * Derived class should implement this method to transport requests and - * responses to and from the peer service. 
- */ - public abstract Response _apply(Request request); - - @Override SerializationType getSerializationType() { - return SerializationType.PROTOBUF; - } - - @Override public ResultSetResponse apply(CatalogsRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(SchemasRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(TablesRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(TableTypesRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(TypeInfoRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(ColumnsRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public PrepareResponse apply(PrepareRequest request) { - return finagle((PrepareResponse) _apply(request)); - } - - @Override public ExecuteResponse apply(PrepareAndExecuteRequest request) { - return finagle((ExecuteResponse) _apply(request)); - } - - @Override public FetchResponse apply(FetchRequest request) { - return (FetchResponse) _apply(request); - } - - @Override public CreateStatementResponse apply(CreateStatementRequest request) { - return (CreateStatementResponse) _apply(request); - } - - @Override public CloseStatementResponse apply(CloseStatementRequest request) { - return (CloseStatementResponse) _apply(request); - } - - @Override public OpenConnectionResponse apply(OpenConnectionRequest request) { - return (OpenConnectionResponse) _apply(request); - } - - @Override public CloseConnectionResponse apply(CloseConnectionRequest request) { - return (CloseConnectionResponse) _apply(request); - } - - @Override public ConnectionSyncResponse apply(ConnectionSyncRequest request) { - return (ConnectionSyncResponse) 
_apply(request); - } - - @Override public DatabasePropertyResponse apply(DatabasePropertyRequest request) { - return (DatabasePropertyResponse) _apply(request); - } - - @Override public ExecuteResponse apply(ExecuteRequest request) { - return finagle((ExecuteResponse) _apply(request)); - } - - @Override public SyncResultsResponse apply(SyncResultsRequest request) { - return (SyncResultsResponse) _apply(request); - } - - @Override public CommitResponse apply(CommitRequest request) { - return (CommitResponse) _apply(request); - } - - @Override public RollbackResponse apply(RollbackRequest request) { - return (RollbackResponse) _apply(request); - } - - /** - * Checks if the provided {@link Message} is an instance of the Class given by - * <code>expectedType</code>. Throws an IllegalArgumentException if the message is not of the - * expected type, otherwise, it returns the message cast as the expected type. - * - * @param msg A Protocol Buffer message. - * @param expectedType The expected type of the Protocol Buffer message. - * @return The msg cast to the concrete Message type. - * @throws IllegalArgumentException If the type of the message is not the expectedType. 
- */ - public static <T extends Message> T castProtobufMessage(Message msg, Class<T> expectedType) { - if (!expectedType.isInstance(msg)) { - throw new IllegalArgumentException("Expected instance of " + expectedType.getName() - + ", but got " + msg.getClass().getName()); - } - - return expectedType.cast(msg); - } -} - -// End ProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java deleted file mode 100644 index 7142d59..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.remote.Service.Request; -import org.apache.calcite.avatica.remote.Service.Response; - -import java.io.IOException; - -/** - * Generic interface to support parsing of serialized protocol buffers between client and server. - */ -public interface ProtobufTranslation { - - /** - * Serializes a {@link Response} as a protocol buffer. - * - * @param response The response to serialize - * @throws IOException If there are errors during serialization - */ - byte[] serializeResponse(Response response) throws IOException; - - /** - * Serializes a {@link Request} as a protocol buffer. - * - * @param request The request to serialize - * @throws IOException If there are errors during serialization - */ - byte[] serializeRequest(Request request) throws IOException; - - /** - * Parses a serialized protocol buffer request into a {@link Request}. - * - * @param bytes Serialized protocol buffer request from client - * @return A Request object for the given bytes - * @throws IOException If the protocol buffer cannot be deserialized - */ - Request parseRequest(byte[] bytes) throws IOException; - - /** - * Parses a serialized protocol buffer response into a {@link Response}. 
- * - * @param bytes Serialized protocol buffer response from server - * @return The Response object for the given bytes - * @throws IOException If the protocol buffer cannot be deserialized - */ - Response parseResponse(byte[] bytes) throws IOException; -} - -// End ProtobufTranslation.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java deleted file mode 100644 index 80d2b22..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.proto.Common.WireMessage; -import org.apache.calcite.avatica.proto.Requests.CatalogsRequest; -import org.apache.calcite.avatica.proto.Requests.CloseConnectionRequest; -import org.apache.calcite.avatica.proto.Requests.CloseStatementRequest; -import org.apache.calcite.avatica.proto.Requests.ColumnsRequest; -import org.apache.calcite.avatica.proto.Requests.CommitRequest; -import org.apache.calcite.avatica.proto.Requests.ConnectionSyncRequest; -import org.apache.calcite.avatica.proto.Requests.CreateStatementRequest; -import org.apache.calcite.avatica.proto.Requests.DatabasePropertyRequest; -import org.apache.calcite.avatica.proto.Requests.ExecuteRequest; -import org.apache.calcite.avatica.proto.Requests.FetchRequest; -import org.apache.calcite.avatica.proto.Requests.OpenConnectionRequest; -import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteRequest; -import org.apache.calcite.avatica.proto.Requests.PrepareRequest; -import org.apache.calcite.avatica.proto.Requests.RollbackRequest; -import org.apache.calcite.avatica.proto.Requests.SchemasRequest; -import org.apache.calcite.avatica.proto.Requests.SyncResultsRequest; -import org.apache.calcite.avatica.proto.Requests.TableTypesRequest; -import org.apache.calcite.avatica.proto.Requests.TablesRequest; -import org.apache.calcite.avatica.proto.Requests.TypeInfoRequest; -import org.apache.calcite.avatica.proto.Responses.CloseConnectionResponse; -import org.apache.calcite.avatica.proto.Responses.CloseStatementResponse; -import org.apache.calcite.avatica.proto.Responses.CommitResponse; -import org.apache.calcite.avatica.proto.Responses.ConnectionSyncResponse; -import org.apache.calcite.avatica.proto.Responses.CreateStatementResponse; -import org.apache.calcite.avatica.proto.Responses.DatabasePropertyResponse; -import org.apache.calcite.avatica.proto.Responses.ErrorResponse; -import 
org.apache.calcite.avatica.proto.Responses.ExecuteResponse; -import org.apache.calcite.avatica.proto.Responses.FetchResponse; -import org.apache.calcite.avatica.proto.Responses.OpenConnectionResponse; -import org.apache.calcite.avatica.proto.Responses.PrepareResponse; -import org.apache.calcite.avatica.proto.Responses.ResultSetResponse; -import org.apache.calcite.avatica.proto.Responses.RollbackResponse; -import org.apache.calcite.avatica.proto.Responses.RpcMetadata; -import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse; -import org.apache.calcite.avatica.remote.Service.Request; -import org.apache.calcite.avatica.remote.Service.Response; -import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; -import org.apache.calcite.avatica.util.UnsynchronizedBuffer; - -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.HBaseZeroCopyByteString; -import com.google.protobuf.Message; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import static java.nio.charset.StandardCharsets.UTF_8; - -/** - * Implementation of {@link ProtobufTranslation} that translates - * protobuf requests to POJO requests. 
- */ -public class ProtobufTranslationImpl implements ProtobufTranslation { - - // Extremely ugly mapping of PB class name into a means to convert it to the POJO - private static final Map<String, RequestTranslator> REQUEST_PARSERS; - private static final Map<String, ResponseTranslator> RESPONSE_PARSERS; - private static final Map<Class<?>, ByteString> MESSAGE_CLASSES; - - static { - Map<String, RequestTranslator> reqParsers = new ConcurrentHashMap<>(); - reqParsers.put(CatalogsRequest.class.getName(), - new RequestTranslator(CatalogsRequest.parser(), new Service.CatalogsRequest())); - reqParsers.put(OpenConnectionRequest.class.getName(), - new RequestTranslator(OpenConnectionRequest.parser(), new Service.OpenConnectionRequest())); - reqParsers.put(CloseConnectionRequest.class.getName(), - new RequestTranslator(CloseConnectionRequest.parser(), - new Service.CloseConnectionRequest())); - reqParsers.put(CloseStatementRequest.class.getName(), - new RequestTranslator(CloseStatementRequest.parser(), new Service.CloseStatementRequest())); - reqParsers.put(ColumnsRequest.class.getName(), - new RequestTranslator(ColumnsRequest.parser(), new Service.ColumnsRequest())); - reqParsers.put(ConnectionSyncRequest.class.getName(), - new RequestTranslator(ConnectionSyncRequest.parser(), new Service.ConnectionSyncRequest())); - reqParsers.put(CreateStatementRequest.class.getName(), - new RequestTranslator(CreateStatementRequest.parser(), - new Service.CreateStatementRequest())); - reqParsers.put(DatabasePropertyRequest.class.getName(), - new RequestTranslator(DatabasePropertyRequest.parser(), - new Service.DatabasePropertyRequest())); - reqParsers.put(FetchRequest.class.getName(), - new RequestTranslator(FetchRequest.parser(), new Service.FetchRequest())); - reqParsers.put(PrepareAndExecuteRequest.class.getName(), - new RequestTranslator(PrepareAndExecuteRequest.parser(), - new Service.PrepareAndExecuteRequest())); - reqParsers.put(PrepareRequest.class.getName(), - new 
RequestTranslator(PrepareRequest.parser(), new Service.PrepareRequest())); - reqParsers.put(SchemasRequest.class.getName(), - new RequestTranslator(SchemasRequest.parser(), new Service.SchemasRequest())); - reqParsers.put(TablesRequest.class.getName(), - new RequestTranslator(TablesRequest.parser(), new Service.TablesRequest())); - reqParsers.put(TableTypesRequest.class.getName(), - new RequestTranslator(TableTypesRequest.parser(), new Service.TableTypesRequest())); - reqParsers.put(TypeInfoRequest.class.getName(), - new RequestTranslator(TypeInfoRequest.parser(), new Service.TypeInfoRequest())); - reqParsers.put(ExecuteRequest.class.getName(), - new RequestTranslator(ExecuteRequest.parser(), new Service.ExecuteRequest())); - reqParsers.put(SyncResultsRequest.class.getName(), - new RequestTranslator(SyncResultsRequest.parser(), new Service.SyncResultsRequest())); - reqParsers.put(CommitRequest.class.getName(), - new RequestTranslator(CommitRequest.parser(), new Service.CommitRequest())); - reqParsers.put(RollbackRequest.class.getName(), - new RequestTranslator(RollbackRequest.parser(), new Service.RollbackRequest())); - - REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers); - - Map<String, ResponseTranslator> respParsers = new ConcurrentHashMap<>(); - respParsers.put(OpenConnectionResponse.class.getName(), - new ResponseTranslator(OpenConnectionResponse.parser(), - new Service.OpenConnectionResponse())); - respParsers.put(CloseConnectionResponse.class.getName(), - new ResponseTranslator(CloseConnectionResponse.parser(), - new Service.CloseConnectionResponse())); - respParsers.put(CloseStatementResponse.class.getName(), - new ResponseTranslator(CloseStatementResponse.parser(), - new Service.CloseStatementResponse())); - respParsers.put(ConnectionSyncResponse.class.getName(), - new ResponseTranslator(ConnectionSyncResponse.parser(), - new Service.ConnectionSyncResponse())); - respParsers.put(CreateStatementResponse.class.getName(), - new 
ResponseTranslator(CreateStatementResponse.parser(), - new Service.CreateStatementResponse())); - respParsers.put(DatabasePropertyResponse.class.getName(), - new ResponseTranslator(DatabasePropertyResponse.parser(), - new Service.DatabasePropertyResponse())); - respParsers.put(ExecuteResponse.class.getName(), - new ResponseTranslator(ExecuteResponse.parser(), new Service.ExecuteResponse())); - respParsers.put(FetchResponse.class.getName(), - new ResponseTranslator(FetchResponse.parser(), new Service.FetchResponse())); - respParsers.put(PrepareResponse.class.getName(), - new ResponseTranslator(PrepareResponse.parser(), new Service.PrepareResponse())); - respParsers.put(ResultSetResponse.class.getName(), - new ResponseTranslator(ResultSetResponse.parser(), new Service.ResultSetResponse())); - respParsers.put(ErrorResponse.class.getName(), - new ResponseTranslator(ErrorResponse.parser(), new Service.ErrorResponse())); - respParsers.put(SyncResultsResponse.class.getName(), - new ResponseTranslator(SyncResultsResponse.parser(), new Service.SyncResultsResponse())); - respParsers.put(RpcMetadata.class.getName(), - new ResponseTranslator(RpcMetadata.parser(), new RpcMetadataResponse())); - respParsers.put(CommitResponse.class.getName(), - new ResponseTranslator(CommitResponse.parser(), new Service.CommitResponse())); - respParsers.put(RollbackResponse.class.getName(), - new ResponseTranslator(RollbackResponse.parser(), new Service.RollbackResponse())); - - RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers); - - Map<Class<?>, ByteString> messageClassNames = new ConcurrentHashMap<>(); - for (Class<?> msgClz : getAllMessageClasses()) { - messageClassNames.put(msgClz, wrapClassName(msgClz)); - } - MESSAGE_CLASSES = Collections.unmodifiableMap(messageClassNames); - } - - private static List<Class<?>> getAllMessageClasses() { - List<Class<?>> messageClasses = new ArrayList<>(); - messageClasses.add(CatalogsRequest.class); - 
messageClasses.add(CloseConnectionRequest.class); - messageClasses.add(CloseStatementRequest.class); - messageClasses.add(ColumnsRequest.class); - messageClasses.add(CommitRequest.class); - messageClasses.add(ConnectionSyncRequest.class); - messageClasses.add(CreateStatementRequest.class); - messageClasses.add(DatabasePropertyRequest.class); - messageClasses.add(ExecuteRequest.class); - messageClasses.add(FetchRequest.class); - messageClasses.add(OpenConnectionRequest.class); - messageClasses.add(PrepareAndExecuteRequest.class); - messageClasses.add(PrepareRequest.class); - messageClasses.add(RollbackRequest.class); - messageClasses.add(SchemasRequest.class); - messageClasses.add(SyncResultsRequest.class); - messageClasses.add(TableTypesRequest.class); - messageClasses.add(TablesRequest.class); - messageClasses.add(TypeInfoRequest.class); - messageClasses.add(CloseConnectionResponse.class); - messageClasses.add(CloseStatementResponse.class); - messageClasses.add(CommitResponse.class); - messageClasses.add(ConnectionSyncResponse.class); - messageClasses.add(CreateStatementResponse.class); - messageClasses.add(DatabasePropertyResponse.class); - messageClasses.add(ErrorResponse.class); - messageClasses.add(ExecuteResponse.class); - messageClasses.add(FetchResponse.class); - messageClasses.add(OpenConnectionResponse.class); - messageClasses.add(PrepareResponse.class); - messageClasses.add(ResultSetResponse.class); - messageClasses.add(RollbackResponse.class); - messageClasses.add(RpcMetadata.class); - messageClasses.add(SyncResultsResponse.class); - - return messageClasses; - } - - private static ByteString wrapClassName(Class<?> clz) { - return HBaseZeroCopyByteString.wrap(clz.getName().getBytes(UTF_8)); - } - - private final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer = - new ThreadLocal<UnsynchronizedBuffer>() { - @Override protected UnsynchronizedBuffer initialValue() { - return new UnsynchronizedBuffer(); - } - }; - - /** - * Fetches the concrete message's 
Parser implementation. - * - * @param className The protocol buffer class name - * @return The Parser for the class - * @throws IllegalArgumentException If the argument is null or if a Parser for the given - * class name is not found. - */ - public static RequestTranslator getParserForRequest(String className) { - if (null == className) { - throw new IllegalArgumentException("Cannot fetch parser for null class name"); - } - - RequestTranslator translator = REQUEST_PARSERS.get(className); - if (null == translator) { - throw new IllegalArgumentException("Cannot find parser for " + className); - } - - return translator; - } - - /** - * Fetches the concrete message's Parser implementation. - * - * @param className The protocol buffer class name - * @return The Parser for the class - * @throws IllegalArgumentException If the argument is null or if a Parser for the given - * class name is not found. - */ - public static ResponseTranslator getParserForResponse(String className) { - if (null == className) { - throw new IllegalArgumentException("Cannot fetch parser for null class name"); - } - - ResponseTranslator translator = RESPONSE_PARSERS.get(className); - if (null == translator) { - throw new IllegalArgumentException("Cannot find parser for " + className); - } - - return translator; - } - - @Override public byte[] serializeResponse(Response response) throws IOException { - // Avoid BAOS for its synchronized write methods, we don't need that concurrency control - UnsynchronizedBuffer out = threadLocalBuffer.get(); - try { - Message responseMsg = response.serialize(); - serializeMessage(out, responseMsg); - return out.toArray(); - } finally { - out.reset(); - } - } - - @Override public byte[] serializeRequest(Request request) throws IOException { - // Avoid BAOS for its synchronized write methods, we don't need that concurrency control - UnsynchronizedBuffer out = threadLocalBuffer.get(); - try { - Message requestMsg = request.serialize(); - serializeMessage(out, 
requestMsg); - return out.toArray(); - } finally { - out.reset(); - } - } - - void serializeMessage(OutputStream out, Message msg) throws IOException { - // Serialize the protobuf message - UnsynchronizedBuffer buffer = threadLocalBuffer.get(); - ByteString serializedMsg; - try { - msg.writeTo(buffer); - // Make a bytestring from it - serializedMsg = HBaseZeroCopyByteString.wrap(buffer.toArray()); - } finally { - buffer.reset(); - } - - // Wrap the serialized message in a WireMessage - WireMessage wireMsg = WireMessage.newBuilder().setNameBytes(getClassNameBytes(msg.getClass())) - .setWrappedMessage(serializedMsg).build(); - - // Write the WireMessage to the provided OutputStream - wireMsg.writeTo(out); - } - - ByteString getClassNameBytes(Class<?> clz) { - ByteString byteString = MESSAGE_CLASSES.get(clz); - if (null == byteString) { - throw new IllegalArgumentException("Missing ByteString for " + clz.getName()); - } - return byteString; - } - - @Override public Request parseRequest(byte[] bytes) throws IOException { - ByteString byteString = HBaseZeroCopyByteString.wrap(bytes); - CodedInputStream inputStream = byteString.newCodedInput(); - // Enable aliasing to avoid an extra copy to get at the serialized Request inside of the - // WireMessage. - inputStream.enableAliasing(true); - WireMessage wireMsg = WireMessage.parseFrom(inputStream); - - String serializedMessageClassName = wireMsg.getName(); - RequestTranslator translator = getParserForRequest(serializedMessageClassName); - - // The ByteString should be logical offsets into the original byte array - return translator.transform(wireMsg.getWrappedMessage()); - } - - @Override public Response parseResponse(byte[] bytes) throws IOException { - ByteString byteString = HBaseZeroCopyByteString.wrap(bytes); - CodedInputStream inputStream = byteString.newCodedInput(); - // Enable aliasing to avoid an extra copy to get at the serialized Response inside of the - // WireMessage. 
- inputStream.enableAliasing(true); - WireMessage wireMsg = WireMessage.parseFrom(inputStream); - - String serializedMessageClassName = wireMsg.getName(); - ResponseTranslator translator = getParserForResponse(serializedMessageClassName); - - return translator.transform(wireMsg.getWrappedMessage()); - } -} - -// End ProtobufTranslationImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java deleted file mode 100644 index 463985a..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.AvaticaConnection; -import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException; -import org.apache.calcite.avatica.AvaticaParameter; -import org.apache.calcite.avatica.ColumnMetaData; -import org.apache.calcite.avatica.ConnectionPropertiesImpl; -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.MetaImpl; -import org.apache.calcite.avatica.MissingResultsException; -import org.apache.calcite.avatica.NoSuchStatementException; -import org.apache.calcite.avatica.QueryState; - -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Implementation of {@link org.apache.calcite.avatica.Meta} for the remote - * driver. - */ -class RemoteMeta extends MetaImpl { - final Service service; - final Map<String, ConnectionPropertiesImpl> propsMap = new HashMap<>(); - private Map<DatabaseProperty, Object> databaseProperties; - - public RemoteMeta(AvaticaConnection connection, Service service) { - super(connection); - this.service = service; - } - - private MetaResultSet toResultSet(Class clazz, - Service.ResultSetResponse response) { - if (response.updateCount != -1) { - return MetaResultSet.count(response.connectionId, response.statementId, - response.updateCount); - } - Signature signature0 = response.signature; - if (signature0 == null) { - final List<ColumnMetaData> columns = - clazz == null - ? 
Collections.<ColumnMetaData>emptyList() - : fieldMetaData(clazz).columns; - signature0 = Signature.create(columns, - "?", Collections.<AvaticaParameter>emptyList(), CursorFactory.ARRAY, - Meta.StatementType.SELECT); - } - return MetaResultSet.create(response.connectionId, response.statementId, - response.ownStatement, signature0, response.firstFrame); - } - - @Override public Map<DatabaseProperty, Object> getDatabaseProperties(ConnectionHandle ch) { - synchronized (this) { - // Compute map on first use, and cache - if (databaseProperties == null) { - databaseProperties = - service.apply(new Service.DatabasePropertyRequest(ch.id)).map; - } - return databaseProperties; - } - } - - @Override public StatementHandle createStatement(final ConnectionHandle ch) { - return connection.invokeWithRetries( - new CallableWithoutException<StatementHandle>() { - public StatementHandle call() { - // sync connection state if necessary - connectionSync(ch, new ConnectionPropertiesImpl()); - final Service.CreateStatementResponse response = - service.apply(new Service.CreateStatementRequest(ch.id)); - return new StatementHandle(response.connectionId, response.statementId, null); - } - }); - } - - @Override public void closeStatement(final StatementHandle h) { - connection.invokeWithRetries( - new CallableWithoutException<Void>() { - public Void call() { - final Service.CloseStatementResponse response = - service.apply( - new Service.CloseStatementRequest(h.connectionId, h.id)); - return null; - } - }); - } - - @Override public void openConnection(final ConnectionHandle ch, final Map<String, String> info) { - connection.invokeWithRetries( - new CallableWithoutException<Void>() { - public Void call() { - final Service.OpenConnectionResponse response = - service.apply(new Service.OpenConnectionRequest(ch.id, info)); - return null; - } - }); - } - - @Override public void closeConnection(final ConnectionHandle ch) { - connection.invokeWithRetries( - new CallableWithoutException<Void>() { - 
public Void call() { - final Service.CloseConnectionResponse response = - service.apply(new Service.CloseConnectionRequest(ch.id)); - propsMap.remove(ch.id); - return null; - } - }); - } - - @Override public ConnectionProperties connectionSync(final ConnectionHandle ch, - final ConnectionProperties connProps) { - return connection.invokeWithRetries( - new CallableWithoutException<ConnectionProperties>() { - public ConnectionProperties call() { - ConnectionPropertiesImpl localProps = propsMap.get(ch.id); - if (localProps == null) { - localProps = new ConnectionPropertiesImpl(); - localProps.setDirty(true); - propsMap.put(ch.id, localProps); - } - - // Only make an RPC if necessary. RPC is necessary when we have local changes that need - // flushed to the server (be sure to introduce any new changes from connProps before - // checking AND when connProps.isEmpty() (meaning, this was a request for a value, not - // overriding a value). Otherwise, accumulate the change locally and return immediately. 
- if (localProps.merge(connProps).isDirty() && connProps.isEmpty()) { - final Service.ConnectionSyncResponse response = service.apply( - new Service.ConnectionSyncRequest(ch.id, localProps)); - propsMap.put(ch.id, (ConnectionPropertiesImpl) response.connProps); - return response.connProps; - } else { - return localProps; - } - } - }); - } - - @Override public MetaResultSet getCatalogs(final ConnectionHandle ch) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply(new Service.CatalogsRequest(ch.id)); - return toResultSet(MetaCatalog.class, response); - } - }); - } - - @Override public MetaResultSet getSchemas(final ConnectionHandle ch, final String catalog, - final Pat schemaPattern) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply( - new Service.SchemasRequest(ch.id, catalog, schemaPattern.s)); - return toResultSet(MetaSchema.class, response); - } - }); - } - - @Override public MetaResultSet getTables(final ConnectionHandle ch, final String catalog, - final Pat schemaPattern, final Pat tableNamePattern, final List<String> typeList) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply( - new Service.TablesRequest(ch.id, catalog, schemaPattern.s, - tableNamePattern.s, typeList)); - return toResultSet(MetaTable.class, response); - } - }); - } - - @Override public MetaResultSet getTableTypes(final ConnectionHandle ch) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply(new Service.TableTypesRequest(ch.id)); - return toResultSet(MetaTableType.class, 
response); - } - }); - } - - @Override public MetaResultSet getTypeInfo(final ConnectionHandle ch) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply(new Service.TypeInfoRequest(ch.id)); - return toResultSet(MetaTypeInfo.class, response); - } - }); - } - - @Override public MetaResultSet getColumns(final ConnectionHandle ch, final String catalog, - final Pat schemaPattern, final Pat tableNamePattern, final Pat columnNamePattern) { - return connection.invokeWithRetries( - new CallableWithoutException<MetaResultSet>() { - public MetaResultSet call() { - final Service.ResultSetResponse response = - service.apply( - new Service.ColumnsRequest(ch.id, catalog, schemaPattern.s, - tableNamePattern.s, columnNamePattern.s)); - return toResultSet(MetaColumn.class, response); - } - }); - } - - @Override public StatementHandle prepare(final ConnectionHandle ch, final String sql, - final long maxRowCount) { - return connection.invokeWithRetries( - new CallableWithoutException<StatementHandle>() { - public StatementHandle call() { - connectionSync(ch, - new ConnectionPropertiesImpl()); // sync connection state if necessary - final Service.PrepareResponse response = service.apply( - new Service.PrepareRequest(ch.id, sql, maxRowCount)); - return response.statement; - } - }); - } - - @Override public ExecuteResult prepareAndExecute(final StatementHandle h, final String sql, - final long maxRowCount, final PrepareCallback callback) throws NoSuchStatementException { - try { - return connection.invokeWithRetries( - new CallableWithoutException<ExecuteResult>() { - public ExecuteResult call() { - // sync connection state if necessary - connectionSync(new ConnectionHandle(h.connectionId), new ConnectionPropertiesImpl()); - final Service.ExecuteResponse response; - try { - synchronized (callback.getMonitor()) { - callback.clear(); - response = service.apply( 
- new Service.PrepareAndExecuteRequest(h.connectionId, - h.id, sql, maxRowCount)); - if (response.missingStatement) { - throw new RuntimeException(new NoSuchStatementException(h)); - } - if (response.results.size() > 0) { - final Service.ResultSetResponse result = response.results.get(0); - callback.assign(result.signature, result.firstFrame, - result.updateCount); - } - } - callback.execute(); - List<MetaResultSet> metaResultSets = new ArrayList<>(); - for (Service.ResultSetResponse result : response.results) { - metaResultSets.add(toResultSet(null, result)); - } - return new ExecuteResult(metaResultSets); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - }); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof NoSuchStatementException) { - throw (NoSuchStatementException) cause; - } - throw e; - } - } - - @Override public Frame fetch(final StatementHandle h, final long offset, - final int fetchMaxRowCount) throws NoSuchStatementException, MissingResultsException { - try { - return connection.invokeWithRetries( - new CallableWithoutException<Frame>() { - public Frame call() { - final Service.FetchResponse response = - service.apply( - new Service.FetchRequest(h.connectionId, h.id, offset, fetchMaxRowCount)); - if (response.missingStatement) { - throw new RuntimeException(new NoSuchStatementException(h)); - } - if (response.missingResults) { - throw new RuntimeException(new MissingResultsException(h)); - } - return response.frame; - } - }); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof NoSuchStatementException) { - throw (NoSuchStatementException) cause; - } else if (cause instanceof MissingResultsException) { - throw (MissingResultsException) cause; - } - throw e; - } - } - - @Override public ExecuteResult execute(final StatementHandle h, - final List<TypedValue> parameterValues, final long maxRowCount) - throws NoSuchStatementException { - try { - return 
connection.invokeWithRetries( - new CallableWithoutException<ExecuteResult>() { - public ExecuteResult call() { - final Service.ExecuteResponse response = service.apply( - new Service.ExecuteRequest(h, parameterValues, maxRowCount)); - - if (response.missingStatement) { - throw new RuntimeException(new NoSuchStatementException(h)); - } - - List<MetaResultSet> metaResultSets = new ArrayList<>(); - for (Service.ResultSetResponse result : response.results) { - metaResultSets.add(toResultSet(null, result)); - } - - return new ExecuteResult(metaResultSets); - } - }); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof NoSuchStatementException) { - throw (NoSuchStatementException) cause; - } - throw e; - } - } - - @Override public boolean syncResults(final StatementHandle h, final QueryState state, - final long offset) throws NoSuchStatementException { - try { - return connection.invokeWithRetries( - new CallableWithoutException<Boolean>() { - public Boolean call() { - final Service.SyncResultsResponse response = - service.apply( - new Service.SyncResultsRequest(h.connectionId, h.id, state, offset)); - if (response.missingStatement) { - throw new RuntimeException(new NoSuchStatementException(h)); - } - return response.moreResults; - } - }); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof NoSuchStatementException) { - throw (NoSuchStatementException) cause; - } - throw e; - } - } - - @Override public void commit(final ConnectionHandle ch) { - connection.invokeWithRetries(new CallableWithoutException<Void>() { - public Void call() { - final Service.CommitResponse response = - service.apply(new Service.CommitRequest(ch.id)); - return null; - } - }); - } - - @Override public void rollback(final ConnectionHandle ch) { - connection.invokeWithRetries(new CallableWithoutException<Void>() { - public Void call() { - final Service.RollbackResponse response = - service.apply(new 
Service.RollbackRequest(ch.id)); - return null; - } - }); - } -} - -// End RemoteMeta.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java deleted file mode 100644 index 828513a..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import java.io.IOException; - -/** - * ProtobufService implementation that queries against a remote implementation, using - * protocol buffers as the serialized form. 
- */ -public class RemoteProtobufService extends ProtobufService { - private final AvaticaHttpClient client; - private final ProtobufTranslation translation; - - public RemoteProtobufService(AvaticaHttpClient client, ProtobufTranslation translation) { - this.client = client; - this.translation = translation; - } - - @Override public Response _apply(Request request) { - final Response resp; - try { - byte[] response = client.send(translation.serializeRequest(request)); - resp = translation.parseResponse(response); - } catch (IOException e) { - // Not a protobuf that we could parse. - throw new RuntimeException(e); - } - - // The server had an error, throw an Exception for that. - if (resp instanceof ErrorResponse) { - throw ((ErrorResponse) resp).toException(); - } - - return resp; - } -} - -// End RemoteProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java deleted file mode 100644 index d4828b5..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import java.nio.charset.StandardCharsets; - -/** - * Implementation of {@link org.apache.calcite.avatica.remote.Service} - * that translates requests into JSON and sends them to a remote server, - * usually an HTTP server. - */ -public class RemoteService extends JsonService { - private final AvaticaHttpClient client; - - public RemoteService(AvaticaHttpClient client) { - this.client = client; - } - - @Override public String apply(String request) { - byte[] response = client.send(request.getBytes(StandardCharsets.UTF_8)); - return new String(response, StandardCharsets.UTF_8); - } -} - -// End RemoteService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java deleted file mode 100644 index 417c6ed..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -/** - * Encapsulate the logic of transforming a protobuf Request message into the Avatica POJO request. - */ -public class RequestTranslator { - - private final Parser<? extends Message> parser; - private final Service.Request impl; - - public RequestTranslator(Parser<? extends Message> parser, Service.Request impl) { - this.parser = parser; - this.impl = impl; - } - - public Service.Request transform(ByteString serializedMessage) throws - InvalidProtocolBufferException { - // This should already be an aliased CodedInputStream from the WireMessage parsing. 
- Message msg = parser.parseFrom(serializedMessage.newCodedInput()); - return impl.deserialize(msg); - } -} - -// End RequestTranslator.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java deleted file mode 100644 index 0311e13..0000000 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -/** - * Encapsulate the logic of transforming a protobuf Response message into the Avatica POJO Response. - */ -public class ResponseTranslator { - - private final Parser<? 
extends Message> parser; - private final Service.Response impl; - - public ResponseTranslator(Parser<? extends Message> parser, Service.Response impl) { - this.parser = parser; - this.impl = impl; - } - - public Service.Response transform(ByteString serializedMessage) throws - InvalidProtocolBufferException { - Message msg = parser.parseFrom(serializedMessage); - return impl.deserialize(msg); - } -} - -// End ResponseTranslator.java
