http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java new file mode 100644 index 0000000..1968fcd --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.AvaticaSeverity; +import org.apache.calcite.avatica.NoSuchConnectionException; +import org.apache.calcite.avatica.remote.Service.ErrorResponse; +import org.apache.calcite.avatica.remote.Service.Request; +import org.apache.calcite.avatica.remote.Service.Response; +import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; + +import java.io.IOException; + +/** + * Abstract base class for {@link Handler}s to extend to inherit functionality common across + * serialization strategies. 
+ * + * @param <T> The format Requests/Responses are serialized as. + */ +public abstract class AbstractHandler<T> implements Handler<T> { + private static final String NULL_EXCEPTION_MESSAGE = "(null exception message)"; + protected final Service service; + private RpcMetadataResponse metadata = null; + + public AbstractHandler(Service service) { + this.service = service; + } + + abstract Request decode(T serializedRequest) throws IOException; + + /** + * Serialize the given {@link Response} per the concrete {@link Handler} implementation. + * + * @param response The {@link Response} to serialize. + * @return A serialized representation of the {@link Response}. + * @throws IOException + */ + abstract T encode(Response response) throws IOException; + + /** + * Unwrap Avatica-specific context about a given exception. + * + * @param e A caught exception throw by Avatica implementation. + * @return An {@link ErrorResponse}. + */ + ErrorResponse unwrapException(Exception e) { + // By default, we know nothing extra. + int errorCode = ErrorResponse.UNKNOWN_ERROR_CODE; + String sqlState = ErrorResponse.UNKNOWN_SQL_STATE; + AvaticaSeverity severity = AvaticaSeverity.UNKNOWN; + String errorMsg = null; + + // Extract the contextual information if we have it. We may not. + if (e instanceof AvaticaRuntimeException) { + AvaticaRuntimeException rte = (AvaticaRuntimeException) e; + errorCode = rte.getErrorCode(); + sqlState = rte.getSqlState(); + severity = rte.getSeverity(); + errorMsg = rte.getErrorMessage(); + } else if (e instanceof NoSuchConnectionException) { + errorCode = ErrorResponse.MISSING_CONNECTION_ERROR_CODE; + severity = AvaticaSeverity.ERROR; + errorMsg = e.getMessage(); + } else { + // Try to construct a meaningful error message when the server impl doesn't provide one. 
+ errorMsg = getCausalChain(e); + } + + return new ErrorResponse(e, errorMsg, errorCode, sqlState, severity, metadata); + } + + /** + * Compute a response for the given request, handling errors generated by that computation. + * + * @param serializedRequest The caller's request. + * @return A {@link Response} with additional context about that response. + */ + public HandlerResponse<T> apply(T serializedRequest) { + final Service.Request request; + try { + request = decode(serializedRequest); + } catch (IOException e) { + // TODO provide a canned ErrorResponse. + throw new RuntimeException(e); + } + + try { + final Service.Response response = request.accept(service); + return new HandlerResponse<>(encode(response), HTTP_OK); + } catch (Exception e) { + ErrorResponse errorResp = unwrapException(e); + + try { + return new HandlerResponse<>(encode(errorResp), HTTP_INTERNAL_SERVER_ERROR); + } catch (IOException e1) { + // TODO provide a canned ErrorResponse + + // If we can't serialize error message to JSON, can't give a meaningful error to caller. + // Just try to not unnecessarily create more exceptions. + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + + throw new RuntimeException(e); + } + } + } + + /** + * Constructs a message for the summary of an Exception. + * + * @param e The Exception to summarize. + * @return A summary message for the Exception. + */ + private String getCausalChain(Exception e) { + StringBuilder sb = new StringBuilder(16); + Throwable curr = e; + // Could use Guava, but that would increase dependency set unnecessarily. + while (null != curr) { + if (sb.length() > 0) { + sb.append(" -> "); + } + String message = curr.getMessage(); + sb.append(curr.getClass().getSimpleName()).append(": "); + sb.append(null == message ? NULL_EXCEPTION_MESSAGE : message); + curr = curr.getCause(); + } + if (sb.length() == 0) { + // Catch the case where we have no error message. 
+ return "Unknown error message"; + } + return sb.toString(); + } + + @Override public void setRpcMetadata(RpcMetadataResponse metadata) { + this.metadata = metadata; + } +} + +// End AbstractHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java new file mode 100644 index 0000000..ffaa360 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.Meta; + +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + +/** + * A common base class for {@link Service} implementations that implement + * modifications made to response objects. + */ +public abstract class AbstractService implements Service { + + private RpcMetadataResponse rpcMetadata = null; + + /** + * Represents the serialization of the data over a transport. 
+ */ + enum SerializationType { + JSON, + PROTOBUF + } + + /** + * @return The manner in which the data is serialized. + */ + abstract SerializationType getSerializationType(); + + /** Modifies a signature, changing the representation of numeric columns + * within it. This deals with the fact that JSON transmits a small long value, + * or a float which is a whole number, as an integer. Thus the accessors need + * be prepared to accept any numeric type. */ + Meta.Signature finagle(Meta.Signature signature) { + final List<ColumnMetaData> columns = new ArrayList<>(); + for (ColumnMetaData column : signature.columns) { + columns.add(finagle(column)); + } + if (columns.equals(signature.columns)) { + return signature; + } + return new Meta.Signature(columns, signature.sql, + signature.parameters, signature.internalParameters, + signature.cursorFactory, signature.statementType); + } + + ColumnMetaData finagle(ColumnMetaData column) { + switch (column.type.rep) { + case BYTE: + case PRIMITIVE_BYTE: + case DOUBLE: + case PRIMITIVE_DOUBLE: + case FLOAT: + case PRIMITIVE_FLOAT: + case INTEGER: + case PRIMITIVE_INT: + case SHORT: + case PRIMITIVE_SHORT: + case LONG: + case PRIMITIVE_LONG: + return column.setRep(ColumnMetaData.Rep.NUMBER); + default: + // continue + break; + } + switch (column.type.id) { + case Types.VARBINARY: + case Types.BINARY: + switch (getSerializationType()) { + case JSON: + return column.setRep(ColumnMetaData.Rep.STRING); + case PROTOBUF: + return column; + default: + throw new IllegalStateException("Unhadled case statement"); + } + case Types.DECIMAL: + case Types.NUMERIC: + return column.setRep(ColumnMetaData.Rep.NUMBER); + default: + return column; + } + } + + PrepareResponse finagle(PrepareResponse response) { + final Meta.StatementHandle statement = finagle(response.statement); + if (statement == response.statement) { + return response; + } + return new PrepareResponse(statement, rpcMetadata); + } + + Meta.StatementHandle 
finagle(Meta.StatementHandle h) { + final Meta.Signature signature = finagle(h.signature); + if (signature == h.signature) { + return h; + } + return new Meta.StatementHandle(h.connectionId, h.id, signature); + } + + ResultSetResponse finagle(ResultSetResponse r) { + if (r.updateCount != -1) { + assert r.signature == null; + return r; + } + if (r.signature == null) { + return r; + } + final Meta.Signature signature = finagle(r.signature); + if (signature == r.signature) { + return r; + } + return new ResultSetResponse(r.connectionId, r.statementId, r.ownStatement, + signature, r.firstFrame, r.updateCount, rpcMetadata); + } + + ExecuteResponse finagle(ExecuteResponse r) { + if (r.missingStatement) { + return r; + } + final List<ResultSetResponse> results = new ArrayList<>(); + int changeCount = 0; + for (ResultSetResponse result : r.results) { + ResultSetResponse result2 = finagle(result); + if (result2 != result) { + ++changeCount; + } + results.add(result2); + } + if (changeCount == 0) { + return r; + } + return new ExecuteResponse(results, r.missingStatement, rpcMetadata); + } + + @Override public void setRpcMetadata(RpcMetadataResponse metadata) { + // OK if this is null + this.rpcMetadata = metadata; + } +} + +// End AbstractService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java new file mode 100644 index 0000000..3bda248 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.http.ConnectionReuseStrategy; +import org.apache.http.HttpClientConnection; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.client.protocol.RequestExpectContinue; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.DefaultConnectionReuseStrategy; +import org.apache.http.impl.pool.BasicConnFactory; +import org.apache.http.impl.pool.BasicConnPool; +import org.apache.http.impl.pool.BasicPoolEntry; +import org.apache.http.message.BasicHttpEntityEnclosingRequest; +import org.apache.http.protocol.HttpCoreContext; +import org.apache.http.protocol.HttpProcessor; +import org.apache.http.protocol.HttpProcessorBuilder; +import org.apache.http.protocol.HttpRequestExecutor; +import org.apache.http.protocol.RequestConnControl; +import org.apache.http.protocol.RequestContent; +import org.apache.http.protocol.RequestTargetHost; +import org.apache.http.util.EntityUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.concurrent.Future; + +/** + * A common class to invoke HTTP requests against the Avatica server agnostic of the 
data being + * sent and received across the wire. + */ +public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient { + private static final Logger LOG = LoggerFactory.getLogger(AvaticaCommonsHttpClientImpl.class); + private static final ConnectionReuseStrategy REUSE = DefaultConnectionReuseStrategy.INSTANCE; + + // Some basic exposed configurations + private static final String MAX_POOLED_CONNECTION_PER_ROUTE_KEY = + "avatica.pooled.connections.per.route"; + private static final String MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT = "4"; + private static final String MAX_POOLED_CONNECTIONS_KEY = "avatica.pooled.connections.max"; + private static final String MAX_POOLED_CONNECTIONS_DEFAULT = "16"; + + protected final HttpHost host; + protected final HttpProcessor httpProcessor; + protected final HttpRequestExecutor httpExecutor; + protected final BasicConnPool httpPool; + + public AvaticaCommonsHttpClientImpl(URL url) { + this.host = new HttpHost(url.getHost(), url.getPort(), url.getProtocol()); + + this.httpProcessor = HttpProcessorBuilder.create() + .add(new RequestContent()) + .add(new RequestTargetHost()) + .add(new RequestConnControl()) + .add(new RequestExpectContinue()).build(); + + this.httpExecutor = new HttpRequestExecutor(); + + this.httpPool = new BasicConnPool(new BasicConnFactory()); + int maxPerRoute = Integer.parseInt( + System.getProperty(MAX_POOLED_CONNECTION_PER_ROUTE_KEY, + MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT)); + int maxTotal = Integer.parseInt( + System.getProperty(MAX_POOLED_CONNECTIONS_KEY, + MAX_POOLED_CONNECTIONS_DEFAULT)); + httpPool.setDefaultMaxPerRoute(maxPerRoute); + httpPool.setMaxTotal(maxTotal); + } + + public byte[] send(byte[] request) { + while (true) { + boolean reusable = false; + // Get a connection from the pool + Future<BasicPoolEntry> future = this.httpPool.lease(host, null); + BasicPoolEntry entry = null; + try { + entry = future.get(); + HttpCoreContext coreContext = HttpCoreContext.create(); + 
coreContext.setTargetHost(host); + + HttpClientConnection conn = entry.getConnection(); + + ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM); + + BasicHttpEntityEnclosingRequest postRequest = + new BasicHttpEntityEnclosingRequest("POST", "/"); + postRequest.setEntity(entity); + + httpExecutor.preProcess(postRequest, httpProcessor, coreContext); + HttpResponse response = httpExecutor.execute(postRequest, conn, coreContext); + httpExecutor.postProcess(response, httpProcessor, coreContext); + + // Should the connection be kept alive? + reusable = REUSE.keepAlive(response, coreContext); + + final int statusCode = response.getStatusLine().getStatusCode(); + if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) { + // Could be sitting behind a load-balancer, try again. + continue; + } + + return EntityUtils.toByteArray(response.getEntity()); + } catch (Exception e) { + LOG.debug("Failed to execute HTTP request", e); + throw new RuntimeException(e); + } finally { + // Release the connection back to the pool, marking if it's good to reuse or not. + httpPool.release(entry, reusable); + } + } + } +} + +// End AvaticaCommonsHttpClientImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java new file mode 100644 index 0000000..eac1b74 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +/** + * An interface which defines how requests are sent to the Avatica server. + */ +public interface AvaticaHttpClient { + + /** + * Sends a serialized request to the Avatica server. + * + * @param request The serialized request. + * @return The serialized response. + */ + byte[] send(byte[] request); + +} + +// End AvaticaHttpClient.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java new file mode 100644 index 0000000..b5d213a --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ConnectionConfig; + +import java.net.URL; + +/** + * A factory for constructing {@link AvaticaHttpClient}'s. + */ +public interface AvaticaHttpClientFactory { + + /** + * Construct the appropriate implementation of {@link AvaticaHttpClient}. + * + * @param url URL that the client is for. + * @param config Configuration to use when constructing the implementation. + * @return An instance of {@link AvaticaHttpClient}. + */ + AvaticaHttpClient getClient(URL url, ConnectionConfig config); + +} + +// End AvaticaHttpClientFactory.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java new file mode 100644 index 0000000..cd6cbce --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ConnectionConfig; + +import java.lang.reflect.Constructor; +import java.net.URL; +import java.util.Objects; + +/** + * Default implementation of {@link AvaticaHttpClientFactory} which chooses an implementation + * from a property. + */ +public class AvaticaHttpClientFactoryImpl implements AvaticaHttpClientFactory { + public static final String HTTP_CLIENT_IMPL_DEFAULT = + AvaticaCommonsHttpClientImpl.class.getName(); + + // Public for Type.PLUGIN + public static final AvaticaHttpClientFactoryImpl INSTANCE = new AvaticaHttpClientFactoryImpl(); + + // Public for Type.PLUGIN + public AvaticaHttpClientFactoryImpl() {} + + /** + * Returns a singleton instance of {@link AvaticaHttpClientFactoryImpl}. + * + * @return A singleton instance. 
+ */ + public static AvaticaHttpClientFactoryImpl getInstance() { + return INSTANCE; + } + + @Override public AvaticaHttpClient getClient(URL url, ConnectionConfig config) { + String className = config.httpClientClass(); + if (null == className) { + className = HTTP_CLIENT_IMPL_DEFAULT; + } + + try { + Class<?> clz = Class.forName(className); + Constructor<?> constructor = clz.getConstructor(URL.class); + Object instance = constructor.newInstance(Objects.requireNonNull(url)); + return AvaticaHttpClient.class.cast(instance); + } catch (Exception e) { + throw new RuntimeException("Failed to construct AvaticaHttpClient implementation " + + className, e); + } + } +} + +// End AvaticaHttpClientFactoryImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java new file mode 100644 index 0000000..c100eec --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.AvaticaUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; + +/** + * A common class to invoke HTTP requests against the Avatica server agnostic of the data being + * sent and received across the wire. + */ +public class AvaticaHttpClientImpl implements AvaticaHttpClient { + protected final URL url; + + public AvaticaHttpClientImpl(URL url) { + this.url = url; + } + + public byte[] send(byte[] request) { + // TODO back-off policy? + while (true) { + try { + final HttpURLConnection connection = openConnection(); + connection.setRequestMethod("POST"); + connection.setDoInput(true); + connection.setDoOutput(true); + try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) { + wr.write(request); + wr.flush(); + wr.close(); + } + final int responseCode = connection.getResponseCode(); + final InputStream inputStream; + if (responseCode == HttpURLConnection.HTTP_UNAVAILABLE) { + // Could be sitting behind a load-balancer, try again. 
+ continue; + } else if (responseCode != HttpURLConnection.HTTP_OK) { + inputStream = connection.getErrorStream(); + } else { + inputStream = connection.getInputStream(); + } + return AvaticaUtils.readFullyToBytes(inputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + HttpURLConnection openConnection() throws IOException { + return (HttpURLConnection) url.openConnection(); + } +} + +// End AvaticaHttpClientImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java new file mode 100644 index 0000000..d5ae9b1 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ConnectionConfigImpl; + +import java.util.Properties; + +/** Implementation of {@link org.apache.calcite.avatica.ConnectionConfig} + * with extra properties specific to Remote Driver. */ +public class AvaticaRemoteConnectionConfigImpl extends ConnectionConfigImpl { + public AvaticaRemoteConnectionConfigImpl(Properties properties) { + super(properties); + } + + public Service.Factory factory() { + return AvaticaRemoteConnectionProperty.FACTORY.wrap(properties) + .getPlugin(Service.Factory.class, null); + } +} + +// End AvaticaRemoteConnectionConfigImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java new file mode 100644 index 0000000..5e755f8 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ConnectionProperty; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.apache.calcite.avatica.ConnectionConfigImpl.PropEnv; +import static org.apache.calcite.avatica.ConnectionConfigImpl.parse; + +/** + * Enumeration of Avatica remote driver's built-in connection properties. + */ +public enum AvaticaRemoteConnectionProperty implements ConnectionProperty { + /** Factory. */ + FACTORY("factory", Type.STRING, null); + + private final String camelName; + private final Type type; + private final Object defaultValue; + + private static final Map<String, AvaticaRemoteConnectionProperty> NAME_TO_PROPS; + + static { + NAME_TO_PROPS = new HashMap<String, AvaticaRemoteConnectionProperty>(); + for (AvaticaRemoteConnectionProperty p + : AvaticaRemoteConnectionProperty.values()) { + NAME_TO_PROPS.put(p.camelName.toUpperCase(), p); + NAME_TO_PROPS.put(p.name(), p); + } + } + + AvaticaRemoteConnectionProperty(String camelName, + Type type, + Object defaultValue) { + this.camelName = camelName; + this.type = type; + this.defaultValue = defaultValue; + assert defaultValue == null || type.valid(defaultValue); + } + + public String camelName() { + return camelName; + } + + public Object defaultValue() { + return defaultValue; + } + + public Type type() { + return type; + } + + public PropEnv wrap(Properties properties) { + return new PropEnv(parse(properties, NAME_TO_PROPS), this); + } + + public boolean required() { + return false; + } +} + +// End AvaticaRemoteConnectionProperty.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java ---------------------------------------------------------------------- diff --git 
a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java new file mode 100644 index 0000000..2f9a1cd --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.calcite.avatica.remote;

import org.apache.calcite.avatica.AvaticaSeverity;
import org.apache.calcite.avatica.remote.Service.ErrorResponse;

import java.util.Objects;

/**
 * A {@link RuntimeException} thrown by Avatica with additional contextual information about
 * what happened to cause the Exception.
 */
public class AvaticaRuntimeException extends RuntimeException {
  private static final long serialVersionUID = 1L;
  private final String errorMessage;
  private final int errorCode;
  private final String sqlState;
  private final AvaticaSeverity severity;

  /**
   * Constructs an {@code AvaticaRuntimeException} with no additional information.
   *
   * <p>It is strongly preferred that the caller invoke
   * {@link #AvaticaRuntimeException(String, int, String, AvaticaSeverity)}
   * with proper contextual information.
   */
  public AvaticaRuntimeException() {
    this("No additional context on exception", ErrorResponse.UNKNOWN_ERROR_CODE,
        ErrorResponse.UNKNOWN_SQL_STATE, AvaticaSeverity.UNKNOWN);
  }

  /**
   * Constructs an {@code AvaticaRuntimeException} with the given
   * contextual information surrounding the error.
   *
   * <p>The error message is also handed to {@link RuntimeException}, so
   * {@link #getMessage()} returns the same text as {@link #getErrorMessage()}
   * instead of {@code null}.
   *
   * @param errorMessage A human-readable explanation about what happened
   * @param errorCode Numeric identifier for error
   * @param sqlState 5-character identifier for error
   * @param severity Severity
   * @throws NullPointerException if {@code errorMessage}, {@code sqlState} or
   *     {@code severity} is null
   */
  public AvaticaRuntimeException(String errorMessage, int errorCode, String sqlState,
      AvaticaSeverity severity) {
    // Null check happens first, exactly as before, and the message now reaches Throwable.
    super(Objects.requireNonNull(errorMessage));
    this.errorMessage = errorMessage;
    this.errorCode = errorCode;
    this.sqlState = Objects.requireNonNull(sqlState);
    this.severity = Objects.requireNonNull(severity);
  }

  /**
   * Returns a human-readable error message.
   */
  public String getErrorMessage() {
    return errorMessage;
  }

  /**
   * Returns a numeric code for this error.
   */
  public int getErrorCode() {
    return errorCode;
  }

  /**
   * Returns the five-character identifier for this error.
   */
  public String getSqlState() {
    return sqlState;
  }

  /**
   * Returns the severity at which this exception is thrown.
   */
  public AvaticaSeverity getSeverity() {
    return severity;
  }

  /**
   * Renders the message, error code, SQL state and severity on a single line.
   */
  @Override public String toString() {
    StringBuilder sb = new StringBuilder(64);
    return sb.append("AvaticaRuntimeException: [")
        .append("Message: '").append(errorMessage).append("', ")
        .append("Error code: '").append(errorCode).append("', ")
        .append("SQL State: '").append(sqlState).append("', ")
        .append("Severity: '").append(severity).append("']").toString();
  }
}

// End AvaticaRuntimeException.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java new file mode 100644 index 0000000..e98c486 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.calcite.avatica.remote;

import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.BuiltInConnectionProperty;
import org.apache.calcite.avatica.ConnectionConfig;
import org.apache.calcite.avatica.ConnectionProperty;
import org.apache.calcite.avatica.DriverVersion;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.UnregisteredDriver;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.MalformedURLException;
import java.net.URL;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * JDBC driver that reaches an Avatica server remotely.
 */
public class Driver extends UnregisteredDriver {
  private static final Logger LOG = LoggerFactory.getLogger(Driver.class);
  public static final String CONNECT_STRING_PREFIX = "jdbc:avatica:remote:";

  static {
    // Loading the class is enough to register one driver instance.
    new Driver().register();
  }

  public Driver() {
    super();
  }

  /**
   * Wire formats the driver can use to serialize messages.
   */
  public enum Serialization {
    JSON,
    PROTOBUF
  }

  @Override protected String getConnectStringPrefix() {
    return CONNECT_STRING_PREFIX;
  }

  /** Loads the driver version from {@code org-apache-calcite-jdbc.properties}. */
  protected DriverVersion createDriverVersion() {
    return DriverVersion.load(
        Driver.class,
        "org-apache-calcite-jdbc.properties",
        "Avatica Remote JDBC Driver",
        "unknown version",
        "Avatica",
        "unknown version");
  }

  /**
   * Returns the built-in connection properties followed by the remote-driver
   * specific ones.
   */
  @Override protected Collection<ConnectionProperty> getConnectionProperties() {
    final List<ConnectionProperty> all = new ArrayList<>();
    for (ConnectionProperty builtIn : BuiltInConnectionProperty.values()) {
      all.add(builtIn);
    }
    for (ConnectionProperty remote : AvaticaRemoteConnectionProperty.values()) {
      all.add(remote);
    }
    return all;
  }

  /** Creates a {@link RemoteMeta} backed by a freshly created {@link Service}. */
  @Override public Meta createMeta(AvaticaConnection connection) {
    return new RemoteMeta(connection, createService(connection, connection.config()));
  }

  /**
   * Creates a {@link Service} for the given connection.
   *
   * <p>Selection order: a configured {@link Service.Factory} wins; otherwise a
   * configured URL yields an HTTP-backed service for the configured
   * serialization; otherwise a {@link MockJsonService} over an empty map is
   * returned.
   *
   * @param connection The {@link AvaticaConnection} to use.
   * @param config Configuration properties
   * @return A Service implementation.
   * @throws IllegalArgumentException if the serialization type has no matching service
   */
  Service createService(AvaticaConnection connection, ConnectionConfig config) {
    final Service.Factory metaFactory = config.factory();
    if (metaFactory != null) {
      return metaFactory.create(connection);
    }
    if (config.url() == null) {
      return new MockJsonService(Collections.<String, String>emptyMap());
    }

    final AvaticaHttpClient httpClient = getHttpClient(connection, config);
    final Serialization serializationType = getSerialization(config);
    LOG.debug("Instantiating {} service", serializationType);

    switch (serializationType) {
    case JSON:
      return new RemoteService(httpClient);
    case PROTOBUF:
      return new RemoteProtobufService(httpClient, new ProtobufTranslationImpl());
    default:
      throw new IllegalArgumentException("Unhandled serialization type: " + serializationType);
    }
  }

  /**
   * Builds the HTTP client used to talk to the Avatica server.
   *
   * @param connection The {@link AvaticaConnection}.
   * @param config The configuration; its URL must be well-formed.
   * @return The client produced by the configured {@link AvaticaHttpClientFactory}.
   * @throws RuntimeException wrapping the {@link MalformedURLException} when the
   *     configured URL cannot be parsed
   */
  AvaticaHttpClient getHttpClient(AvaticaConnection connection, ConnectionConfig config) {
    final URL serverUrl;
    try {
      serverUrl = new URL(config.url());
    } catch (MalformedURLException e) {
      throw new RuntimeException(e);
    }
    return config.httpClientFactory().getClient(serverUrl, config);
  }

  /**
   * Opens a connection and announces it to the remote side.
   *
   * @return The connection, or {@code null} when the superclass rejects the URL
   */
  @Override public Connection connect(String url, Properties info)
      throws SQLException {
    final AvaticaConnection conn = (AvaticaConnection) super.connect(url, info);
    if (conn == null) {
      // Not a URL this driver handles.
      return null;
    }

    // Tell the remote side about the new connection.
    final Service service = createService(conn, conn.config());
    service.apply(
        new Service.OpenConnectionRequest(conn.id,
            Service.OpenConnectionRequest.serializeProperties(info)));
    return conn;
  }

  /**
   * Reads the serialization setting from the configuration.
   *
   * @return {@link Serialization#JSON} when nothing is configured, otherwise
   *     the constant whose name equals the upper-cased setting
   * @throws RuntimeException wrapping the lookup failure for an unknown name
   */
  Serialization getSerialization(ConnectionConfig config) {
    final String configured = config.serialization();
    if (null == configured) {
      return Serialization.JSON;
    }
    try {
      return Serialization.valueOf(configured.toUpperCase());
    } catch (Exception e) {
      // Log a warning instead of failing harshly? Intentionally no loggers available?
      throw new RuntimeException(e);
    }
  }
}

// End Driver.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java new file mode 100644 index 0000000..30d026c --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.calcite.avatica.remote;

import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;

import java.util.Objects;

/**
 * Request-response API for calls to an Avatica server.
 *
 * @param <T> The type this handler accepts and returns
 */
public interface Handler<T> {
  int HTTP_OK = 200;
  int HTTP_INTERNAL_SERVER_ERROR = 500;
  String HANDLER_SERIALIZATION_METRICS_NAME = "Handler.Serialization";

  /**
   * Immutable pair of a serialized response and the status code that goes with it.
   */
  public class HandlerResponse<T> {
    private final T response;
    private final int statusCode;

    /**
     * @param response Serialized response; must not be null
     * @param statusCode Status code to report alongside the response
     * @throws NullPointerException if {@code response} is null
     */
    public HandlerResponse(T response, int statusCode) {
      this.response = Objects.requireNonNull(response);
      this.statusCode = statusCode;
    }

    public T getResponse() {
      return response;
    }

    public int getStatusCode() {
      return statusCode;
    }

    @Override public String toString() {
      final StringBuilder text = new StringBuilder();
      text.append("Response: ").append(response);
      text.append(", Status:").append(statusCode);
      return text.toString();
    }
  }

  /**
   * Handles one serialized request.
   *
   * @param request The serialized request
   * @return The serialized response together with its status code
   */
  HandlerResponse<T> apply(T request);

  /**
   * Sets some general server information to return to the client in all responses.
   *
   * @param metadata Server-wide information
   */
  void setRpcMetadata(RpcMetadataResponse metadata);
}

// End Handler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java new file mode 100644 index 0000000..fd57078 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.calcite.avatica.remote;

import org.apache.calcite.avatica.metrics.MetricsSystem;
import org.apache.calcite.avatica.metrics.Timer;
import org.apache.calcite.avatica.metrics.Timer.Context;
import org.apache.calcite.avatica.remote.Service.Request;
import org.apache.calcite.avatica.remote.Service.Response;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.StringWriter;

/**
 * {@link org.apache.calcite.avatica.remote.Handler} for JSON: requests arrive
 * as JSON text, are passed to a {@link Service}, and the responses are written
 * back as JSON text.
 *
 * @see org.apache.calcite.avatica.remote.JsonService
 */
public class JsonHandler extends AbstractHandler<String> {

  protected static final ObjectMapper MAPPER = JsonService.MAPPER;

  final MetricsSystem metrics;
  final Timer serializationTimer;

  /**
   * @param service Service handed to the {@link AbstractHandler} constructor
   * @param metrics Metrics system that supplies the serialization timer
   */
  public JsonHandler(Service service, MetricsSystem metrics) {
    super(service);
    this.metrics = metrics;
    final String timerName =
        MetricsHelper.concat(JsonHandler.class, HANDLER_SERIALIZATION_METRICS_NAME);
    this.serializationTimer = this.metrics.getTimer(timerName);
  }

  /** Delegates unchanged to {@link AbstractHandler}. */
  @Override public HandlerResponse<String> apply(String jsonRequest) {
    return super.apply(jsonRequest);
  }

  /**
   * Parses a JSON string into a {@link Request}, timing the work with the
   * serialization timer.
   *
   * @param request The JSON text to parse.
   * @return The parsed request.
   */
  @Override Request decode(String request) throws IOException {
    try (final Context timing = serializationTimer.start()) {
      return MAPPER.readValue(request, Service.Request.class);
    }
  }

  /**
   * Writes the given response as JSON, timing the work with the serialization
   * timer.
   *
   * @param response The object to serialize.
   * @return A JSON string.
   */
  @Override String encode(Response response) throws IOException {
    try (final Context timing = serializationTimer.start()) {
      final StringWriter json = new StringWriter();
      MAPPER.writeValue(json, response);
      return json.toString();
    }
  }
}

// End JsonHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java new file mode 100644 index 0000000..668b3be --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.calcite.avatica.remote;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.StringWriter;

/**
 * Implementation of {@link org.apache.calcite.avatica.remote.Service}
 * that encodes requests and responses as JSON.
 *
 * <p>Every typed {@code apply} overload follows the same path: encode the
 * request as JSON, pass the text to {@link #apply(String)}, and decode the
 * reply into the expected response class.
 */
public abstract class JsonService extends AbstractService {
  public static final ObjectMapper MAPPER;
  static {
    MAPPER = new ObjectMapper();
    MAPPER.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
    MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
    MAPPER.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
  }

  public JsonService() {
  }

  /** Derived class should implement this method to transport requests and
   * responses to and from the peer service. */
  public abstract String apply(String request);

  @Override SerializationType getSerializationType() {
    return SerializationType.JSON;
  }

  /**
   * Parses a JSON response and checks that it has the expected type.
   *
   * @param response JSON text of the response
   * @param expectedType Class the caller expects back
   * @return The parsed response, cast to {@code expectedType}
   * @throws IOException if the JSON cannot be read
   * @throws ClassCastException if the response is neither an error nor of the expected type
   */
  //@VisibleForTesting
  protected static <T> T decode(String response, Class<T> expectedType)
      throws IOException {
    Response resp = MAPPER.readValue(response, Response.class);
    if (resp instanceof ErrorResponse) {
      // The peer reported a failure: surface it as an exception.
      throw ((ErrorResponse) resp).toException();
    } else if (!expectedType.isAssignableFrom(resp.getClass())) {
      throw new ClassCastException("Cannot cast " + resp.getClass() + " into " + expectedType);
    }

    return expectedType.cast(resp);
  }

  /**
   * Serializes a request as JSON text.
   *
   * @param request The object to serialize
   * @return The JSON text
   * @throws IOException if serialization fails
   */
  //@VisibleForTesting
  protected static <T> String encode(T request) throws IOException {
    final StringWriter w = new StringWriter();
    MAPPER.writeValue(w, request);
    return w.toString();
  }

  /** Converts an I/O failure into the unchecked exception that is thrown to the caller. */
  protected RuntimeException handle(IOException e) {
    return new RuntimeException(e);
  }

  /**
   * Shared path for all typed {@code apply} overloads: encode, transport via
   * {@link #apply(String)}, decode.
   *
   * @param request The request to send
   * @param responseType The response class expected back
   * @return The decoded response
   * @throws RuntimeException as produced by {@link #handle(IOException)} when
   *     encoding or decoding fails
   */
  private <T> T roundTrip(Object request, Class<T> responseType) {
    try {
      return decode(apply(encode(request)), responseType);
    } catch (IOException e) {
      throw handle(e);
    }
  }

  public ResultSetResponse apply(CatalogsRequest request) {
    return finagle(roundTrip(request, ResultSetResponse.class));
  }

  public ResultSetResponse apply(SchemasRequest request) {
    return finagle(roundTrip(request, ResultSetResponse.class));
  }

  public ResultSetResponse apply(TablesRequest request) {
    return finagle(roundTrip(request, ResultSetResponse.class));
  }

  public ResultSetResponse apply(TableTypesRequest request) {
    return finagle(roundTrip(request, ResultSetResponse.class));
  }

  public ResultSetResponse apply(TypeInfoRequest request) {
    return finagle(roundTrip(request, ResultSetResponse.class));
  }

  public ResultSetResponse apply(ColumnsRequest request) {
    return finagle(roundTrip(request, ResultSetResponse.class));
  }

  public PrepareResponse apply(PrepareRequest request) {
    return finagle(roundTrip(request, PrepareResponse.class));
  }

  public ExecuteResponse apply(PrepareAndExecuteRequest request) {
    return finagle(roundTrip(request, ExecuteResponse.class));
  }

  public FetchResponse apply(FetchRequest request) {
    return roundTrip(request, FetchResponse.class);
  }

  public ExecuteResponse apply(ExecuteRequest request) {
    return finagle(roundTrip(request, ExecuteResponse.class));
  }

  public CreateStatementResponse apply(CreateStatementRequest request) {
    return roundTrip(request, CreateStatementResponse.class);
  }

  public CloseStatementResponse apply(CloseStatementRequest request) {
    return roundTrip(request, CloseStatementResponse.class);
  }

  public OpenConnectionResponse apply(OpenConnectionRequest request) {
    return roundTrip(request, OpenConnectionResponse.class);
  }

  public CloseConnectionResponse apply(CloseConnectionRequest request) {
    return roundTrip(request, CloseConnectionResponse.class);
  }

  public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
    return roundTrip(request, ConnectionSyncResponse.class);
  }

  public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
    return roundTrip(request, DatabasePropertyResponse.class);
  }

  public SyncResultsResponse apply(SyncResultsRequest request) {
    return roundTrip(request, SyncResultsResponse.class);
  }

  public CommitResponse apply(CommitRequest request) {
    return roundTrip(request, CommitResponse.class);
  }

  public RollbackResponse apply(RollbackRequest request) {
    return roundTrip(request, RollbackResponse.class);
  }
}

