This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 045abc1  [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that 
allows retr…
045abc1 is described below

commit 045abc17734cd3fdcd40c1317cca3db980ea0095
Author: sv2000 <sudarsh...@gmail.com>
AuthorDate: Tue Apr 14 08:44:19 2020 -0700

    [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr…
    
    Closes #2951 from sv2000/httpRetryHandler
---
 .../gobblin/configuration/ConfigurationKeys.java   | 12 ++++-
 .../schemareg/GobblinHttpMethodRetryHandler.java   | 62 ++++++++++++++++++++++
 .../gobblin/kafka/schemareg/HttpClientFactory.java | 21 +++++++-
 .../metrics/kafka/KafkaAvroSchemaRegistry.java     | 20 +++++--
 .../GobblinHttpMethodRetryHandlerTest.java         | 55 +++++++++++++++++++
 .../kafka/schemareg/HttpClientFactoryTest.java     | 33 ++++++++++++
 6 files changed, 197 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6075725..53bbe3c 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -824,12 +824,22 @@ public class ConfigurationKeys {
   public static final String SHARED_KAFKA_CONFIG_PREFIX = 
"gobblin.kafka.sharedConfig";
 
   /**
-   * Kafka schema registry
+   * Kafka schema registry HTTP client configuration
    */
   public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT =
       "kafka.schema.registry.httpclient.so.timeout";
   public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT =
       "kafka.schema.registry.httpclient.conn.timeout";
+  public static final String 
KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT =
+      "kafka.schema.registry.httpclient.methodRetryCount";
+  public static final String 
KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED =
+      "kafka.schema.registry.httpclient.requestRetryEnabled";
+  public static final String 
KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS =
+      "kafka.schema.registry.httpclient.methodRetryHandlerClass";
+
+  /**
+   * Kafka schema registry retry configurations
+   */
   public static final String KAFKA_SCHEMA_REGISTRY_RETRY_TIMES = 
"kafka.schema.registry.retry.times";
   public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS =
       "kafka.schema.registry.retry.interval.inMillis";
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java
new file mode 100644
index 0000000..13ddd27
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import java.io.IOException;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpMethod;
+
+import org.apache.gobblin.annotation.Alias;
+
+/**
+ * An extension of {@link DefaultHttpMethodRetryHandler} that retries the HTTP 
request on network errors such as
+ * {@link java.net.UnknownHostException} and {@link 
java.net.NoRouteToHostException}.
+ */
+@Alias (value = "gobblinhttpretryhandler")
+public class GobblinHttpMethodRetryHandler extends 
DefaultHttpMethodRetryHandler {
+
+  public GobblinHttpMethodRetryHandler() {
+    this(3, false);
+  }
+
+  public GobblinHttpMethodRetryHandler(int retryCount, boolean 
requestSentRetryEnabled) {
+    super(retryCount, requestSentRetryEnabled);
+  }
+
+  @Override
+  public boolean retryMethod(final HttpMethod method, final IOException 
exception, int executionCount) {
+    if (method == null) {
+      throw new IllegalArgumentException("HTTP method may not be null");
+    }
+    if (exception == null) {
+      throw new IllegalArgumentException("Exception parameter may not be 
null");
+    }
+    if (executionCount > super.getRetryCount()) {
+      // Do not retry if over max retry count
+      return false;
+    }
+    //Override the behavior of DefaultHttpMethodRetryHandler to retry in case 
of UnknownHostException
+    // and NoRouteToHostException.
+    if (exception instanceof UnknownHostException || exception instanceof 
NoRouteToHostException) {
+      return true;
+    }
+    return super.retryMethod(method, exception, executionCount);
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
index 028dcf8..4fd0a90 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
@@ -17,13 +17,19 @@
 
 package org.apache.gobblin.kafka.schemareg;
 
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
 import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethodRetryHandler;
+import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 
 import lombok.Setter;
 
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
 
 /**
  * An implementation of {@link BasePooledObjectFactory} for {@link HttpClient}.
@@ -34,18 +40,29 @@ public class HttpClientFactory extends 
BasePooledObjectFactory<HttpClient>{
 
   @Setter private int soTimeout = -1;
   @Setter private int connTimeout = -1;
+  @Setter private int httpMethodRetryCount = 3;
+  @Setter private boolean httpRequestSentRetryEnabled = false;
+  @Setter private String httpMethodRetryHandlerClass = 
DefaultHttpMethodRetryHandler.class.getName();
 
   public HttpClientFactory() {
   }
 
   @Override
-  public HttpClient create() throws Exception {
-
+  public HttpClient create() {
     HttpClient client = new HttpClient();
     if (soTimeout >= 0) {
       client.getParams().setSoTimeout(soTimeout);
     }
 
+    ClassAliasResolver<HttpMethodRetryHandler> aliasResolver = new 
ClassAliasResolver<>(HttpMethodRetryHandler.class);
+    HttpMethodRetryHandler httpMethodRetryHandler;
+    try {
+      httpMethodRetryHandler = 
GobblinConstructorUtils.invokeLongestConstructor(aliasResolver.resolveClass(httpMethodRetryHandlerClass),
 httpMethodRetryCount, httpRequestSentRetryEnabled);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+    client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, 
httpMethodRetryHandler);
+
     if (connTimeout >= 0) {
       
client.getHttpConnectionManager().getParams().setConnectionTimeout(connTimeout);
     }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index c31c7c8..556c1d1 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.schemareg.HttpClientFactory;
 import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
 import org.apache.gobblin.util.AvroUtils;
 
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * An implementation of {@link KafkaSchemaRegistry}.
@@ -68,7 +68,7 @@ public class KafkaAvroSchemaRegistry extends 
KafkaSchemaRegistry<String, Schema>
   private final Optional<Map<String, String>> namespaceOverride;
 
   /**
-   * @param properties properties should contain property 
"kafka.schema.registry.url", and optionally
+   * @param props properties should contain property 
"kafka.schema.registry.url", and optionally
    * "kafka.schema.registry.max.cache.size" (default = 1000) and
    * "kafka.schema.registry.cache.expire.after.write.min" (default = 10).
    */
@@ -101,6 +101,20 @@ public class KafkaAvroSchemaRegistry extends 
KafkaSchemaRegistry<String, Schema>
       factory.setConnTimeout(Integer.parseInt(connTimeout));
     }
 
+    if 
(this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT))
 {
+      String retryCount = 
this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT);
+      factory.setHttpMethodRetryCount(Integer.parseInt(retryCount));
+    }
+
+    if 
(this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED))
 {
+      String requestRetryEnabled = 
this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED);
+      
factory.setHttpRequestSentRetryEnabled(Boolean.parseBoolean(requestRetryEnabled));
+    }
+
+    if 
(this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS))
 {
+      String httpMethodRetryHandlerClass = 
this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS);
+      factory.setHttpMethodRetryHandlerClass(httpMethodRetryHandlerClass);
+    }
     this.httpClientPool = new GenericObjectPool<>(factory, config);
   }
 
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java
new file mode 100644
index 0000000..b0fe997
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpMethod;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class GobblinHttpMethodRetryHandlerTest {
+
+  @Test
+  public void testRetryMethod() {
+    GobblinHttpMethodRetryHandler gobblinHttpMethodRetryHandler = new 
GobblinHttpMethodRetryHandler(1, false);
+    HttpMethod mockHttpMethod = Mockito.mock(HttpMethod.class);
+
+    //GobblinHttpMethodRetryHandler.retryMethod should return true on UnknownHostException
+    
Assert.assertTrue(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new 
UnknownHostException("dummyException"), 0));
+    
Assert.assertTrue(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new 
UnknownHostException("dummyException"), 1));
+    //Return false when the retry count is exceeded
+    
Assert.assertFalse(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, 
new UnknownHostException("dummyException"), 2));
+
+    //Ensure the GobblinHttpMethodRetryHandler has the same behavior as the 
DefaultHttpMethodRetryHandler for a normal
+    //IOException
+    DefaultHttpMethodRetryHandler defaultHttpMethodRetryHandler = new 
DefaultHttpMethodRetryHandler(1, false);
+    boolean shouldRetryWithGobblinRetryHandler = 
gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new 
IOException("dummyException"), 0);
+    boolean shouldRetryWithDefaultRetryHandler = 
defaultHttpMethodRetryHandler.retryMethod(mockHttpMethod, new 
IOException("dummyException"), 0);
+    Assert.assertTrue(shouldRetryWithGobblinRetryHandler);
+    Assert.assertEquals(shouldRetryWithDefaultRetryHandler, 
shouldRetryWithGobblinRetryHandler);
+
+    shouldRetryWithGobblinRetryHandler = 
gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new 
IOException("dummyException"), 2);
+    shouldRetryWithDefaultRetryHandler = 
defaultHttpMethodRetryHandler.retryMethod(mockHttpMethod, new 
IOException("dummyException"), 2);
+    Assert.assertFalse(shouldRetryWithGobblinRetryHandler);
+    Assert.assertEquals(shouldRetryWithDefaultRetryHandler, 
shouldRetryWithGobblinRetryHandler);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java
new file mode 100644
index 0000000..4d080f4
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class HttpClientFactoryTest {
+
+  @Test
+  public void testCreate() {
+    HttpClientFactory httpClientFactory = new HttpClientFactory();
+    
httpClientFactory.setHttpMethodRetryHandlerClass(GobblinHttpMethodRetryHandler.class.getName());
+    HttpClient client = httpClientFactory.create();
+    Assert.assertNotNull(client);
+  }
+}
\ No newline at end of file

Reply via email to