Updated Branches: refs/heads/master a0d938f51 -> ac7f32c8c
SAMZA-127; use proper brokerproxy when adding new topic partitions in kafka system consumer.

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ac7f32c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ac7f32c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ac7f32c8

Branch: refs/heads/master
Commit: ac7f32c8c69a493ed82ececf723ac569a8dadd2d
Parents: a0d938f
Author: Chris Riccomini <[email protected]>
Authored: Thu Jan 9 16:16:38 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Jan 9 16:16:38 2014 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ac7f32c8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 577563e..56ec964 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -118,12 +118,13 @@ private[kafka] class KafkaSystemConsumer(
 
         brokerOption match {
           case Some(broker) =>
-            val brokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) {
+            def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) {
               val messageSink: MessageSink = sink
             }
 
-            brokerProxies.getOrElseUpdate((broker.host, broker.port), brokerProxy)
-            brokerProxy.addTopicPartition(head, Option(lastOffset))
+            brokerProxies
+              .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
+              .addTopicPartition(head, Option(lastOffset))
           case None => warn("No such topic-partition: %s, dropping." format head)
         }
         rest
