Repository: incubator-samza
Updated Branches:
  refs/heads/master 1c28f2eaf -> 39f9fd06e


SAMZA-209: Use timeout setting when fetching metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/39f9fd06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/39f9fd06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/39f9fd06

Branch: refs/heads/master
Commit: 39f9fd06eb9736802fab1f0fb4bc340a546d45b1
Parents: 1c28f2e
Author: Jakob Homan <[email protected]>
Authored: Thu Mar 27 09:10:50 2014 -0700
Committer: Jakob Homan <[email protected]>
Committed: Thu Mar 27 09:10:50 2014 -0700

----------------------------------------------------------------------
 .../samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala     | 2 +-
 .../scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala     | 2 +-
 .../scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/39f9fd06/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index d45c1e4..cb6dbdf 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -71,7 +71,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     val jobId = config.getJobId.getOrElse("1")
     val brokersListString = Option(producerConfig.brokerList)
       .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
-    val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId)
+    val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId, socketTimeout)
     val checkpointTopic = getTopic(jobName, jobId)
     
     // This is a reasonably expensive operation and the TaskInstance already knows the answer. Should use that info.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/39f9fd06/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 2a23652..0f72a79 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -193,7 +193,7 @@ class KafkaSystemAdmin(
    * don't hammer Kafka more than we need to.
    */
   protected def getTopicMetadata(topics: Set[String]) = {
-    new ClientUtilTopicMetadataStore(brokerListString, clientId)
+    new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
       .getTopicInfo(topics)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/39f9fd06/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index e3f7553..511306f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -119,7 +119,7 @@ private[kafka] class KafkaSystemConsumer(
     retryBackoff.run(
       loop => {
         val getTopicMetadata = (topics: Set[String]) => {
-          new ClientUtilTopicMetadataStore(brokerListString, clientId).getTopicInfo(topics)
+          new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout).getTopicInfo(topics)
         }
         val topics = tpToRefresh.map(_.topic).toSet
         val partitionMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, getTopicMetadata)

Reply via email to