Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5043c537c -> 760da9a3e


[GOBBLIN-356] Add socket and connection timeouts for Kafka schema registry

Closes #2228 from yukuai518/schemaregistry


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/760da9a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/760da9a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/760da9a3

Branch: refs/heads/master
Commit: 760da9a3e557f76fd84a65411c92cdb4615ca81f
Parents: 5043c53
Author: Kuai Yu <[email protected]>
Authored: Fri Jan 5 21:51:18 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Fri Jan 5 21:51:18 2018 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  8 ++++
 .../kafka/schemareg/HttpClientFactory.java      | 20 ++++++++-
 .../metrics/kafka/KafkaAvroSchemaRegistry.java  | 45 +++++++++++++++++---
 3 files changed, 66 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/760da9a3/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3e576ce..ed360d9 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -743,6 +743,14 @@ public class ConfigurationKeys {
   public static final String SHARED_KAFKA_CONFIG_PREFIX = 
"gobblin.kafka.sharedConfig";
 
   /**
+   * Kafka schema registry
+   */
+  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT = 
"kafka.schema.registry.httpclient.so.timeout";
+  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT = 
"kafka.schema.registry.httpclient.conn.timeout";
+  public static final String KAFKA_SCHEMA_REGISTRY_RETRY_TIMES = 
"kafka.schema.registry.retry.times";
+  public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS = 
"kafka.schema.registry.retry.interval.inMillis";
+
+  /**
    * Job execution info server and history store configuration properties.
    */
   // If job execution info server is enabled

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/760da9a3/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
index 8c7b3a0..028dcf8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
@@ -22,6 +22,8 @@ import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 
+import lombok.Setter;
+
 
 /**
  * An implementation of {@link BasePooledObjectFactory} for {@link HttpClient}.
@@ -30,9 +32,25 @@ import org.apache.commons.pool2.impl.DefaultPooledObject;
  */
 public class HttpClientFactory extends BasePooledObjectFactory<HttpClient>{
 
+  @Setter private int soTimeout = -1;
+  @Setter private int connTimeout = -1;
+
+  public HttpClientFactory() {
+  }
+
   @Override
   public HttpClient create() throws Exception {
-    return new HttpClient();
+
+    HttpClient client = new HttpClient();
+    if (soTimeout >= 0) {
+      client.getParams().setSoTimeout(soTimeout);
+    }
+
+    if (connTimeout >= 0) {
+      
client.getHttpConnectionManager().getParams().setConnectionTimeout(connTimeout);
+    }
+
+    return client;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/760da9a3/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index 6162636..2a216f2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -42,12 +42,15 @@ import org.apache.gobblin.kafka.schemareg.HttpClientFactory;
 import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
 import org.apache.gobblin.util.AvroUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 
 /**
  * An implementation of {@link KafkaSchemaRegistry}.
  *
  * @author Ziyang Liu
  */
+@Slf4j
 public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, 
Schema> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaAvroSchemaRegistry.class);
@@ -85,7 +88,20 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
     GenericObjectPoolConfig config = new GenericObjectPoolConfig();
     config.setMaxTotal(objPoolSize);
     config.setMaxIdle(objPoolSize);
-    this.httpClientPool = new GenericObjectPool<>(new HttpClientFactory(), 
config);
+
+    HttpClientFactory factory = new HttpClientFactory();
+
+    if 
(this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT))
 {
+      String soTimeout = 
this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT);
+      factory.setSoTimeout(Integer.parseInt(soTimeout));
+    }
+
+    if 
(this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT))
 {
+      String connTimeout = 
this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT);
+      factory.setConnTimeout(Integer.parseInt(connTimeout));
+    }
+
+    this.httpClientPool = new GenericObjectPool<>(factory, config);
   }
 
   /**
@@ -123,19 +139,36 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
     String schemaUrl = KafkaAvroSchemaRegistry.this.url + GET_RESOURCE_BY_TYPE 
+ topic;
 
     LOG.debug("Fetching from URL : " + schemaUrl);
-
+    int retryInterval = 
Integer.parseInt(this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS,
 Integer.toString(5000)));
+    int retryTimes = 
Integer.parseInt(this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_RETRY_TIMES,
 Integer.toString(10)));
     GetMethod get = new GetMethod(schemaUrl);
 
-    int statusCode;
-    String schemaString;
+    int statusCode = -1;
+    String schemaString = "";
     HttpClient httpClient = this.borrowClient();
+    int loop = 0;
+
     try {
-      statusCode = httpClient.executeMethod(get);
-      schemaString = get.getResponseBodyAsString();
+      while (++loop <= retryTimes) {
+        try {
+          statusCode = httpClient.executeMethod(get);
+          schemaString = get.getResponseBodyAsString();
+          break;
+        } catch (Exception e) {
+          if (loop >= retryTimes) {
+            throw e;
+          } else {
+            log.error("Exception when fetching schema : {}", e.toString());
+            Thread.sleep(retryInterval);
+          }
+        }
+      }
     } catch (HttpException e) {
       throw new RuntimeException(e);
     } catch (IOException e) {
       throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
     } finally {
       get.releaseConnection();
       this.httpClientPool.returnObject(httpClient);

Reply via email to