// End JsonService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java
b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java new file mode 100644 index 0000000..0af6300 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import java.io.IOException; +import java.io.StringWriter; + +/** + * Implementation of {@link org.apache.calcite.avatica.remote.Service} + * that goes to an in-process instance of {@code Service}. 
+ */
public class LocalJsonService extends JsonService {
  private final Service service;

  /**
   * @param service The {@code Service} that each decoded request is dispatched to
   */
  public LocalJsonService(Service service) {
    this.service = service;
  }

  /**
   * Decodes the JSON request, lets it visit the wrapped service, and encodes
   * the resulting response as JSON.
   *
   * @param request JSON text of the request
   * @return JSON text of the response
   * @throws RuntimeException as produced by {@code handle} when reading or
   *     writing JSON fails
   */
  @Override public String apply(String request) {
    try {
      final Request decoded = MAPPER.readValue(request, Request.class);
      final Response answer = decoded.accept(service);
      final StringWriter json = new StringWriter();
      MAPPER.writeValue(json, answer);
      return json.toString();
    } catch (IOException e) {
      throw handle(e);
    }
  }
}

// End LocalJsonService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java new file mode 100644 index 0000000..76e2392 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.calcite.avatica.remote;

import java.io.IOException;

/**
 * In-process {@link Service} that still pushes every request and response
 * through protocol buffer serialization, imitating the transport to a server
 * that would normally compute the response.
 */
