Updated Branches:
  refs/heads/master a0d938f51 -> ac7f32c8c

SAMZA-127; use proper brokerproxy when adding new topic partitions in kafka 
system consumer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ac7f32c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ac7f32c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ac7f32c8

Branch: refs/heads/master
Commit: ac7f32c8c69a493ed82ececf723ac569a8dadd2d
Parents: a0d938f
Author: Chris Riccomini <[email protected]>
Authored: Thu Jan 9 16:16:38 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Jan 9 16:16:38 2014 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ac7f32c8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 577563e..56ec964 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -118,12 +118,13 @@ private[kafka] class KafkaSystemConsumer(
 
           brokerOption match {
             case Some(broker) =>
-              val brokerProxy = new BrokerProxy(broker.host, broker.port, 
systemName, clientId, metrics, timeout, bufferSize, fetchSize, consumerMinSize, 
consumerMaxWait, offsetGetter) {
+              def createBrokerProxy = new BrokerProxy(broker.host, 
broker.port, systemName, clientId, metrics, timeout, bufferSize, fetchSize, 
consumerMinSize, consumerMaxWait, offsetGetter) {
                 val messageSink: MessageSink = sink
               }
-              brokerProxies.getOrElseUpdate((broker.host, broker.port), 
brokerProxy)
 
-              brokerProxy.addTopicPartition(head, Option(lastOffset))
+              brokerProxies
+                .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
+                .addTopicPartition(head, Option(lastOffset))
             case None => warn("No such topic-partition: %s, dropping." format 
head)
           }
           rest

Reply via email to