Repository: calcite Updated Branches: refs/heads/master 240cee445 -> 3b5d88e6a
[CALCITE-1300] Retry on HTTP-503 in hc-based AvaticaHttpClient Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/3b5d88e6 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/3b5d88e6 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/3b5d88e6 Branch: refs/heads/master Commit: 3b5d88e6a717600f2c7c282ae7319875866017ea Parents: 240cee4 Author: Josh Elser <[email protected]> Authored: Fri Jun 24 16:47:44 2016 -0400 Committer: Josh Elser <[email protected]> Committed: Fri Jun 24 17:13:48 2016 -0400 ---------------------------------------------------------------------- .../remote/AvaticaCommonsHttpClientImpl.java | 69 ++++++++++------- .../AvaticaCommonsHttpClientImplTest.java | 79 ++++++++++++++++++++ 2 files changed, 120 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/3b5d88e6/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java index ffb20a7..8cd5340 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java @@ -20,6 +20,7 @@ import org.apache.http.HttpHost; import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.ClientProtocolException; import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.AuthSchemes; import org.apache.http.client.methods.CloseableHttpResponse; @@ -48,6 +49,7 @@ import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; @@ -70,7 +72,7 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient, private static final String MAX_POOLED_CONNECTIONS_DEFAULT = "100"; protected final HttpHost host; - protected final URL url; + protected final URI uri; protected final HttpProcessor httpProcessor; protected final HttpRequestExecutor httpExecutor; protected final BasicAuthCache authCache; @@ -83,7 +85,7 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient, public AvaticaCommonsHttpClientImpl(URL url) { this.host = new HttpHost(url.getHost(), url.getPort(), url.getProtocol()); - this.url = Objects.requireNonNull(url); + this.uri = toURI(Objects.requireNonNull(url)); this.httpProcessor = HttpProcessorBuilder.create() .add(new RequestContent()) @@ -111,39 +113,50 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient, } public byte[] send(byte[] request) { - HttpClientContext context = HttpClientContext.create(); + while (true) { + HttpClientContext context = HttpClientContext.create(); - context.setTargetHost(host); + context.setTargetHost(host); - // Set the credentials if they were provided. - if (null != this.credentials) { - context.setCredentialsProvider(credentialsProvider); - context.setAuthSchemeRegistry(authRegistry); - context.setAuthCache(authCache); - } - - ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM); - - // Create the client with the AuthSchemeRegistry and manager - HttpPost post = new HttpPost(toURI(url)); - post.setEntity(entity); - - try (CloseableHttpResponse response = client.execute(post, context)) { - final int statusCode = response.getStatusLine().getStatusCode(); - if (HttpURLConnection.HTTP_OK == statusCode - || HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) { - return EntityUtils.toByteArray(response.getEntity()); + // Set the credentials if they were provided. + if (null != this.credentials) { + context.setCredentialsProvider(credentialsProvider); + context.setAuthSchemeRegistry(authRegistry); + context.setAuthCache(authCache); } - throw new RuntimeException("Failed to execute HTTP Request, got HTTP/" + statusCode); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - LOG.debug("Failed to execute HTTP request", e); - throw new RuntimeException(e); + ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM); + + // Create the client with the AuthSchemeRegistry and manager + HttpPost post = new HttpPost(uri); + post.setEntity(entity); + + try (CloseableHttpResponse response = execute(post, context)) { + final int statusCode = response.getStatusLine().getStatusCode(); + if (HttpURLConnection.HTTP_OK == statusCode + || HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) { + return EntityUtils.toByteArray(response.getEntity()); + } else if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) { + LOG.debug("Failed to connect to server (HTTP/503), retrying"); + continue; + } + + throw new RuntimeException("Failed to execute HTTP Request, got HTTP/" + statusCode); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + LOG.debug("Failed to execute HTTP request", e); + throw new RuntimeException(e); + } } } + // Visible for testing + CloseableHttpResponse execute(HttpPost post, HttpClientContext context) + throws IOException, ClientProtocolException { + return client.execute(post, context); + } + @Override public void setUsernamePassword(AuthenticationType authType, String username, String password) { this.credentials = new UsernamePasswordCredentials( http://git-wip-us.apache.org/repos/asf/calcite/blob/3b5d88e6/avatica/core/src/test/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImplTest.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/test/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImplTest.java b/avatica/core/src/test/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImplTest.java new file mode 100644 index 0000000..b7c49a3 --- /dev/null +++ b/avatica/core/src/test/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImplTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.entity.StringEntity; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.net.HttpURLConnection; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Test class for {@link AvaticaCommonsHttpClientImpl} + */ +public class AvaticaCommonsHttpClientImplTest { + + @Test public void testRetryOnHttp503() throws Exception { + final byte[] requestBytes = "fake_request".getBytes(UTF_8); + final CloseableHttpResponse badResponse = mock(CloseableHttpResponse.class); + final CloseableHttpResponse goodResponse = mock(CloseableHttpResponse.class); + final StatusLine badStatusLine = mock(StatusLine.class); + final StatusLine goodStatusLine = mock(StatusLine.class); + final StringEntity responseEntity = new StringEntity("success"); + final Answer<CloseableHttpResponse> failThenSucceed = new Answer<CloseableHttpResponse>() { + private int iteration = 0; + @Override public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable { + iteration++; + if (1 == iteration) { + return badResponse; + } else { + return goodResponse; + } + } + }; + + final AvaticaCommonsHttpClientImpl client = mock(AvaticaCommonsHttpClientImpl.class); + + when(client.send(any(byte[].class))).thenCallRealMethod(); + when(client.execute(any(HttpPost.class), any(HttpClientContext.class))).then(failThenSucceed); + + when(badResponse.getStatusLine()).thenReturn(badStatusLine); + when(badStatusLine.getStatusCode()).thenReturn(HttpURLConnection.HTTP_UNAVAILABLE); + + when(goodResponse.getStatusLine()).thenReturn(goodStatusLine); + when(goodStatusLine.getStatusCode()).thenReturn(HttpURLConnection.HTTP_OK); + when(goodResponse.getEntity()).thenReturn(responseEntity); + + byte[] responseBytes = client.send(requestBytes); + assertEquals("success", new String(responseBytes, UTF_8)); + } +} + +// End AvaticaCommonsHttpClientImplTest.java
