Updated Branches: refs/heads/master afc2fb7af -> 5709b27ff
SAMZA-116: Make BrokerProxy a concrete class. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5709b27f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5709b27f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5709b27f Branch: refs/heads/master Commit: 5709b27ffe86d928e291ea500a5c9afc14bb42e5 Parents: afc2fb7 Author: Steve Yates <syates at stevendyates dot com> Authored: Thu Feb 6 11:41:45 2014 -0800 Committer: Jakob Homan <[email protected]> Committed: Thu Feb 6 11:41:45 2014 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/system/kafka/BrokerProxy.scala | 5 ++--- .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 4 +--- .../scala/org/apache/samza/system/kafka/TestBrokerProxy.scala | 6 ++---- 3 files changed, 5 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5709b27f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 5095e70..95a1fb5 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -44,12 +44,13 @@ object BrokerProxy { * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing * a way for consumers to retrieve those messages by topic and partition. */ -abstract class BrokerProxy( +class BrokerProxy( val host: String, val port: Int, val system: String, val clientID: String, val metrics: KafkaSystemConsumerMetrics, + val messageSink: MessageSink, val timeout: Int = ConsumerConfig.SocketTimeout, val bufferSize: Int = ConsumerConfig.SocketBufferSize, val fetchSize:Int = ConsumerConfig.FetchSize, @@ -57,8 +58,6 @@ abstract class BrokerProxy( val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging { - val messageSink: MessageSink - /** * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview */ http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5709b27f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 56ec964..ed09ef5 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -118,9 +118,7 @@ private[kafka] class KafkaSystemConsumer( brokerOption match { case Some(broker) => - def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) { - val messageSink: MessageSink = sink - } + def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) brokerProxies .getOrElseUpdate((broker.host, broker.port), createBrokerProxy) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5709b27f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index bcd8998..d1df781 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -73,10 +73,9 @@ class TestBrokerProxy extends Logging { system, "daClientId", metrics, + sink, offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) { - val messageSink: MessageSink = sink - override val sleepMSWhileNoTopicPartitions = 100 // Speed up for test var alreadyCreatedConsumer = false // Scala traits and Mockito mocks don't mix, unfortunately. @@ -253,8 +252,7 @@ class TestBrokerProxy extends Logging { // So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset - val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) { - val messageSink: MessageSink = mockMessageSink + val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) { override def createSimpleConsumer() = { if(callsToCreateSimpleConsumer > 1) {