public class LocalProtobufService extends ProtobufService {
  private final Service service;
  private final ProtobufTranslation translation;

  /**
   * @param service The service that computes responses
   * @param translation Serializer/parser used on both "sides" of the fake transport
   */
  public LocalProtobufService(Service service, ProtobufTranslation translation) {
    this.service = service;
    this.translation = translation;
  }

  /**
   * Serializes the request, parses it back, lets the parsed copy visit the
   * wrapped service, then serializes and re-parses the response.
   *
   * @param request The request to process
   * @return The response as it looks after a serialization round trip
   * @throws RuntimeException wrapping any {@link IOException} from the translation
   */
  @Override public Response _apply(Request request) {
    try {
      // "Client" side: put the request on the wire.
      final byte[] requestBytes = translation.serializeRequest(request);

      // "Server" side: read the request and compute the answer.
      final Request received = translation.parseRequest(requestBytes);
      final Response computed = received.accept(service);

      // "Server" side: put the answer on the wire.
      final byte[] responseBytes = translation.serializeResponse(computed);

      // "Client" side: read the answer.
      return translation.parseResponse(responseBytes);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}

// End LocalProtobufService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java new file mode 100644 index 0000000..c070ec0 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.Meta; + +import org.apache.calcite.avatica.MetaImpl; +import org.apache.calcite.avatica.MissingResultsException; +import org.apache.calcite.avatica.NoSuchStatementException; +import org.apache.calcite.avatica.metrics.MetricsSystem; +import org.apache.calcite.avatica.metrics.Timer; +import org.apache.calcite.avatica.metrics.Timer.Context; +import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.calcite.avatica.remote.MetricsHelper.concat; + +/** + * Implementation of {@link Service} that talks to a local {@link Meta}. 
+ */ +public class LocalService implements Service { + final Meta meta; + final MetricsSystem metrics; + + private final Timer executeTimer; + private final Timer commitTimer; + private final Timer prepareTimer; + private final Timer prepareAndExecuteTimer; + private final Timer connectionSyncTimer; + + private RpcMetadataResponse serverLevelRpcMetadata; + + public LocalService(Meta meta) { + this(meta, NoopMetricsSystem.getInstance()); + } + + public LocalService(Meta meta, MetricsSystem metrics) { + this.meta = meta; + this.metrics = Objects.requireNonNull(metrics); + + this.executeTimer = this.metrics.getTimer(name("Execute")); + this.commitTimer = this.metrics.getTimer(name("Commit")); + this.prepareTimer = this.metrics.getTimer(name("Prepare")); + this.prepareAndExecuteTimer = this.metrics.getTimer(name("PrepareAndExecute")); + this.connectionSyncTimer = this.metrics.getTimer(name("ConnectionSync")); + } + + private static String name(String timer) { + return concat(LocalService.class, timer); + } + + @Override public void setRpcMetadata(RpcMetadataResponse serverLevelRpcMetadata) { + this.serverLevelRpcMetadata = Objects.requireNonNull(serverLevelRpcMetadata); + } + + private static <E> List<E> list(Iterable<E> iterable) { + if (iterable instanceof List) { + return (List<E>) iterable; + } + final List<E> rowList = new ArrayList<>(); + for (E row : iterable) { + rowList.add(row); + } + return rowList; + } + + /** Converts a result set (not serializable) into a serializable response. 
*/ + public ResultSetResponse toResponse(Meta.MetaResultSet resultSet) { + if (resultSet.updateCount != -1) { + return new ResultSetResponse(resultSet.connectionId, + resultSet.statementId, resultSet.ownStatement, null, null, + resultSet.updateCount, serverLevelRpcMetadata); + } + + Meta.Signature signature = resultSet.signature; + Meta.CursorFactory cursorFactory = resultSet.signature.cursorFactory; + Meta.Frame frame = null; + int updateCount = -1; + final List<Object> list; + + if (resultSet.firstFrame != null) { + list = list(resultSet.firstFrame.rows); + switch (cursorFactory.style) { + case ARRAY: + cursorFactory = Meta.CursorFactory.LIST; + break; + case MAP: + case LIST: + break; + case RECORD: + cursorFactory = Meta.CursorFactory.LIST; + break; + default: + cursorFactory = Meta.CursorFactory.map(cursorFactory.fieldNames); + } + + final boolean done = resultSet.firstFrame.done; + + frame = new Meta.Frame(0, done, list); + updateCount = -1; + + if (signature.statementType != null) { + if (signature.statementType.canUpdate()) { + frame = null; + updateCount = ((Number) ((List) list.get(0)).get(0)).intValue(); + } + } + } else { + //noinspection unchecked + list = (List<Object>) (List) list2(resultSet); + cursorFactory = Meta.CursorFactory.LIST; + } + + if (cursorFactory != resultSet.signature.cursorFactory) { + signature = signature.setCursorFactory(cursorFactory); + } + + return new ResultSetResponse(resultSet.connectionId, resultSet.statementId, + resultSet.ownStatement, signature, frame, updateCount, serverLevelRpcMetadata); + } + + private List<List<Object>> list2(Meta.MetaResultSet resultSet) { + final Meta.StatementHandle h = new Meta.StatementHandle( + resultSet.connectionId, resultSet.statementId, null); + final List<TypedValue> parameterValues = Collections.emptyList(); + final Iterable<Object> iterable = meta.createIterable(h, null, + resultSet.signature, parameterValues, resultSet.firstFrame); + final List<List<Object>> list = new ArrayList<>(); + 
return MetaImpl.collect(resultSet.signature.cursorFactory, iterable, list); + } + + public ResultSetResponse apply(CatalogsRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.MetaResultSet resultSet = meta.getCatalogs(ch); + return toResponse(resultSet); + } + + public ResultSetResponse apply(SchemasRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.MetaResultSet resultSet = + meta.getSchemas(ch, request.catalog, Meta.Pat.of(request.schemaPattern)); + return toResponse(resultSet); + } + + public ResultSetResponse apply(TablesRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.MetaResultSet resultSet = + meta.getTables(ch, + request.catalog, + Meta.Pat.of(request.schemaPattern), + Meta.Pat.of(request.tableNamePattern), + request.typeList); + return toResponse(resultSet); + } + + public ResultSetResponse apply(TableTypesRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.MetaResultSet resultSet = meta.getTableTypes(ch); + return toResponse(resultSet); + } + + public ResultSetResponse apply(TypeInfoRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.MetaResultSet resultSet = meta.getTypeInfo(ch); + return toResponse(resultSet); + } + + public ResultSetResponse apply(ColumnsRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.MetaResultSet resultSet = + meta.getColumns(ch, + request.catalog, + Meta.Pat.of(request.schemaPattern), + Meta.Pat.of(request.tableNamePattern), + Meta.Pat.of(request.columnNamePattern)); + return toResponse(resultSet); + } + + public PrepareResponse apply(PrepareRequest request) { + try (final Context ctx = prepareTimer.start()) { + final 
Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.StatementHandle h = + meta.prepare(ch, request.sql, request.maxRowCount); + return new PrepareResponse(h, serverLevelRpcMetadata); + } + } + + public ExecuteResponse apply(PrepareAndExecuteRequest request) { + try (final Context ctx = prepareAndExecuteTimer.start()) { + final Meta.StatementHandle sh = + new Meta.StatementHandle(request.connectionId, request.statementId, null); + try { + final Meta.ExecuteResult executeResult = + meta.prepareAndExecute(sh, request.sql, request.maxRowCount, + new Meta.PrepareCallback() { + @Override public Object getMonitor() { + return LocalService.class; + } + + @Override public void clear() { + } + + @Override public void assign(Meta.Signature signature, + Meta.Frame firstFrame, long updateCount) { + } + + @Override public void execute() { + } + }); + final List<ResultSetResponse> results = new ArrayList<>(); + for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { + results.add(toResponse(metaResultSet)); + } + return new ExecuteResponse(results, false, serverLevelRpcMetadata); + } catch (NoSuchStatementException e) { + // The Statement doesn't exist anymore, bubble up this information + return new ExecuteResponse(null, true, serverLevelRpcMetadata); + } + } + } + + public FetchResponse apply(FetchRequest request) { + final Meta.StatementHandle h = new Meta.StatementHandle( + request.connectionId, request.statementId, null); + try { + final Meta.Frame frame = + meta.fetch(h, + request.offset, + request.fetchMaxRowCount); + return new FetchResponse(frame, false, false, serverLevelRpcMetadata); + } catch (NullPointerException | NoSuchStatementException e) { + // The Statement doesn't exist anymore, bubble up this information + return new FetchResponse(null, true, true, serverLevelRpcMetadata); + } catch (MissingResultsException e) { + return new FetchResponse(null, false, true, serverLevelRpcMetadata); + } + } + + public 
ExecuteResponse apply(ExecuteRequest request) { + try (final Context ctx = executeTimer.start()) { + try { + final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle, + request.parameterValues, request.maxRowCount); + + final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size()); + for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { + results.add(toResponse(metaResultSet)); + } + return new ExecuteResponse(results, false, serverLevelRpcMetadata); + } catch (NoSuchStatementException e) { + return new ExecuteResponse(null, true, serverLevelRpcMetadata); + } + } + } + + public CreateStatementResponse apply(CreateStatementRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.StatementHandle h = meta.createStatement(ch); + return new CreateStatementResponse(h.connectionId, h.id, serverLevelRpcMetadata); + } + + public CloseStatementResponse apply(CloseStatementRequest request) { + final Meta.StatementHandle h = new Meta.StatementHandle( + request.connectionId, request.statementId, null); + meta.closeStatement(h); + return new CloseStatementResponse(serverLevelRpcMetadata); + } + + public OpenConnectionResponse apply(OpenConnectionRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + meta.openConnection(ch, request.info); + return new OpenConnectionResponse(serverLevelRpcMetadata); + } + + public CloseConnectionResponse apply(CloseConnectionRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + meta.closeConnection(ch); + return new CloseConnectionResponse(serverLevelRpcMetadata); + } + + public ConnectionSyncResponse apply(ConnectionSyncRequest request) { + try (final Context ctx = connectionSyncTimer.start()) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.ConnectionProperties connProps 
= + meta.connectionSync(ch, request.connProps); + return new ConnectionSyncResponse(connProps, serverLevelRpcMetadata); + } + } + + public DatabasePropertyResponse apply(DatabasePropertyRequest request) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + return new DatabasePropertyResponse(meta.getDatabaseProperties(ch), serverLevelRpcMetadata); + } + + public SyncResultsResponse apply(SyncResultsRequest request) { + final Meta.StatementHandle h = new Meta.StatementHandle( + request.connectionId, request.statementId, null); + SyncResultsResponse response; + try { + // Set success on the cached statement + response = new SyncResultsResponse(meta.syncResults(h, request.state, request.offset), false, + serverLevelRpcMetadata); + } catch (NoSuchStatementException e) { + // Tried to sync results on a statement which wasn't cached + response = new SyncResultsResponse(false, true, serverLevelRpcMetadata); + } + + return response; + } + + public CommitResponse apply(CommitRequest request) { + try (final Context ctx = commitTimer.start()) { + meta.commit(new Meta.ConnectionHandle(request.connectionId)); + + // If commit() errors, let the ErrorResponse be sent back via an uncaught Exception. + return new CommitResponse(); + } + } + + public RollbackResponse apply(RollbackRequest request) { + meta.rollback(new Meta.ConnectionHandle(request.connectionId)); + + // If rollback() errors, let the ErrorResponse be sent back via an uncaught Exception. 
+ return new RollbackResponse(); + } +} + +// End LocalService.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java new file mode 100644 index 0000000..12a5b59 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.proto.Common; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; + +/** + * Identifies an operation from {@link DatabaseMetaData} which returns a {@link ResultSet}. This + * enum is used to allow clients to request the server to re-instantiate a {@link ResultSet} for + * these operations which do not have a SQL string associated with them as a normal query does. 
+ */ +public enum MetaDataOperation { + GET_ATTRIBUTES, + GET_BEST_ROW_IDENTIFIER, + GET_CATALOGS, + GET_CLIENT_INFO_PROPERTIES, + GET_COLUMN_PRIVILEGES, + GET_COLUMNS, + GET_CROSS_REFERENCE, + GET_EXPORTED_KEYS, + GET_FUNCTION_COLUMNS, + GET_FUNCTIONS, + GET_IMPORTED_KEYS, + GET_INDEX_INFO, + GET_PRIMARY_KEYS, + GET_PROCEDURE_COLUMNS, + GET_PROCEDURES, + GET_PSEUDO_COLUMNS, + GET_SCHEMAS, + GET_SCHEMAS_WITH_ARGS, + GET_SUPER_TABLES, + GET_SUPER_TYPES, + GET_TABLE_PRIVILEGES, + GET_TABLES, + GET_TABLE_TYPES, + GET_TYPE_INFO, + GET_UDTS, + GET_VERSION_COLUMNS; + + public Common.MetaDataOperation toProto() { + switch (this) { + case GET_ATTRIBUTES: + return Common.MetaDataOperation.GET_ATTRIBUTES; + case GET_BEST_ROW_IDENTIFIER: + return Common.MetaDataOperation.GET_BEST_ROW_IDENTIFIER; + case GET_CATALOGS: + return Common.MetaDataOperation.GET_CATALOGS; + case GET_CLIENT_INFO_PROPERTIES: + return Common.MetaDataOperation.GET_CLIENT_INFO_PROPERTIES; + case GET_COLUMNS: + return Common.MetaDataOperation.GET_COLUMNS; + case GET_COLUMN_PRIVILEGES: + return Common.MetaDataOperation.GET_COLUMN_PRIVILEGES; + case GET_CROSS_REFERENCE: + return Common.MetaDataOperation.GET_CROSS_REFERENCE; + case GET_EXPORTED_KEYS: + return Common.MetaDataOperation.GET_EXPORTED_KEYS; + case GET_FUNCTIONS: + return Common.MetaDataOperation.GET_FUNCTIONS; + case GET_FUNCTION_COLUMNS: + return Common.MetaDataOperation.GET_FUNCTION_COLUMNS; + case GET_IMPORTED_KEYS: + return Common.MetaDataOperation.GET_IMPORTED_KEYS; + case GET_INDEX_INFO: + return Common.MetaDataOperation.GET_INDEX_INFO; + case GET_PRIMARY_KEYS: + return Common.MetaDataOperation.GET_PRIMARY_KEYS; + case GET_PROCEDURES: + return Common.MetaDataOperation.GET_PROCEDURES; + case GET_PROCEDURE_COLUMNS: + return Common.MetaDataOperation.GET_PROCEDURE_COLUMNS; + case GET_PSEUDO_COLUMNS: + return Common.MetaDataOperation.GET_PSEUDO_COLUMNS; + case GET_SCHEMAS: + return Common.MetaDataOperation.GET_SCHEMAS; + case 
GET_SCHEMAS_WITH_ARGS: + return Common.MetaDataOperation.GET_SCHEMAS_WITH_ARGS; + case GET_SUPER_TABLES: + return Common.MetaDataOperation.GET_SUPER_TABLES; + case GET_SUPER_TYPES: + return Common.MetaDataOperation.GET_SUPER_TYPES; + case GET_TABLES: + return Common.MetaDataOperation.GET_TABLES; + case GET_TABLE_PRIVILEGES: + return Common.MetaDataOperation.GET_TABLE_PRIVILEGES; + case GET_TABLE_TYPES: + return Common.MetaDataOperation.GET_TABLE_TYPES; + case GET_TYPE_INFO: + return Common.MetaDataOperation.GET_TYPE_INFO; + case GET_UDTS: + return Common.MetaDataOperation.GET_UDTS; + case GET_VERSION_COLUMNS: + return Common.MetaDataOperation.GET_VERSION_COLUMNS; + default: + throw new RuntimeException("Unknown type: " + this); + } + } + + public static MetaDataOperation fromProto(Common.MetaDataOperation protoOp) { + // Null is acceptable + if (null == protoOp) { + return null; + } + + switch (protoOp) { + case GET_ATTRIBUTES: + return MetaDataOperation.GET_ATTRIBUTES; + case GET_BEST_ROW_IDENTIFIER: + return MetaDataOperation.GET_BEST_ROW_IDENTIFIER; + case GET_CATALOGS: + return MetaDataOperation.GET_CATALOGS; + case GET_CLIENT_INFO_PROPERTIES: + return MetaDataOperation.GET_CLIENT_INFO_PROPERTIES; + case GET_COLUMNS: + return MetaDataOperation.GET_COLUMNS; + case GET_COLUMN_PRIVILEGES: + return MetaDataOperation.GET_COLUMN_PRIVILEGES; + case GET_CROSS_REFERENCE: + return MetaDataOperation.GET_CROSS_REFERENCE; + case GET_EXPORTED_KEYS: + return MetaDataOperation.GET_EXPORTED_KEYS; + case GET_FUNCTIONS: + return MetaDataOperation.GET_FUNCTIONS; + case GET_FUNCTION_COLUMNS: + return MetaDataOperation.GET_FUNCTION_COLUMNS; + case GET_IMPORTED_KEYS: + return MetaDataOperation.GET_IMPORTED_KEYS; + case GET_INDEX_INFO: + return MetaDataOperation.GET_INDEX_INFO; + case GET_PRIMARY_KEYS: + return MetaDataOperation.GET_PRIMARY_KEYS; + case GET_PROCEDURES: + return MetaDataOperation.GET_PROCEDURES; + case GET_PROCEDURE_COLUMNS: + return 
MetaDataOperation.GET_PROCEDURE_COLUMNS; + case GET_PSEUDO_COLUMNS: + return MetaDataOperation.GET_PSEUDO_COLUMNS; + case GET_SCHEMAS: + return MetaDataOperation.GET_SCHEMAS; + case GET_SCHEMAS_WITH_ARGS: + return MetaDataOperation.GET_SCHEMAS_WITH_ARGS; + case GET_SUPER_TABLES: + return MetaDataOperation.GET_SUPER_TABLES; + case GET_SUPER_TYPES: + return MetaDataOperation.GET_SUPER_TYPES; + case GET_TABLES: + return MetaDataOperation.GET_TABLES; + case GET_TABLE_PRIVILEGES: + return MetaDataOperation.GET_TABLE_PRIVILEGES; + case GET_TABLE_TYPES: + return MetaDataOperation.GET_TABLE_TYPES; + case GET_TYPE_INFO: + return MetaDataOperation.GET_TYPE_INFO; + case GET_UDTS: + return MetaDataOperation.GET_UDTS; + case GET_VERSION_COLUMNS: + return MetaDataOperation.GET_VERSION_COLUMNS; + default: + throw new RuntimeException("Unknown type: " + protoOp); + } + } +} + +// End MetaDataOperation.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java new file mode 100644 index 0000000..2561b29 --- /dev/null +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.calcite.avatica.remote;

/**
 * Static helpers shared by code that builds metric names.
 */
public class MetricsHelper {

  private static final String PERIOD = ".";

  private MetricsHelper() {}

  /**
   * Builds a metric name of the form {@code <class name>.<name>}.
   *
   * @param clz Class whose {@link Class#getName() name} forms the prefix.
   * @param name Suffix appended after the separating period.
   * @return The class name, a period and {@code name}, joined in that order.
   */
  public static String concat(Class<?> clz, String name) {
    return clz.getName() + PERIOD + name;
  }

}

// End MetricsHelper.java
