http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java new file mode 100644 index 0000000..11a6104 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.AvaticaConnection; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Mock implementation of {@link Service} + * that encodes its requests and responses as JSON + * and looks up responses from a pre-defined map. + */ +public class MockJsonService extends JsonService { + private final Map<String, String> map; + + public MockJsonService(Map<String, String> map) { + this.map = map; + } + + @Override public String apply(String request) { + String response = map.get(request); + if (response == null) { + throw new RuntimeException("No response for " + request); + } + return response; + } + + /** Factory that creates a {@code MockJsonService}. */ + public static class Factory implements Service.Factory { + public Service create(AvaticaConnection connection) { + final String connectionId = connection.id; + final Map<String, String> map1 = new HashMap<>(); + try { + map1.put( + "{\"request\":\"openConnection\",\"connectionId\":\"" + connectionId + "\",\"info\":{}}", + "{\"response\":\"openConnection\"}"); + map1.put( + "{\"request\":\"closeConnection\",\"connectionId\":\"" + connectionId + "\"}", + "{\"response\":\"closeConnection\"}"); + map1.put( + "{\"request\":\"getSchemas\",\"catalog\":null,\"schemaPattern\":{\"s\":null}}", + "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); + map1.put( + JsonService.encode(new SchemasRequest(connectionId, null, null)), + "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); + map1.put( + JsonService.encode( + new TablesRequest(connectionId, null, null, null, Arrays.<String>asList())), + "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); + map1.put( + "{\"request\":\"createStatement\",\"connectionId\":\"" + connectionId + "\"}", + "{\"response\":\"createStatement\",\"id\":0}"); + map1.put( + "{\"request\":\"prepareAndExecute\",\"statementId\":0," + + "\"sql\":\"select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}", + "{\"response\":\"resultSet\", updateCount: -1, \"signature\": {\n" + + " \"columns\": [\n" + + " {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 4, rep: \"INTEGER\"}},\n" + + " {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n" + + " ], \"cursorFactory\": {\"style\": \"ARRAY\"}\n" + + "}, \"rows\": [[1, \"a\"], [null, \"b\"], [3, \"c\"]]}"); + map1.put( + "{\"request\":\"prepare\",\"statementId\":0," + + "\"sql\":\"select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}", + "{\"response\":\"prepare\",\"signature\": {\n" + + " \"columns\": [\n" + + " {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 4, rep: \"INTEGER\"}},\n" + + " {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n" + + " ],\n" + + " \"parameters\": [],\n" + + " \"cursorFactory\": {\"style\": \"ARRAY\"}\n" + + "}}"); + map1.put( + "{\"request\":\"getColumns\",\"connectionId\":\"" + connectionId + "\",\"catalog\":null,\"schemaPattern\":null," + + "\"tableNamePattern\":\"my_table\",\"columnNamePattern\":null}", + "{\"response\":\"resultSet\",\"connectionId\":\"00000000-0000-0000-0000-000000000000\",\"statementId\":-1,\"ownStatement\":true," + + "\"signature\":{\"columns\":[" + + "{\"ordinal\":0,\"autoIncrement\":false,\"caseSensitive\":false,\"searchable\":true,\"currency\":false,\"nullable\":1,\"signed\":false," + + "\"displaySize\":40,\"label\":\"TABLE_NAME\",\"columnName\":\"TABLE_NAME\",\"schemaName\":\"\",\"precision\":0,\"scale\":0,\"tableName\":\"SYSTEM.TABLE\"," + + "\"catalogName\":\"\",\"type\":{\"type\":\"scalar\",\"id\":12,\"name\":\"VARCHAR\",\"rep\":\"STRING\"},\"readOnly\":true,\"writable\":false," + + "\"definitelyWritable\":false,\"columnClassName\":\"java.lang.String\"}," + + "{\"ordinal\":1,\"autoIncrement\":false,\"caseSensitive\":false,\"searchable\":true,\"currency\":false,\"nullable\":1,\"signed\":true," + + "\"displaySize\":40,\"label\":\"ORDINAL_POSITION\",\"columnName\":\"ORDINAL_POSITION\",\"schemaName\":\"\",\"precision\":0,\"scale\":0," + + "\"tableName\":\"SYSTEM.TABLE\",\"catalogName\":\"\",\"type\":{\"type\":\"scalar\",\"id\":-5,\"name\":\"BIGINT\",\"rep\":\"PRIMITIVE_LONG\"}," + + "\"readOnly\":true,\"writable\":false,\"definitelyWritable\":false,\"columnClassName\":\"java.lang.Long\"}" + + "],\"sql\":null," + + "\"parameters\":[]," + + "\"cursorFactory\":{\"style\":\"LIST\",\"clazz\":null,\"fieldNames\":null},\"statementType\":null}," + + "\"firstFrame\":{\"offset\":0,\"done\":true," + + "\"rows\":[[\"my_table\",10]]" + + "},\"updateCount\":-1}"); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new MockJsonService(map1); + } + } +} + +// End MockJsonService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java new file mode 100644 index 0000000..2fcb19a --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaParameter; +import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.MetaImpl; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A mock implementation of ProtobufService for testing. + * + * <p>It performs no serialization of requests and responses. + */ +public class MockProtobufService extends ProtobufService { + + private final String connectionId; + private final Map<Request, Response> mapping; + + public MockProtobufService(String connectionId) { + this.connectionId = connectionId; + this.mapping = createMapping(); + } + + private Map<Request, Response> createMapping() { + HashMap<Request, Response> mappings = new HashMap<>(); + + // Add in mappings + + mappings.put( + new OpenConnectionRequest(connectionId, new HashMap<String, String>()), + new OpenConnectionResponse()); + + // Get the schema, no.. schema..? + mappings.put( + new SchemasRequest(connectionId, null, null), + // ownStatement=false just to avoid the extra close statement call. + new ResultSetResponse(null, 1, false, null, Meta.Frame.EMPTY, -1, null)); + + // Get the tables, no tables exist + mappings.put(new TablesRequest(connectionId, null, null, null, Collections.<String>emptyList()), + // ownStatement=false just to avoid the extra close statement call. + new ResultSetResponse(null, 150, false, null, Meta.Frame.EMPTY, -1, null)); + + // Create a statement, get back an id + mappings.put(new CreateStatementRequest("0"), new CreateStatementResponse("0", 1, null)); + + // Prepare and execute a query. Values and schema are returned + mappings.put( + new PrepareAndExecuteRequest(connectionId, 1, + "select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)", -1), + new ResultSetResponse("0", 1, true, + Meta.Signature.create( + Arrays.<ColumnMetaData>asList( + MetaImpl.columnMetaData("C1", 0, Integer.class), + MetaImpl.columnMetaData("C2", 1, String.class)), + null, null, Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT), + Meta.Frame.create(0, true, + Arrays.<Object>asList(new Object[] {1, "a"}, + new Object[] {null, "b"}, new Object[] {3, "c"})), -1, null)); + + // Prepare a query. Schema for results are returned, but no values + mappings.put( + new PrepareRequest(connectionId, + "select * from (\\n values(1, 'a'), (null, 'b'), (3, 'c')), as t (c1, c2)", -1), + new ResultSetResponse("0", 1, true, + Meta.Signature.create( + Arrays.<ColumnMetaData>asList( + MetaImpl.columnMetaData("C1", 0, Integer.class), + MetaImpl.columnMetaData("C2", 1, String.class)), + null, Collections.<AvaticaParameter>emptyList(), + Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT), + null, -1, null)); + + mappings.put( + new ColumnsRequest(connectionId, null, null, "my_table", null), + new ResultSetResponse("00000000-0000-0000-0000-000000000000", -1, true, + Meta.Signature.create( + Arrays.<ColumnMetaData>asList( + MetaImpl.columnMetaData("TABLE_NAME", 0, String.class), + MetaImpl.columnMetaData("ORDINAL_POSITION", 1, Long.class)), null, + Collections.<AvaticaParameter>emptyList(), Meta.CursorFactory.ARRAY, null), + Meta.Frame.create(0, true, + Arrays.<Object>asList(new Object[] {new Object[]{"my_table", 10}})), -1, null)); + + return Collections.unmodifiableMap(mappings); + } + + @Override public Response _apply(Request request) { + if (request instanceof CloseConnectionRequest) { + return new CloseConnectionResponse(); + } + + return dispatch(request); + } + + /** + * Fetches the static response for the given request. + * + * @param request the client's request + * @return the appropriate response + * @throws RuntimeException if no mapping is found for the request + */ + private Response dispatch(Request request) { + Response response = mapping.get(request); + + if (null == response) { + throw new RuntimeException("Had no response mapping for " + request); + } + + return response; + } + + /** + * A factory that instantiates the mock protobuf service. + */ + public static class MockProtobufServiceFactory implements Service.Factory { + @Override public Service create(AvaticaConnection connection) { + return new MockProtobufService(connection.id); + } + } +} + +// End MockProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java new file mode 100644 index 0000000..89e380e --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.metrics.MetricsSystem; +import org.apache.calcite.avatica.metrics.Timer; +import org.apache.calcite.avatica.metrics.Timer.Context; +import org.apache.calcite.avatica.remote.Service.Response; + +import java.io.IOException; + +/** + * Dispatches serialized protocol buffer messages to the provided {@link Service} + * by converting them to the POJO Request. Returns back the serialized protocol + * buffer response. + */ +public class ProtobufHandler extends AbstractHandler<byte[]> { + + private final ProtobufTranslation translation; + private final MetricsSystem metrics; + private final Timer serializationTimer; + + public ProtobufHandler(Service service, ProtobufTranslation translation, MetricsSystem metrics) { + super(service); + this.translation = translation; + this.metrics = metrics; + this.serializationTimer = this.metrics.getTimer( + MetricsHelper.concat(ProtobufHandler.class, HANDLER_SERIALIZATION_METRICS_NAME)); + } + + @Override public HandlerResponse<byte[]> apply(byte[] requestBytes) { + return super.apply(requestBytes); + } + + @Override Service.Request decode(byte[] serializedRequest) throws IOException { + try (final Context ctx = serializationTimer.start()) { + return translation.parseRequest(serializedRequest); + } + } + + @Override byte[] encode(Response response) throws IOException { + try (final Context ctx = serializationTimer.start()) { + return translation.serializeResponse(response); + } + } +} + +// End ProtobufHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java new file mode 100644 index 0000000..56ba125 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import com.google.protobuf.Message; + +/** + * Service implementation that encodes requests and responses as protocol buffers. + */ +public abstract class ProtobufService extends AbstractService { + + /** + * Derived class should implement this method to transport requests and + * responses to and from the peer service. + */ + public abstract Response _apply(Request request); + + @Override SerializationType getSerializationType() { + return SerializationType.PROTOBUF; + } + + @Override public ResultSetResponse apply(CatalogsRequest request) { + return finagle((ResultSetResponse) _apply(request)); + } + + @Override public ResultSetResponse apply(SchemasRequest request) { + return finagle((ResultSetResponse) _apply(request)); + } + + @Override public ResultSetResponse apply(TablesRequest request) { + return finagle((ResultSetResponse) _apply(request)); + } + + @Override public ResultSetResponse apply(TableTypesRequest request) { + return finagle((ResultSetResponse) _apply(request)); + } + + @Override public ResultSetResponse apply(TypeInfoRequest request) { + return finagle((ResultSetResponse) _apply(request)); + } + + @Override public ResultSetResponse apply(ColumnsRequest request) { + return finagle((ResultSetResponse) _apply(request)); + } + + @Override public PrepareResponse apply(PrepareRequest request) { + return finagle((PrepareResponse) _apply(request)); + } + + @Override public ExecuteResponse apply(PrepareAndExecuteRequest request) { + return finagle((ExecuteResponse) _apply(request)); + } + + @Override public FetchResponse apply(FetchRequest request) { + return (FetchResponse) _apply(request); + } + + @Override public CreateStatementResponse apply(CreateStatementRequest request) { + return (CreateStatementResponse) _apply(request); + } + + @Override public CloseStatementResponse apply(CloseStatementRequest request) { + return (CloseStatementResponse) _apply(request); + } + + @Override public OpenConnectionResponse apply(OpenConnectionRequest request) { + return (OpenConnectionResponse) _apply(request); + } + + @Override public CloseConnectionResponse apply(CloseConnectionRequest request) { + return (CloseConnectionResponse) _apply(request); + } + + @Override public ConnectionSyncResponse apply(ConnectionSyncRequest request) { + return (ConnectionSyncResponse) _apply(request); + } + + @Override public DatabasePropertyResponse apply(DatabasePropertyRequest request) { + return (DatabasePropertyResponse) _apply(request); + } + + @Override public ExecuteResponse apply(ExecuteRequest request) { + return finagle((ExecuteResponse) _apply(request)); + } + + @Override public SyncResultsResponse apply(SyncResultsRequest request) { + return (SyncResultsResponse) _apply(request); + } + + @Override public CommitResponse apply(CommitRequest request) { + return (CommitResponse) _apply(request); + } + + @Override public RollbackResponse apply(RollbackRequest request) { + return (RollbackResponse) _apply(request); + } + + /** + * Checks if the provided {@link Message} is an instance of the Class given by + * <code>expectedType</code>. Throws an IllegalArgumentException if the message is not of the + * expected type, otherwise, it returns the message cast as the expected type. + * + * @param msg A Protocol Buffer message. + * @param expectedType The expected type of the Protocol Buffer message. + * @return The msg cast to the concrete Message type. + * @throws IllegalArgumentException If the type of the message is not the expectedType. + */ + public static <T extends Message> T castProtobufMessage(Message msg, Class<T> expectedType) { + if (!expectedType.isInstance(msg)) { + throw new IllegalArgumentException("Expected instance of " + expectedType.getName() + + ", but got " + msg.getClass().getName()); + } + + return expectedType.cast(msg); + } +} + +// End ProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java new file mode 100644 index 0000000..7142d59 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.remote.Service.Request; +import org.apache.calcite.avatica.remote.Service.Response; + +import java.io.IOException; + +/** + * Generic interface to support parsing of serialized protocol buffers between client and server. + */ +public interface ProtobufTranslation { + + /** + * Serializes a {@link Response} as a protocol buffer. + * + * @param response The response to serialize + * @throws IOException If there are errors during serialization + */ + byte[] serializeResponse(Response response) throws IOException; + + /** + * Serializes a {@link Request} as a protocol buffer. + * + * @param request The request to serialize + * @throws IOException If there are errors during serialization + */ + byte[] serializeRequest(Request request) throws IOException; + + /** + * Parses a serialized protocol buffer request into a {@link Request}. + * + * @param bytes Serialized protocol buffer request from client + * @return A Request object for the given bytes + * @throws IOException If the protocol buffer cannot be deserialized + */ + Request parseRequest(byte[] bytes) throws IOException; + + /** + * Parses a serialized protocol buffer response into a {@link Response}. + * + * @param bytes Serialized protocol buffer request from server + * @return The Response object for the given bytes + * @throws IOException If the protocol buffer cannot be deserialized + */ + Response parseResponse(byte[] bytes) throws IOException; +} + +// End ProtobufTranslation.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java new file mode 100644 index 0000000..80d2b22 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.proto.Common.WireMessage; +import org.apache.calcite.avatica.proto.Requests.CatalogsRequest; +import org.apache.calcite.avatica.proto.Requests.CloseConnectionRequest; +import org.apache.calcite.avatica.proto.Requests.CloseStatementRequest; +import org.apache.calcite.avatica.proto.Requests.ColumnsRequest; +import org.apache.calcite.avatica.proto.Requests.CommitRequest; +import org.apache.calcite.avatica.proto.Requests.ConnectionSyncRequest; +import org.apache.calcite.avatica.proto.Requests.CreateStatementRequest; +import org.apache.calcite.avatica.proto.Requests.DatabasePropertyRequest; +import org.apache.calcite.avatica.proto.Requests.ExecuteRequest; +import org.apache.calcite.avatica.proto.Requests.FetchRequest; +import org.apache.calcite.avatica.proto.Requests.OpenConnectionRequest; +import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteRequest; +import org.apache.calcite.avatica.proto.Requests.PrepareRequest; +import org.apache.calcite.avatica.proto.Requests.RollbackRequest; +import org.apache.calcite.avatica.proto.Requests.SchemasRequest; +import org.apache.calcite.avatica.proto.Requests.SyncResultsRequest; +import org.apache.calcite.avatica.proto.Requests.TableTypesRequest; +import org.apache.calcite.avatica.proto.Requests.TablesRequest; +import org.apache.calcite.avatica.proto.Requests.TypeInfoRequest; +import org.apache.calcite.avatica.proto.Responses.CloseConnectionResponse; +import org.apache.calcite.avatica.proto.Responses.CloseStatementResponse; +import org.apache.calcite.avatica.proto.Responses.CommitResponse; +import org.apache.calcite.avatica.proto.Responses.ConnectionSyncResponse; +import org.apache.calcite.avatica.proto.Responses.CreateStatementResponse; +import org.apache.calcite.avatica.proto.Responses.DatabasePropertyResponse; +import org.apache.calcite.avatica.proto.Responses.ErrorResponse; +import org.apache.calcite.avatica.proto.Responses.ExecuteResponse; +import org.apache.calcite.avatica.proto.Responses.FetchResponse; +import org.apache.calcite.avatica.proto.Responses.OpenConnectionResponse; +import org.apache.calcite.avatica.proto.Responses.PrepareResponse; +import org.apache.calcite.avatica.proto.Responses.ResultSetResponse; +import org.apache.calcite.avatica.proto.Responses.RollbackResponse; +import org.apache.calcite.avatica.proto.Responses.RpcMetadata; +import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse; +import org.apache.calcite.avatica.remote.Service.Request; +import org.apache.calcite.avatica.remote.Service.Response; +import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; +import org.apache.calcite.avatica.util.UnsynchronizedBuffer; + +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.HBaseZeroCopyByteString; +import com.google.protobuf.Message; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Implementation of {@link ProtobufTranslationImpl} that translates + * protobuf requests to POJO requests. + */ +public class ProtobufTranslationImpl implements ProtobufTranslation { + + // Extremely ugly mapping of PB class name into a means to convert it to the POJO + private static final Map<String, RequestTranslator> REQUEST_PARSERS; + private static final Map<String, ResponseTranslator> RESPONSE_PARSERS; + private static final Map<Class<?>, ByteString> MESSAGE_CLASSES; + + static { + Map<String, RequestTranslator> reqParsers = new ConcurrentHashMap<>(); + reqParsers.put(CatalogsRequest.class.getName(), + new RequestTranslator(CatalogsRequest.parser(), new Service.CatalogsRequest())); + reqParsers.put(OpenConnectionRequest.class.getName(), + new RequestTranslator(OpenConnectionRequest.parser(), new Service.OpenConnectionRequest())); + reqParsers.put(CloseConnectionRequest.class.getName(), + new RequestTranslator(CloseConnectionRequest.parser(), + new Service.CloseConnectionRequest())); + reqParsers.put(CloseStatementRequest.class.getName(), + new RequestTranslator(CloseStatementRequest.parser(), new Service.CloseStatementRequest())); + reqParsers.put(ColumnsRequest.class.getName(), + new RequestTranslator(ColumnsRequest.parser(), new Service.ColumnsRequest())); + reqParsers.put(ConnectionSyncRequest.class.getName(), + new RequestTranslator(ConnectionSyncRequest.parser(), new Service.ConnectionSyncRequest())); + reqParsers.put(CreateStatementRequest.class.getName(), + new RequestTranslator(CreateStatementRequest.parser(), + new Service.CreateStatementRequest())); + reqParsers.put(DatabasePropertyRequest.class.getName(), + new RequestTranslator(DatabasePropertyRequest.parser(), + new Service.DatabasePropertyRequest())); + reqParsers.put(FetchRequest.class.getName(), + new RequestTranslator(FetchRequest.parser(), new Service.FetchRequest())); + reqParsers.put(PrepareAndExecuteRequest.class.getName(), + new RequestTranslator(PrepareAndExecuteRequest.parser(), + new Service.PrepareAndExecuteRequest())); + reqParsers.put(PrepareRequest.class.getName(), + new RequestTranslator(PrepareRequest.parser(), new Service.PrepareRequest())); + reqParsers.put(SchemasRequest.class.getName(), + new RequestTranslator(SchemasRequest.parser(), new Service.SchemasRequest())); + reqParsers.put(TablesRequest.class.getName(), + new RequestTranslator(TablesRequest.parser(), new Service.TablesRequest())); + reqParsers.put(TableTypesRequest.class.getName(), + new RequestTranslator(TableTypesRequest.parser(), new Service.TableTypesRequest())); + reqParsers.put(TypeInfoRequest.class.getName(), + new RequestTranslator(TypeInfoRequest.parser(), new Service.TypeInfoRequest())); + reqParsers.put(ExecuteRequest.class.getName(), + new RequestTranslator(ExecuteRequest.parser(), new Service.ExecuteRequest())); + reqParsers.put(SyncResultsRequest.class.getName(), + new RequestTranslator(SyncResultsRequest.parser(), new Service.SyncResultsRequest())); + reqParsers.put(CommitRequest.class.getName(), + new RequestTranslator(CommitRequest.parser(), new Service.CommitRequest())); + reqParsers.put(RollbackRequest.class.getName(), + new RequestTranslator(RollbackRequest.parser(), new Service.RollbackRequest())); + + REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers); + + Map<String, ResponseTranslator> respParsers = new ConcurrentHashMap<>(); + respParsers.put(OpenConnectionResponse.class.getName(), + new ResponseTranslator(OpenConnectionResponse.parser(), + new Service.OpenConnectionResponse())); + respParsers.put(CloseConnectionResponse.class.getName(), + new ResponseTranslator(CloseConnectionResponse.parser(), + new Service.CloseConnectionResponse())); + respParsers.put(CloseStatementResponse.class.getName(), + new ResponseTranslator(CloseStatementResponse.parser(), + new Service.CloseStatementResponse())); + respParsers.put(ConnectionSyncResponse.class.getName(), + new ResponseTranslator(ConnectionSyncResponse.parser(), + new Service.ConnectionSyncResponse())); + respParsers.put(CreateStatementResponse.class.getName(), + new ResponseTranslator(CreateStatementResponse.parser(), + new Service.CreateStatementResponse())); + respParsers.put(DatabasePropertyResponse.class.getName(), + new ResponseTranslator(DatabasePropertyResponse.parser(), + new Service.DatabasePropertyResponse())); + respParsers.put(ExecuteResponse.class.getName(), + new ResponseTranslator(ExecuteResponse.parser(), new Service.ExecuteResponse())); + respParsers.put(FetchResponse.class.getName(), + new ResponseTranslator(FetchResponse.parser(), new Service.FetchResponse())); + respParsers.put(PrepareResponse.class.getName(), + new ResponseTranslator(PrepareResponse.parser(), new Service.PrepareResponse())); + respParsers.put(ResultSetResponse.class.getName(), + new ResponseTranslator(ResultSetResponse.parser(), new Service.ResultSetResponse())); + respParsers.put(ErrorResponse.class.getName(), + new ResponseTranslator(ErrorResponse.parser(), new Service.ErrorResponse())); + respParsers.put(SyncResultsResponse.class.getName(), + new ResponseTranslator(SyncResultsResponse.parser(), new Service.SyncResultsResponse())); + respParsers.put(RpcMetadata.class.getName(), + new ResponseTranslator(RpcMetadata.parser(), new RpcMetadataResponse())); + respParsers.put(CommitResponse.class.getName(), + new ResponseTranslator(CommitResponse.parser(), new Service.CommitResponse())); + respParsers.put(RollbackResponse.class.getName(), + new ResponseTranslator(RollbackResponse.parser(), new Service.RollbackResponse())); + + RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers); + + Map<Class<?>, ByteString> messageClassNames = new ConcurrentHashMap<>(); + for (Class<?> msgClz : getAllMessageClasses()) { + messageClassNames.put(msgClz, wrapClassName(msgClz)); + } + MESSAGE_CLASSES = Collections.unmodifiableMap(messageClassNames); + } + + private static List<Class<?>> getAllMessageClasses() { + List<Class<?>> messageClasses = new ArrayList<>(); + messageClasses.add(CatalogsRequest.class); + messageClasses.add(CloseConnectionRequest.class); + messageClasses.add(CloseStatementRequest.class); + messageClasses.add(ColumnsRequest.class); + messageClasses.add(CommitRequest.class); + messageClasses.add(ConnectionSyncRequest.class); + messageClasses.add(CreateStatementRequest.class); + messageClasses.add(DatabasePropertyRequest.class); + messageClasses.add(ExecuteRequest.class); + messageClasses.add(FetchRequest.class); + messageClasses.add(OpenConnectionRequest.class); + messageClasses.add(PrepareAndExecuteRequest.class); + messageClasses.add(PrepareRequest.class); + messageClasses.add(RollbackRequest.class); + messageClasses.add(SchemasRequest.class); + messageClasses.add(SyncResultsRequest.class); + messageClasses.add(TableTypesRequest.class); + messageClasses.add(TablesRequest.class); + messageClasses.add(TypeInfoRequest.class); + messageClasses.add(CloseConnectionResponse.class); + messageClasses.add(CloseStatementResponse.class); + messageClasses.add(CommitResponse.class); + messageClasses.add(ConnectionSyncResponse.class); + messageClasses.add(CreateStatementResponse.class); + messageClasses.add(DatabasePropertyResponse.class); + messageClasses.add(ErrorResponse.class); + messageClasses.add(ExecuteResponse.class); + messageClasses.add(FetchResponse.class); + messageClasses.add(OpenConnectionResponse.class); + messageClasses.add(PrepareResponse.class); + messageClasses.add(ResultSetResponse.class); + messageClasses.add(RollbackResponse.class); + messageClasses.add(RpcMetadata.class); + messageClasses.add(SyncResultsResponse.class); + + return messageClasses; + } + + private static ByteString wrapClassName(Class<?> clz) { + return HBaseZeroCopyByteString.wrap(clz.getName().getBytes(UTF_8)); + } + + private final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer = + new ThreadLocal<UnsynchronizedBuffer>() { + @Override protected UnsynchronizedBuffer initialValue() { + return new UnsynchronizedBuffer(); + } + }; + + /** + * Fetches the concrete message's Parser implementation. + * + * @param className The protocol buffer class name + * @return The Parser for the class + * @throws IllegalArgumentException If the argument is null or if a Parser for the given + * class name is not found. + */ + public static RequestTranslator getParserForRequest(String className) { + if (null == className) { + throw new IllegalArgumentException("Cannot fetch parser for null class name"); + } + + RequestTranslator translator = REQUEST_PARSERS.get(className); + if (null == translator) { + throw new IllegalArgumentException("Cannot find parser for " + className); + } + + return translator; + } + + /** + * Fetches the concrete message's Parser implementation. + * + * @param className The protocol buffer class name + * @return The Parser for the class + * @throws IllegalArgumentException If the argument is null or if a Parser for the given + * class name is not found. + */ + public static ResponseTranslator getParserForResponse(String className) { + if (null == className) { + throw new IllegalArgumentException("Cannot fetch parser for null class name"); + } + + ResponseTranslator translator = RESPONSE_PARSERS.get(className); + if (null == translator) { + throw new IllegalArgumentException("Cannot find parser for " + className); + } + + return translator; + } + + @Override public byte[] serializeResponse(Response response) throws IOException { + // Avoid BAOS for its synchronized write methods, we don't need that concurrency control + UnsynchronizedBuffer out = threadLocalBuffer.get(); + try { + Message responseMsg = response.serialize(); + serializeMessage(out, responseMsg); + return out.toArray(); + } finally { + out.reset(); + } + } + + @Override public byte[] serializeRequest(Request request) throws IOException { + // Avoid BAOS for its synchronized write methods, we don't need that concurrency control + UnsynchronizedBuffer out = threadLocalBuffer.get(); + try { + Message requestMsg = request.serialize(); + serializeMessage(out, requestMsg); + return out.toArray(); + } finally { + out.reset(); + } + } + + void serializeMessage(OutputStream out, Message msg) throws IOException { + // Serialize the protobuf message + UnsynchronizedBuffer buffer = threadLocalBuffer.get(); + ByteString serializedMsg; + try { + msg.writeTo(buffer); + // Make a bytestring from it + serializedMsg = HBaseZeroCopyByteString.wrap(buffer.toArray()); + } finally { + buffer.reset(); + } + + // Wrap the serialized message in a WireMessage + WireMessage wireMsg = WireMessage.newBuilder().setNameBytes(getClassNameBytes(msg.getClass())) + .setWrappedMessage(serializedMsg).build(); + + // Write the WireMessage to the provided OutputStream + wireMsg.writeTo(out); + } + + ByteString getClassNameBytes(Class<?> clz) { + ByteString byteString = MESSAGE_CLASSES.get(clz); + if (null == byteString) { + throw new IllegalArgumentException("Missing ByteString for " + clz.getName()); + } + return byteString; + } + + @Override public Request parseRequest(byte[] bytes) throws IOException { + ByteString byteString = HBaseZeroCopyByteString.wrap(bytes); + CodedInputStream inputStream = byteString.newCodedInput(); + // Enable aliasing to avoid an extra copy to get at the serialized Request inside of the + // WireMessage. + inputStream.enableAliasing(true); + WireMessage wireMsg = WireMessage.parseFrom(inputStream); + + String serializedMessageClassName = wireMsg.getName(); + RequestTranslator translator = getParserForRequest(serializedMessageClassName); + + // The ByteString should be logical offsets into the original byte array + return translator.transform(wireMsg.getWrappedMessage()); + } + + @Override public Response parseResponse(byte[] bytes) throws IOException { + ByteString byteString = HBaseZeroCopyByteString.wrap(bytes); + CodedInputStream inputStream = byteString.newCodedInput(); + // Enable aliasing to avoid an extra copy to get at the serialized Response inside of the + // WireMessage. + inputStream.enableAliasing(true); + WireMessage wireMsg = WireMessage.parseFrom(inputStream); + + String serializedMessageClassName = wireMsg.getName(); + ResponseTranslator translator = getParserForResponse(serializedMessageClassName); + + return translator.transform(wireMsg.getWrappedMessage()); + } +} + +// End ProtobufTranslationImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java new file mode 100644 index 0000000..463985a --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException; +import org.apache.calcite.avatica.AvaticaParameter; +import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.ConnectionPropertiesImpl; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.MetaImpl; +import org.apache.calcite.avatica.MissingResultsException; +import org.apache.calcite.avatica.NoSuchStatementException; +import org.apache.calcite.avatica.QueryState; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link org.apache.calcite.avatica.Meta} for the remote + * driver. + */ +class RemoteMeta extends MetaImpl { + final Service service; + final Map<String, ConnectionPropertiesImpl> propsMap = new HashMap<>(); + private Map<DatabaseProperty, Object> databaseProperties; + + public RemoteMeta(AvaticaConnection connection, Service service) { + super(connection); + this.service = service; + } + + private MetaResultSet toResultSet(Class clazz, + Service.ResultSetResponse response) { + if (response.updateCount != -1) { + return MetaResultSet.count(response.connectionId, response.statementId, + response.updateCount); + } + Signature signature0 = response.signature; + if (signature0 == null) { + final List<ColumnMetaData> columns = + clazz == null + ? Collections.<ColumnMetaData>emptyList() + : fieldMetaData(clazz).columns; + signature0 = Signature.create(columns, + "?", Collections.<AvaticaParameter>emptyList(), CursorFactory.ARRAY, + Meta.StatementType.SELECT); + } + return MetaResultSet.create(response.connectionId, response.statementId, + response.ownStatement, signature0, response.firstFrame); + } + + @Override public Map<DatabaseProperty, Object> getDatabaseProperties(ConnectionHandle ch) { + synchronized (this) { + // Compute map on first use, and cache + if (databaseProperties == null) { + databaseProperties = + service.apply(new Service.DatabasePropertyRequest(ch.id)).map; + } + return databaseProperties; + } + } + + @Override public StatementHandle createStatement(final ConnectionHandle ch) { + return connection.invokeWithRetries( + new CallableWithoutException<StatementHandle>() { + public StatementHandle call() { + // sync connection state if necessary + connectionSync(ch, new ConnectionPropertiesImpl()); + final Service.CreateStatementResponse response = + service.apply(new Service.CreateStatementRequest(ch.id)); + return new StatementHandle(response.connectionId, response.statementId, null); + } + }); + } + + @Override public void closeStatement(final StatementHandle h) { + connection.invokeWithRetries( + new CallableWithoutException<Void>() { + public Void call() { + final Service.CloseStatementResponse response = + service.apply( + new Service.CloseStatementRequest(h.connectionId, h.id)); + return null; + } + }); + } + + @Override public void openConnection(final ConnectionHandle ch, final Map<String, String> info) { + connection.invokeWithRetries( + new CallableWithoutException<Void>() { + public Void call() { + final Service.OpenConnectionResponse response = + service.apply(new Service.OpenConnectionRequest(ch.id, info)); + return null; + } + }); + } + + @Override public void closeConnection(final ConnectionHandle ch) { + connection.invokeWithRetries( + new CallableWithoutException<Void>() { + public Void call() { + final Service.CloseConnectionResponse response = + service.apply(new Service.CloseConnectionRequest(ch.id)); + propsMap.remove(ch.id); + return null; + } + }); + } + + @Override public ConnectionProperties connectionSync(final ConnectionHandle ch, + final ConnectionProperties connProps) { + return connection.invokeWithRetries( + new CallableWithoutException<ConnectionProperties>() { + public ConnectionProperties call() { + ConnectionPropertiesImpl localProps = propsMap.get(ch.id); + if (localProps == null) { + localProps = new ConnectionPropertiesImpl(); + localProps.setDirty(true); + propsMap.put(ch.id, localProps); + } + + // Only make an RPC if necessary. RPC is necessary when we have local changes that need + // flushed to the server (be sure to introduce any new changes from connProps before + // checking AND when connProps.isEmpty() (meaning, this was a request for a value, not + // overriding a value). Otherwise, accumulate the change locally and return immediately. + if (localProps.merge(connProps).isDirty() && connProps.isEmpty()) { + final Service.ConnectionSyncResponse response = service.apply( + new Service.ConnectionSyncRequest(ch.id, localProps)); + propsMap.put(ch.id, (ConnectionPropertiesImpl) response.connProps); + return response.connProps; + } else { + return localProps; + } + } + }); + } + + @Override public MetaResultSet getCatalogs(final ConnectionHandle ch) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply(new Service.CatalogsRequest(ch.id)); + return toResultSet(MetaCatalog.class, response); + } + }); + } + + @Override public MetaResultSet getSchemas(final ConnectionHandle ch, final String catalog, + final Pat schemaPattern) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply( + new Service.SchemasRequest(ch.id, catalog, schemaPattern.s)); + return toResultSet(MetaSchema.class, response); + } + }); + } + + @Override public MetaResultSet getTables(final ConnectionHandle ch, final String catalog, + final Pat schemaPattern, final Pat tableNamePattern, final List<String> typeList) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply( + new Service.TablesRequest(ch.id, catalog, schemaPattern.s, + tableNamePattern.s, typeList)); + return toResultSet(MetaTable.class, response); + } + }); + } + + @Override public MetaResultSet getTableTypes(final ConnectionHandle ch) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply(new Service.TableTypesRequest(ch.id)); + return toResultSet(MetaTableType.class, response); + } + }); + } + + @Override public MetaResultSet getTypeInfo(final ConnectionHandle ch) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply(new Service.TypeInfoRequest(ch.id)); + return toResultSet(MetaTypeInfo.class, response); + } + }); + } + + @Override public MetaResultSet getColumns(final ConnectionHandle ch, final String catalog, + final Pat schemaPattern, final Pat tableNamePattern, final Pat columnNamePattern) { + return connection.invokeWithRetries( + new CallableWithoutException<MetaResultSet>() { + public MetaResultSet call() { + final Service.ResultSetResponse response = + service.apply( + new Service.ColumnsRequest(ch.id, catalog, schemaPattern.s, + tableNamePattern.s, columnNamePattern.s)); + return toResultSet(MetaColumn.class, response); + } + }); + } + + @Override public StatementHandle prepare(final ConnectionHandle ch, final String sql, + final long maxRowCount) { + return connection.invokeWithRetries( + new CallableWithoutException<StatementHandle>() { + public StatementHandle call() { + connectionSync(ch, + new ConnectionPropertiesImpl()); // sync connection state if necessary + final Service.PrepareResponse response = service.apply( + new Service.PrepareRequest(ch.id, sql, maxRowCount)); + return response.statement; + } + }); + } + + @Override public ExecuteResult prepareAndExecute(final StatementHandle h, final String sql, + final long maxRowCount, final PrepareCallback callback) throws NoSuchStatementException { + try { + return connection.invokeWithRetries( + new CallableWithoutException<ExecuteResult>() { + public ExecuteResult call() { + // sync connection state if necessary + connectionSync(new ConnectionHandle(h.connectionId), new ConnectionPropertiesImpl()); + final Service.ExecuteResponse response; + try { + synchronized (callback.getMonitor()) { + callback.clear(); + response = service.apply( + new Service.PrepareAndExecuteRequest(h.connectionId, + h.id, sql, maxRowCount)); + if (response.missingStatement) { + throw new RuntimeException(new NoSuchStatementException(h)); + } + if (response.results.size() > 0) { + final Service.ResultSetResponse result = response.results.get(0); + callback.assign(result.signature, result.firstFrame, + result.updateCount); + } + } + callback.execute(); + List<MetaResultSet> metaResultSets = new ArrayList<>(); + for (Service.ResultSetResponse result : response.results) { + metaResultSets.add(toResultSet(null, result)); + } + return new ExecuteResult(metaResultSets); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof NoSuchStatementException) { + throw (NoSuchStatementException) cause; + } + throw e; + } + } + + @Override public Frame fetch(final StatementHandle h, final long offset, + final int fetchMaxRowCount) throws NoSuchStatementException, MissingResultsException { + try { + return connection.invokeWithRetries( + new CallableWithoutException<Frame>() { + public Frame call() { + final Service.FetchResponse response = + service.apply( + new Service.FetchRequest(h.connectionId, h.id, offset, fetchMaxRowCount)); + if (response.missingStatement) { + throw new RuntimeException(new NoSuchStatementException(h)); + } + if (response.missingResults) { + throw new RuntimeException(new MissingResultsException(h)); + } + return response.frame; + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof NoSuchStatementException) { + throw (NoSuchStatementException) cause; + } else if (cause instanceof MissingResultsException) { + throw (MissingResultsException) cause; + } + throw e; + } + } + + @Override public ExecuteResult execute(final StatementHandle h, + final List<TypedValue> parameterValues, final long maxRowCount) + throws NoSuchStatementException { + try { + return connection.invokeWithRetries( + new CallableWithoutException<ExecuteResult>() { + public ExecuteResult call() { + final Service.ExecuteResponse response = service.apply( + new Service.ExecuteRequest(h, parameterValues, maxRowCount)); + + if (response.missingStatement) { + throw new RuntimeException(new NoSuchStatementException(h)); + } + + List<MetaResultSet> metaResultSets = new ArrayList<>(); + for (Service.ResultSetResponse result : response.results) { + metaResultSets.add(toResultSet(null, result)); + } + + return new ExecuteResult(metaResultSets); + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof NoSuchStatementException) { + throw (NoSuchStatementException) cause; + } + throw e; + } + } + + @Override public boolean syncResults(final StatementHandle h, final QueryState state, + final long offset) throws NoSuchStatementException { + try { + return connection.invokeWithRetries( + new CallableWithoutException<Boolean>() { + public Boolean call() { + final Service.SyncResultsResponse response = + service.apply( + new Service.SyncResultsRequest(h.connectionId, h.id, state, offset)); + if (response.missingStatement) { + throw new RuntimeException(new NoSuchStatementException(h)); + } + return response.moreResults; + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof NoSuchStatementException) { + throw (NoSuchStatementException) cause; + } + throw e; + } + } + + @Override public void commit(final ConnectionHandle ch) { + connection.invokeWithRetries(new CallableWithoutException<Void>() { + public Void call() { + final Service.CommitResponse response = + service.apply(new Service.CommitRequest(ch.id)); + return null; + } + }); + } + + @Override public void rollback(final ConnectionHandle ch) { + connection.invokeWithRetries(new CallableWithoutException<Void>() { + public Void call() { + final Service.RollbackResponse response = + service.apply(new Service.RollbackRequest(ch.id)); + return null; + } + }); + } +} + +// End RemoteMeta.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java new file mode 100644 index 0000000..828513a --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import java.io.IOException; + +/** + * ProtobufService implementation that queries against a remote implementation, using + * protocol buffers as the serialized form. + */ +public class RemoteProtobufService extends ProtobufService { + private final AvaticaHttpClient client; + private final ProtobufTranslation translation; + + public RemoteProtobufService(AvaticaHttpClient client, ProtobufTranslation translation) { + this.client = client; + this.translation = translation; + } + + @Override public Response _apply(Request request) { + final Response resp; + try { + byte[] response = client.send(translation.serializeRequest(request)); + resp = translation.parseResponse(response); + } catch (IOException e) { + // Not a protobuf that we could parse. + throw new RuntimeException(e); + } + + // The server had an error, throw an Exception for that. + if (resp instanceof ErrorResponse) { + throw ((ErrorResponse) resp).toException(); + } + + return resp; + } +} + +// End RemoteProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java new file mode 100644 index 0000000..d4828b5 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import java.nio.charset.StandardCharsets; + +/** + * Implementation of {@link org.apache.calcite.avatica.remote.Service} + * that translates requests into JSON and sends them to a remote server, + * usually an HTTP server. + */ +public class RemoteService extends JsonService { + private final AvaticaHttpClient client; + + public RemoteService(AvaticaHttpClient client) { + this.client = client; + } + + @Override public String apply(String request) { + byte[] response = client.send(request.getBytes(StandardCharsets.UTF_8)); + return new String(response, StandardCharsets.UTF_8); + } +} + +// End RemoteService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java new file mode 100644 index 0000000..417c6ed --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; + +/** + * Encapsulate the logic of transforming a protobuf Request message into the Avatica POJO request. + */ +public class RequestTranslator { + + private final Parser<? extends Message> parser; + private final Service.Request impl; + + public RequestTranslator(Parser<? extends Message> parser, Service.Request impl) { + this.parser = parser; + this.impl = impl; + } + + public Service.Request transform(ByteString serializedMessage) throws + InvalidProtocolBufferException { + // This should already be an aliased CodedInputStream from the WireMessage parsing. + Message msg = parser.parseFrom(serializedMessage.newCodedInput()); + return impl.deserialize(msg); + } +} + +// End RequestTranslator.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java new file mode 100644 index 0000000..0311e13 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; + +/** + * Encapsulate the logic of transforming a protobuf Response message into the Avatica POJO Response. + */ +public class ResponseTranslator { + + private final Parser<? extends Message> parser; + private final Service.Response impl; + + public ResponseTranslator(Parser<? extends Message> parser, Service.Response impl) { + this.parser = parser; + this.impl = impl; + } + + public Service.Response transform(ByteString serializedMessage) throws + InvalidProtocolBufferException { + Message msg = parser.parseFrom(serializedMessage); + return impl.deserialize(msg); + } +} + +// End ResponseTranslator.java
