Updated Branches:
  refs/heads/master afc2fb7af -> 5709b27ff

SAMZA-116: Make BrokerProxy a concrete class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5709b27f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5709b27f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5709b27f

Branch: refs/heads/master
Commit: 5709b27ffe86d928e291ea500a5c9afc14bb42e5
Parents: afc2fb7
Author: Steve Yates <syates at stevendyates dot com>
Authored: Thu Feb 6 11:41:45 2014 -0800
Committer: Jakob Homan <[email protected]>
Committed: Thu Feb 6 11:41:45 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/system/kafka/BrokerProxy.scala | 5 ++---
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala    | 4 +---
 .../scala/org/apache/samza/system/kafka/TestBrokerProxy.scala  | 6 ++----
 3 files changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5709b27f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 5095e70..95a1fb5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -44,12 +44,13 @@ object BrokerProxy {
  * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
  * a way for consumers to retrieve those messages by topic and partition.
  */
-abstract class BrokerProxy(
+class BrokerProxy(
   val host: String,
   val port: Int,
   val system: String,
   val clientID: String,
   val metrics: KafkaSystemConsumerMetrics,
+  val messageSink: MessageSink,
   val timeout: Int = ConsumerConfig.SocketTimeout,
   val bufferSize: Int = ConsumerConfig.SocketBufferSize,
   val fetchSize:Int = ConsumerConfig.FetchSize,
@@ -57,8 +58,6 @@ abstract class BrokerProxy(
   val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
   offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
 
-  val messageSink: MessageSink
-
   /**
    * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview
    */

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5709b27f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 56ec964..ed09ef5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -118,9 +118,7 @@ private[kafka] class KafkaSystemConsumer(
 
           brokerOption match {
             case Some(broker) =>
-              def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) {
-                val messageSink: MessageSink = sink
-              }
+              def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
 
               brokerProxies
                 .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5709b27f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index bcd8998..d1df781 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -73,10 +73,9 @@ class TestBrokerProxy extends Logging {
       system,
       "daClientId",
       metrics,
+      sink,
       offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
 
-      val messageSink: MessageSink = sink
-
       override val sleepMSWhileNoTopicPartitions = 100 // Speed up for test
       var alreadyCreatedConsumer = false
       // Scala traits and Mockito mocks don't mix, unfortunately.
@@ -253,8 +252,7 @@ class TestBrokerProxy extends Logging {
 
     // So now we have a fetch response that will fail.  Prime the mockGetOffset to send us to a new offset
 
-    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) {
-      val messageSink: MessageSink = mockMessageSink
+    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) {
 
       override def createSimpleConsumer() = {
         if(callsToCreateSimpleConsumer > 1) {

Reply via email to