This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 045abc1 [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr… 045abc1 is described below commit 045abc17734cd3fdcd40c1317cca3db980ea0095 Author: sv2000 <sudarsh...@gmail.com> AuthorDate: Tue Apr 14 08:44:19 2020 -0700 [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr… Closes #2951 from sv2000/httpRetryHandler --- .../gobblin/configuration/ConfigurationKeys.java | 12 ++++- .../schemareg/GobblinHttpMethodRetryHandler.java | 62 ++++++++++++++++++++++ .../gobblin/kafka/schemareg/HttpClientFactory.java | 21 +++++++- .../metrics/kafka/KafkaAvroSchemaRegistry.java | 20 +++++-- .../GobblinHttpMethodRetryHandlerTest.java | 55 +++++++++++++++++++ .../kafka/schemareg/HttpClientFactoryTest.java | 33 ++++++++++++ 6 files changed, 197 insertions(+), 6 deletions(-) diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 6075725..53bbe3c 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -824,12 +824,22 @@ public class ConfigurationKeys { public static final String SHARED_KAFKA_CONFIG_PREFIX = "gobblin.kafka.sharedConfig"; /** - * Kafka schema registry + * Kafka schema registry HTTP client configuration */ public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT = "kafka.schema.registry.httpclient.so.timeout"; public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT = "kafka.schema.registry.httpclient.conn.timeout"; + public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT = + "kafka.schema.registry.httpclient.methodRetryCount"; + public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED = + "kafka.schema.registry.httpclient.requestRetryEnabled"; + public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS = + "kafka.schema.registry.httpclient.methodRetryHandlerClass"; + + /** + * Kafka schema registry retry configurations + */ public static final String KAFKA_SCHEMA_REGISTRY_RETRY_TIMES = "kafka.schema.registry.retry.times"; public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS = "kafka.schema.registry.retry.interval.inMillis"; diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java new file mode 100644 index 0000000..13ddd27 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.kafka.schemareg; + +import java.io.IOException; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; + +import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; +import org.apache.commons.httpclient.HttpMethod; + +import org.apache.gobblin.annotation.Alias; + +/** + * An extension of {@link DefaultHttpMethodRetryHandler} that retries the HTTP request on network errors such as + * {@link java.net.UnknownHostException} and {@link java.net.NoRouteToHostException}. + */ +@Alias (value = "gobblinhttpretryhandler") +public class GobblinHttpMethodRetryHandler extends DefaultHttpMethodRetryHandler { + + public GobblinHttpMethodRetryHandler() { + this(3, false); + } + + public GobblinHttpMethodRetryHandler(int retryCount, boolean requestSentRetryEnabled) { + super(retryCount, requestSentRetryEnabled); + } + + @Override + public boolean retryMethod(final HttpMethod method, final IOException exception, int executionCount) { + if (method == null) { + throw new IllegalArgumentException("HTTP method may not be null"); + } + if (exception == null) { + throw new IllegalArgumentException("Exception parameter may not be null"); + } + if (executionCount > super.getRetryCount()) { + // Do not retry if over max retry count + return false; + } + //Override the behavior of DefaultHttpMethodRetryHandler to retry in case of UnknownHostException + // and NoRouteToHostException. + if (exception instanceof UnknownHostException || exception instanceof NoRouteToHostException) { + return true; + } + return super.retryMethod(method, exception, executionCount); + } +} diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java index 028dcf8..4fd0a90 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java @@ -17,13 +17,19 @@ package org.apache.gobblin.kafka.schemareg; +import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpMethodRetryHandler; +import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import lombok.Setter; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + /** * An implementation of {@link BasePooledObjectFactory} for {@link HttpClient}. @@ -34,18 +40,29 @@ public class HttpClientFactory extends BasePooledObjectFactory<HttpClient>{ @Setter private int soTimeout = -1; @Setter private int connTimeout = -1; + @Setter private int httpMethodRetryCount = 3; + @Setter private boolean httpRequestSentRetryEnabled = false; + @Setter private String httpMethodRetryHandlerClass = DefaultHttpMethodRetryHandler.class.getName(); public HttpClientFactory() { } @Override - public HttpClient create() throws Exception { - + public HttpClient create() { HttpClient client = new HttpClient(); if (soTimeout >= 0) { client.getParams().setSoTimeout(soTimeout); } + ClassAliasResolver<HttpMethodRetryHandler> aliasResolver = new ClassAliasResolver<>(HttpMethodRetryHandler.class); + HttpMethodRetryHandler httpMethodRetryHandler; + try { + httpMethodRetryHandler = GobblinConstructorUtils.invokeLongestConstructor(aliasResolver.resolveClass(httpMethodRetryHandlerClass), httpMethodRetryCount, httpRequestSentRetryEnabled); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, httpMethodRetryHandler); + if (connTimeout >= 0) { client.getHttpConnectionManager().getParams().setConnectionTimeout(connTimeout); } diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java index c31c7c8..556c1d1 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java @@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.kafka.schemareg.HttpClientFactory; import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil; import org.apache.gobblin.util.AvroUtils; -import lombok.extern.slf4j.Slf4j; - /** * An implementation of {@link KafkaSchemaRegistry}. @@ -68,7 +68,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema> private final Optional<Map<String, String>> namespaceOverride; /** - * @param properties properties should contain property "kafka.schema.registry.url", and optionally + * @param props properties should contain property "kafka.schema.registry.url", and optionally * "kafka.schema.registry.max.cache.size" (default = 1000) and * "kafka.schema.registry.cache.expire.after.write.min" (default = 10). */ @@ -101,6 +101,20 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema> factory.setConnTimeout(Integer.parseInt(connTimeout)); } + if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT)) { + String retryCount = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT); + factory.setHttpMethodRetryCount(Integer.parseInt(retryCount)); + } + + if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED)) { + String requestRetryEnabled = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED); + factory.setHttpRequestSentRetryEnabled(Boolean.parseBoolean(requestRetryEnabled)); + } + + if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS)) { + String httpMethodRetryHandlerClass = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS); + factory.setHttpMethodRetryHandlerClass(httpMethodRetryHandlerClass); + } this.httpClientPool = new GenericObjectPool<>(factory, config); } diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java new file mode 100644 index 0000000..b0fe997 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.kafka.schemareg; + +import java.io.IOException; +import java.net.UnknownHostException; + +import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; +import org.apache.commons.httpclient.HttpMethod; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class GobblinHttpMethodRetryHandlerTest { + + @Test + public void testRetryMethod() { + GobblinHttpMethodRetryHandler gobblinHttpMethodRetryHandler = new GobblinHttpMethodRetryHandler(1, false); + HttpMethod mockHttpMethod = Mockito.mock(HttpMethod.class); + + //GobblinHttpHandler.retryMethod should return true on UnknownHostException + Assert.assertTrue(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 0)); + Assert.assertTrue(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 1)); + //Return false when the retry count is exceeded + Assert.assertFalse(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 2)); + + //Ensure the GobblinHttpMethodRetryHandler has the same behavior as the DefaultHttpMethodRetryHandler for a normal + //IOException + DefaultHttpMethodRetryHandler defaultHttpMethodRetryHandler = new DefaultHttpMethodRetryHandler(1, false); + boolean shouldRetryWithGobblinRetryHandler = gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 0); + boolean shouldRetryWithDefaultRetryHandler = defaultHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 0); + Assert.assertTrue(shouldRetryWithGobblinRetryHandler); + Assert.assertEquals(shouldRetryWithDefaultRetryHandler, shouldRetryWithGobblinRetryHandler); + + shouldRetryWithGobblinRetryHandler = gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 2); + shouldRetryWithDefaultRetryHandler = defaultHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 2); + Assert.assertFalse(shouldRetryWithGobblinRetryHandler); + Assert.assertEquals(shouldRetryWithDefaultRetryHandler, shouldRetryWithGobblinRetryHandler); + } +} \ No newline at end of file diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java new file mode 100644 index 0000000..4d080f4 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.kafka.schemareg; + +import org.apache.commons.httpclient.HttpClient; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class HttpClientFactoryTest { + + @Test + public void testCreate() { + HttpClientFactory httpClientFactory = new HttpClientFactory(); + httpClientFactory.setHttpMethodRetryHandlerClass(GobblinHttpMethodRetryHandler.class.getName()); + HttpClient client = httpClientFactory.create(); + Assert.assertNotNull(client); + } +} \ No newline at end of file