Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5043c537c -> 760da9a3e
[GOBBLIN-356] Add socket and connection timeout for kafka schema registry Closes #2228 from yukuai518/schemaregistry Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/760da9a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/760da9a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/760da9a3 Branch: refs/heads/master Commit: 760da9a3e557f76fd84a65411c92cdb4615ca81f Parents: 5043c53 Author: Kuai Yu <[email protected]> Authored: Fri Jan 5 21:51:18 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Jan 5 21:51:18 2018 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 8 ++++ .../kafka/schemareg/HttpClientFactory.java | 20 ++++++++- .../metrics/kafka/KafkaAvroSchemaRegistry.java | 45 +++++++++++++++++--- 3 files changed, 66 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/760da9a3/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 3e576ce..ed360d9 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -743,6 +743,14 @@ public class ConfigurationKeys { public static final String SHARED_KAFKA_CONFIG_PREFIX = "gobblin.kafka.sharedConfig"; /** + * Kafka schema registry + */ + public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT = "kafka.schema.registry.httpclient.so.timeout"; + public static final String 
KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT = "kafka.schema.registry.httpclient.conn.timeout"; + public static final String KAFKA_SCHEMA_REGISTRY_RETRY_TIMES = "kafka.schema.registry.retry.times"; + public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS = "kafka.schema.registry.retry.interval.inMillis"; + + /** * Job execution info server and history store configuration properties. */ // If job execution info server is enabled http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/760da9a3/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java index 8c7b3a0..028dcf8 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java @@ -22,6 +22,8 @@ import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; +import lombok.Setter; + /** * An implementation of {@link BasePooledObjectFactory} for {@link HttpClient}. 
@@ -30,9 +32,25 @@ import org.apache.commons.pool2.impl.DefaultPooledObject; */ public class HttpClientFactory extends BasePooledObjectFactory<HttpClient>{ + @Setter private int soTimeout = -1; + @Setter private int connTimeout = -1; + + public HttpClientFactory() { + } + @Override public HttpClient create() throws Exception { - return new HttpClient(); + + HttpClient client = new HttpClient(); + if (soTimeout >= 0) { + client.getParams().setSoTimeout(soTimeout); + } + + if (connTimeout >= 0) { + client.getHttpConnectionManager().getParams().setConnectionTimeout(connTimeout); + } + + return client; } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/760da9a3/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java index 6162636..2a216f2 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java @@ -42,12 +42,15 @@ import org.apache.gobblin.kafka.schemareg.HttpClientFactory; import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil; import org.apache.gobblin.util.AvroUtils; +import lombok.extern.slf4j.Slf4j; + /** * An implementation of {@link KafkaSchemaRegistry}. 
* * @author Ziyang Liu */ +@Slf4j public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema> { private static final Logger LOG = LoggerFactory.getLogger(KafkaAvroSchemaRegistry.class); @@ -85,7 +88,20 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema> GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxTotal(objPoolSize); config.setMaxIdle(objPoolSize); - this.httpClientPool = new GenericObjectPool<>(new HttpClientFactory(), config); + + HttpClientFactory factory = new HttpClientFactory(); + + if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT)) { + String soTimeout = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT); + factory.setSoTimeout(Integer.parseInt(soTimeout)); + } + + if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT)) { + String connTimeout = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT); + factory.setConnTimeout(Integer.parseInt(connTimeout)); + } + + this.httpClientPool = new GenericObjectPool<>(factory, config); } /** @@ -123,19 +139,36 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema> String schemaUrl = KafkaAvroSchemaRegistry.this.url + GET_RESOURCE_BY_TYPE + topic; LOG.debug("Fetching from URL : " + schemaUrl); - + int retryInterval = Integer.parseInt(this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS, Integer.toString(5000))); + int retryTimes = Integer.parseInt(this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_RETRY_TIMES, Integer.toString(10))); GetMethod get = new GetMethod(schemaUrl); - int statusCode; - String schemaString; + int statusCode = -1; + String schemaString = ""; HttpClient httpClient = this.borrowClient(); + int loop = 0; + try { - statusCode = httpClient.executeMethod(get); - schemaString = 
get.getResponseBodyAsString(); + while (++loop <= retryTimes) { + try { + statusCode = httpClient.executeMethod(get); + schemaString = get.getResponseBodyAsString(); + break; + } catch (Exception e) { + if (loop >= retryTimes) { + throw e; + } else { + log.error("Exception when fetching schema : {}", e.toString()); + Thread.sleep(retryInterval); + } + } + } } catch (HttpException e) { throw new RuntimeException(e); } catch (IOException e) { throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); } finally { get.releaseConnection(); this.httpClientPool.returnObject(httpClient);
