Repository: incubator-samza
Updated Branches:
  refs/heads/master 1c28f2eaf -> 39f9fd06e
SAMZA-209: Use timeout setting when fetching metadata Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/39f9fd06 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/39f9fd06 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/39f9fd06 Branch: refs/heads/master Commit: 39f9fd06eb9736802fab1f0fb4bc340a546d45b1 Parents: 1c28f2e Author: Jakob Homan <[email protected]> Authored: Thu Mar 27 09:10:50 2014 -0700 Committer: Jakob Homan <[email protected]> Committed: Thu Mar 27 09:10:50 2014 -0700 ---------------------------------------------------------------------- .../samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala | 2 +- .../scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala | 2 +- .../scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/39f9fd06/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index d45c1e4..cb6dbdf 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -71,7 +71,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin val jobId = config.getJobId.getOrElse("1") val brokersListString = Option(producerConfig.brokerList) .getOrElse(throw new SamzaException("No broker list defined in config for %s." 
format systemName)) - val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId) + val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId, socketTimeout) val checkpointTopic = getTopic(jobName, jobId) // This is a reasonably expensive operation and the TaskInstance already knows the answer. Should use that info. http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/39f9fd06/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 2a23652..0f72a79 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -193,7 +193,7 @@ class KafkaSystemAdmin( * don't hammer Kafka more than we need to. 
*/ protected def getTopicMetadata(topics: Set[String]) = { - new ClientUtilTopicMetadataStore(brokerListString, clientId) + new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) .getTopicInfo(topics) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/39f9fd06/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index e3f7553..511306f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -119,7 +119,7 @@ private[kafka] class KafkaSystemConsumer( retryBackoff.run( loop => { val getTopicMetadata = (topics: Set[String]) => { - new ClientUtilTopicMetadataStore(brokerListString, clientId).getTopicInfo(topics) + new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout).getTopicInfo(topics) } val topics = tpToRefresh.map(_.topic).toSet val partitionMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, getTopicMetadata)
