Repository: calcite Updated Branches: refs/heads/master d27e642c4 -> ec3d8d37c
[CALCITE-1117] Default to a commons-httpclient implementation Closes apache/calcite#197 Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/ec3d8d37 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/ec3d8d37 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/ec3d8d37 Branch: refs/heads/master Commit: ec3d8d37c9d38a8f041c2b1b3aa68711c085b939 Parents: d27e642 Author: Josh Elser <[email protected]> Authored: Wed Mar 2 17:43:53 2016 -0500 Committer: Josh Elser <[email protected]> Committed: Thu Mar 3 17:27:08 2016 -0500 ---------------------------------------------------------------------- avatica/pom.xml | 16 +++ .../avatica/BuiltInConnectionProperty.java | 11 +- .../calcite/avatica/ConnectionConfig.java | 3 + .../calcite/avatica/ConnectionConfigImpl.java | 10 ++ .../remote/AvaticaCommonsHttpClientImpl.java | 133 +++++++++++++++++++ .../remote/AvaticaHttpClientFactory.java | 39 ++++++ .../remote/AvaticaHttpClientFactoryImpl.java | 66 +++++++++ .../apache/calcite/avatica/remote/Driver.java | 4 +- .../remote/AvaticaHttpClientFactoryTest.java | 60 +++++++++ pom.xml | 12 ++ 10 files changed, 352 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/pom.xml ---------------------------------------------------------------------- diff --git a/avatica/pom.xml b/avatica/pom.xml index cf7d875..3027df0 100644 --- a/avatica/pom.xml +++ b/avatica/pom.xml @@ -56,6 +56,14 @@ limitations under the License. <artifactId>protobuf-java</artifactId> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -187,6 +195,14 @@ limitations under the License. <pattern>com.google.protobuf</pattern> <shadedPattern>org.apache.calcite.avatica.com.google.protobuf</shadedPattern> </relocation> + <relocation> + <pattern>org.apache.http</pattern> + <shadedPattern>org.apache.calcite.avatica.org.apache.http</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.commons</pattern> + <shadedPattern>org.apache.calcite.avatica.org.apache.commons</shadedPattern> + </relocation> </relocations> <createDependencyReducedPom>false</createDependencyReducedPom> </configuration> http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java b/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java index 28a66bb..086ae4a 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java @@ -16,6 +16,8 @@ */ package org.apache.calcite.avatica; +import org.apache.calcite.avatica.remote.AvaticaHttpClientFactoryImpl; + import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -40,7 +42,14 @@ public enum BuiltInConnectionProperty implements ConnectionProperty { URL("url", Type.STRING, null, false), /** Serialization used over remote connections */ - SERIALIZATION("serialization", Type.STRING, "json", false); + SERIALIZATION("serialization", Type.STRING, "json", false), + + /** Factory for constructing http clients. */ + HTTP_CLIENT_FACTORY("httpclient_factory", Type.PLUGIN, + AvaticaHttpClientFactoryImpl.class.getName(), false), + + /** HttpClient implementation class name. */ + HTTP_CLIENT_IMPL("httpclient_impl", Type.STRING, null, false); private final String camelName; private final Type type; http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java index 6c7633f..8e4790c 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.avatica; +import org.apache.calcite.avatica.remote.AvaticaHttpClientFactory; import org.apache.calcite.avatica.remote.Service; /** @@ -32,6 +33,8 @@ public interface ConnectionConfig { String url(); /** @see BuiltInConnectionProperty#SERIALIZATION */ String serialization(); + AvaticaHttpClientFactory httpClientFactory(); + String httpClientClass(); } // End ConnectionConfig.java http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java index 8b94e03..bbeefea 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.avatica; +import org.apache.calcite.avatica.remote.AvaticaHttpClientFactory; import org.apache.calcite.avatica.remote.Service; import java.util.LinkedHashMap; @@ -51,6 +52,15 @@ public class ConnectionConfigImpl implements ConnectionConfig { return BuiltInConnectionProperty.SERIALIZATION.wrap(properties).getString(); } + public AvaticaHttpClientFactory httpClientFactory() { + return BuiltInConnectionProperty.HTTP_CLIENT_FACTORY.wrap(properties) + .getPlugin(AvaticaHttpClientFactory.class, null); + } + + public String httpClientClass() { + return BuiltInConnectionProperty.HTTP_CLIENT_IMPL.wrap(properties).getString(); + } + /** Converts a {@link Properties} object containing (name, value) * pairs into a map whose keys are * {@link org.apache.calcite.avatica.InternalProperty} objects. http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java new file mode 100644 index 0000000..3bda248 --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.http.ConnectionReuseStrategy; +import org.apache.http.HttpClientConnection; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.client.protocol.RequestExpectContinue; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.DefaultConnectionReuseStrategy; +import org.apache.http.impl.pool.BasicConnFactory; +import org.apache.http.impl.pool.BasicConnPool; +import org.apache.http.impl.pool.BasicPoolEntry; +import org.apache.http.message.BasicHttpEntityEnclosingRequest; +import org.apache.http.protocol.HttpCoreContext; +import org.apache.http.protocol.HttpProcessor; +import org.apache.http.protocol.HttpProcessorBuilder; +import org.apache.http.protocol.HttpRequestExecutor; +import org.apache.http.protocol.RequestConnControl; +import org.apache.http.protocol.RequestContent; +import org.apache.http.protocol.RequestTargetHost; +import org.apache.http.util.EntityUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.concurrent.Future; + +/** + * A common class to invoke HTTP requests against the Avatica server agnostic of the data being + * sent and received across the wire. + */ +public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient { + private static final Logger LOG = LoggerFactory.getLogger(AvaticaCommonsHttpClientImpl.class); + private static final ConnectionReuseStrategy REUSE = DefaultConnectionReuseStrategy.INSTANCE; + + // Some basic exposed configurations + private static final String MAX_POOLED_CONNECTION_PER_ROUTE_KEY = + "avatica.pooled.connections.per.route"; + private static final String MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT = "4"; + private static final String MAX_POOLED_CONNECTIONS_KEY = "avatica.pooled.connections.max"; + private static final String MAX_POOLED_CONNECTIONS_DEFAULT = "16"; + + protected final HttpHost host; + protected final HttpProcessor httpProcessor; + protected final HttpRequestExecutor httpExecutor; + protected final BasicConnPool httpPool; + + public AvaticaCommonsHttpClientImpl(URL url) { + this.host = new HttpHost(url.getHost(), url.getPort(), url.getProtocol()); + + this.httpProcessor = HttpProcessorBuilder.create() + .add(new RequestContent()) + .add(new RequestTargetHost()) + .add(new RequestConnControl()) + .add(new RequestExpectContinue()).build(); + + this.httpExecutor = new HttpRequestExecutor(); + + this.httpPool = new BasicConnPool(new BasicConnFactory()); + int maxPerRoute = Integer.parseInt( + System.getProperty(MAX_POOLED_CONNECTION_PER_ROUTE_KEY, + MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT)); + int maxTotal = Integer.parseInt( + System.getProperty(MAX_POOLED_CONNECTIONS_KEY, + MAX_POOLED_CONNECTIONS_DEFAULT)); + httpPool.setDefaultMaxPerRoute(maxPerRoute); + httpPool.setMaxTotal(maxTotal); + } + + public byte[] send(byte[] request) { + while (true) { + boolean reusable = false; + // Get a connection from the pool + Future<BasicPoolEntry> future = this.httpPool.lease(host, null); + BasicPoolEntry entry = null; + try { + entry = future.get(); + HttpCoreContext coreContext = HttpCoreContext.create(); + coreContext.setTargetHost(host); + + HttpClientConnection conn = entry.getConnection(); + + ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM); + + BasicHttpEntityEnclosingRequest postRequest = + new BasicHttpEntityEnclosingRequest("POST", "/"); + postRequest.setEntity(entity); + + httpExecutor.preProcess(postRequest, httpProcessor, coreContext); + HttpResponse response = httpExecutor.execute(postRequest, conn, coreContext); + httpExecutor.postProcess(response, httpProcessor, coreContext); + + // Should the connection be kept alive? + reusable = REUSE.keepAlive(response, coreContext); + + final int statusCode = response.getStatusLine().getStatusCode(); + if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) { + // Could be sitting behind a load-balancer, try again. + continue; + } + + return EntityUtils.toByteArray(response.getEntity()); + } catch (Exception e) { + LOG.debug("Failed to execute HTTP request", e); + throw new RuntimeException(e); + } finally { + // Release the connection back to the pool, marking if it's good to reuse or not. + httpPool.release(entry, reusable); + } + } + } +} + +// End AvaticaCommonsHttpClientImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java new file mode 100644 index 0000000..b5d213a --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ConnectionConfig; + +import java.net.URL; + +/** + * A factory for constructing {@link AvaticaHttpClient}'s. + */ +public interface AvaticaHttpClientFactory { + + /** + * Construct the appropriate implementation of {@link AvaticaHttpClient}. + * + * @param url URL that the client is for. + * @param config Configuration to use when constructing the implementation. + * @return An instance of {@link AvaticaHttpClient}. + */ + AvaticaHttpClient getClient(URL url, ConnectionConfig config); + +} + +// End AvaticaHttpClientFactory.java http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java new file mode 100644 index 0000000..cd6cbce --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ConnectionConfig; + +import java.lang.reflect.Constructor; +import java.net.URL; +import java.util.Objects; + +/** + * Default implementation of {@link AvaticaHttpClientFactory} which chooses an implementation + * from a property. + */ +public class AvaticaHttpClientFactoryImpl implements AvaticaHttpClientFactory { + public static final String HTTP_CLIENT_IMPL_DEFAULT = + AvaticaCommonsHttpClientImpl.class.getName(); + + // Public for Type.PLUGIN + public static final AvaticaHttpClientFactoryImpl INSTANCE = new AvaticaHttpClientFactoryImpl(); + + // Public for Type.PLUGIN + public AvaticaHttpClientFactoryImpl() {} + + /** + * Returns a singleton instance of {@link AvaticaHttpClientFactoryImpl}. + * + * @return A singleton instance. + */ + public static AvaticaHttpClientFactoryImpl getInstance() { + return INSTANCE; + } + + @Override public AvaticaHttpClient getClient(URL url, ConnectionConfig config) { + String className = config.httpClientClass(); + if (null == className) { + className = HTTP_CLIENT_IMPL_DEFAULT; + } + + try { + Class<?> clz = Class.forName(className); + Constructor<?> constructor = clz.getConstructor(URL.class); + Object instance = constructor.newInstance(Objects.requireNonNull(url)); + return AvaticaHttpClient.class.cast(instance); + } catch (Exception e) { + throw new RuntimeException("Failed to construct AvaticaHttpClient implementation " + + className, e); + } + } +} + +// End AvaticaHttpClientFactoryImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java index 752e18d..e98c486 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java @@ -135,7 +135,9 @@ public class Driver extends UnregisteredDriver { throw new RuntimeException(e); } - return new AvaticaHttpClientImpl(url); + AvaticaHttpClientFactory httpClientFactory = config.httpClientFactory(); + + return httpClientFactory.getClient(url, config); } @Override public Connection connect(String url, Properties info) http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryTest.java new file mode 100644 index 0000000..cd64329 --- /dev/null +++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.BuiltInConnectionProperty; +import org.apache.calcite.avatica.ConnectionConfig; +import org.apache.calcite.avatica.ConnectionConfigImpl; + +import org.junit.Test; + +import java.net.URL; +import java.util.Properties; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for the factory that creates http clients. + */ +public class AvaticaHttpClientFactoryTest { + + @Test public void testDefaultHttpClient() throws Exception { + Properties props = new Properties(); + URL url = new URL("http://localhost:8765"); + ConnectionConfig config = new ConnectionConfigImpl(props); + AvaticaHttpClientFactory httpClientFactory = new AvaticaHttpClientFactoryImpl(); + + AvaticaHttpClient client = httpClientFactory.getClient(url, config); + assertTrue("Client was an instance of " + client.getClass(), + client instanceof AvaticaCommonsHttpClientImpl); + } + + @Test public void testOverridenHttpClient() throws Exception { + Properties props = new Properties(); + props.setProperty(BuiltInConnectionProperty.HTTP_CLIENT_IMPL.name(), + AvaticaHttpClientImpl.class.getName()); + URL url = new URL("http://localhost:8765"); + ConnectionConfig config = new ConnectionConfigImpl(props); + AvaticaHttpClientFactory httpClientFactory = new AvaticaHttpClientFactoryImpl(); + + AvaticaHttpClient client = httpClientFactory.getClient(url, config); + assertTrue("Client was an instance of " + client.getClass(), + client instanceof AvaticaHttpClientImpl); + } +} + +// End AvaticaHttpClientFactoryTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/ec3d8d37/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index fecb93e..365cb6e 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,8 @@ limitations under the License. <hadoop.version>2.6.0</hadoop.version> <hamcrest.version>1.3</hamcrest.version> <hsqldb.version>2.3.1</hsqldb.version> + <httpclient.version>4.5.2</httpclient.version> + <httpcore.version>4.4.4</httpcore.version> <hydromatic-resource.version>0.5.1</hydromatic-resource.version> <hydromatic-toolbox.version>0.3</hydromatic-toolbox.version> <hydromatic-tpcds.version>0.4</hydromatic-tpcds.version> @@ -306,6 +308,16 @@ limitations under the License. <version>${hadoop.version}</version> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${httpclient.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>${httpcore.version}</version> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <version>${mockito-all.version}</version>
