Updated Branches:
  refs/heads/master ba8ed30d5 -> 1df6792e9

SAMZA-21: Change KafkaSystemConsumer and BrokerProxy consumer defaults.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/1df6792e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/1df6792e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/1df6792e

Branch: refs/heads/master
Commit: 1df6792e92754c2480d5e0a6cf507db6eda70c63
Parents: ba8ed30
Author: Jakob Glen Homan <[email protected]>
Authored: Wed Jan 8 13:36:06 2014 -0800
Committer: Jakob Glen Homan <[email protected]>
Committed: Wed Jan 8 13:36:06 2014 -0800

----------------------------------------------------------------------
 .../apache/samza/system/kafka/BrokerProxy.scala | 13 +++++-----
 .../kafka/DefaultFetchSimpleConsumer.scala      | 12 ++++-----
 .../system/kafka/KafkaSystemConsumer.scala      | 13 +++++++---
 .../samza/system/kafka/KafkaSystemFactory.scala | 27 +++++---------------
 .../samza/system/kafka/TestBrokerProxy.scala    |  6 ++---
 5 files changed, 30 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 53b2e22..124700e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -30,6 +30,7 @@ import grizzled.slf4j.Logging
 import java.nio.channels.ClosedByInterruptException
 import java.util.Map.Entry
 import scala.collection.mutable
+import kafka.consumer.ConsumerConfig
 
 /**
  * A BrokerProxy consolidates Kafka fetches meant for a particular broker and 
retrieves them all at once, providing
@@ -41,8 +42,11 @@ abstract class BrokerProxy(
   val system: String,
   val clientID: String,
   val metrics: KafkaSystemConsumerMetrics,
-  val timeout: Int = Int.MaxValue,
-  val bufferSize: Int = 1024000,
+  val timeout: Int = ConsumerConfig.SocketTimeout,
+  val bufferSize: Int = ConsumerConfig.SocketBufferSize,
+  val fetchSize:Int = ConsumerConfig.FetchSize,
+  val consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
+  val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
   offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
 
   val messageSink: MessageSink
@@ -70,10 +74,7 @@ abstract class BrokerProxy(
     val hostString = "%s:%d" format (host, port)
     info("Creating new SimpleConsumer for host %s for system %s" format 
(hostString, system))
 
-    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, 
clientID) {
-      val fetchSize: Int = 256 * 1024
-    }
-
+    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, 
clientID, fetchSize, consumerMinSize, consumerMaxWait) 
     sc
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
index 80ea3ea..d90ca78 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
@@ -24,17 +24,15 @@ package org.apache.samza.system.kafka
 import kafka.consumer.SimpleConsumer
 import kafka.api._
 import kafka.common.TopicAndPartition
-import kafka.common.TopicAndPartition
+import kafka.consumer.ConsumerConfig
 
-abstract class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: 
scala.Int, soTimeout: scala.Int, bufferSize: scala.Int, clientId: 
scala.Predef.String)
+class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, 
soTimeout: scala.Int, bufferSize: scala.Int,
+                                 clientId: scala.Predef.String, fetchSize: Int 
= ConsumerConfig.FetchSize,
+                                 minBytes:Int = ConsumerConfig.MinFetchBytes, 
maxWait:Int = ConsumerConfig.MaxFetchWaitMs)
   extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) {
 
-  val maxWait:Int = Int.MaxValue
-  val minBytes:Int = 1
-  val fetchSize:Int
-
   def defaultFetch(fetches:(TopicAndPartition, Long)*) = {
-    val fbr = new FetchRequestBuilder().maxWait(1000)
+    val fbr = new FetchRequestBuilder().maxWait(maxWait)
       .minBytes(minBytes)
       .clientId(clientId)
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 33826d2..577563e 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -32,6 +32,7 @@ import kafka.serializer.Decoder
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
+import kafka.consumer.ConsumerConfig
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -50,8 +51,11 @@ private[kafka] class KafkaSystemConsumer(
   brokerListString: String,
   metrics: KafkaSystemConsumerMetrics,
   clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
-  timeout: Int = Int.MaxValue,
-  bufferSize: Int = 1024000,
+  timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
+  bufferSize: Int = ConsumerConfig.SocketBufferSize,
+  fetchSize:Int = ConsumerConfig.MaxFetchSize,
+  consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
+  consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
   brokerMetadataFailureRefreshMs: Long = 10000,
   fetchThreshold: Int = 0,
   offsetGetter: GetOffset = new GetOffset("fail"),
@@ -114,9 +118,10 @@ private[kafka] class KafkaSystemConsumer(
 
           brokerOption match {
             case Some(broker) =>
-              val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, 
broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, 
metrics, timeout, bufferSize, offsetGetter) {
+              val brokerProxy = new BrokerProxy(broker.host, broker.port, 
systemName, clientId, metrics, timeout, bufferSize, fetchSize, consumerMinSize, 
consumerMaxWait, offsetGetter) {
                 val messageSink: MessageSink = sink
-              })
+              }
+              brokerProxies.getOrElseUpdate((broker.host, broker.port), 
brokerProxy)
 
               brokerProxy.addTopicPartition(head, Option(lastOffset))
             case None => warn("No such topic-partition: %s, dropping." format 
head)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index a11a72a..b09ade2 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -22,20 +22,9 @@ package org.apache.samza.system.kafka
 import org.apache.samza.util.KafkaUtil
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.config.KafkaConfig
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.util.ClientUtilTopicMetadataStore
 import org.apache.samza.SamzaException
-import scala.collection.JavaConversions._
-import java.util.Properties
 import kafka.producer.Producer
-import kafka.producer.async.DefaultEventHandler
-import kafka.utils.Utils
-import org.apache.samza.util.Util
-import kafka.serializer.Decoder
-import kafka.serializer.DefaultDecoder
 import org.apache.samza.system.SystemFactory
 
 class KafkaSystemFactory extends SystemFactory {
@@ -52,19 +41,14 @@ class KafkaSystemFactory extends SystemFactory {
     // TODO could add stream-level overrides for timeout and buffer size
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val fetchSize = consumerConfig.fetchMessageMaxBytes
+    val consumerMinSize = consumerConfig.fetchMinBytes
+    val consumerMaxWait = consumerConfig.fetchWaitMaxMs
     val autoOffsetResetDefault = consumerConfig.autoOffsetReset
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
     val fetchThreshold = 
config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt
     val offsetGetter = new GetOffset(autoOffsetResetDefault, 
autoOffsetResetTopics)
-    val deserializer = config.getConsumerMsgDeserializerClass(systemName) 
match {
-      case Some(deserializerClass) => 
Util.getObj[Decoder[Object]](deserializerClass)
-      case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]]
-    }
-    val keyDeserializer = config.getConsumerKeyDeserializerClass(systemName) 
match {
-      case Some(deserializerClass) => 
Util.getObj[Decoder[Object]](deserializerClass)
-      case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]]
-    }
-
+   
     new KafkaSystemConsumer(
       systemName = systemName,
       brokerListString = brokerListString,
@@ -72,6 +56,9 @@ class KafkaSystemFactory extends SystemFactory {
       clientId = clientId,
       timeout = timeout,
       bufferSize = bufferSize,
+      fetchSize = fetchSize,
+      consumerMinSize = consumerMinSize,
+      consumerMaxWait = consumerMaxWait,
       fetchThreshold = fetchThreshold,
       offsetGetter = offsetGetter)
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 9a3a29e..e25cc4f 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -87,9 +87,7 @@ class TestBrokerProxy extends Logging {
         }
         alreadyCreatedConsumer = true
 
-        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b") {
-          val fetchSize: Int = 42
-
+        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", 42) {
           val sc = Mockito.mock(classOf[SimpleConsumer])
           val mockOffsetResponse = {
             val offsetResponse = Mockito.mock(classOf[OffsetResponse])
@@ -243,7 +241,7 @@ class TestBrokerProxy extends Logging {
 
     // So now we have a fetch response that will fail.  Prime the 
mockGetOffset to send us to a new offset
 
-    val bp = new BrokerProxy("host", 423, "system", "clientID", 
doNothingMetrics, Int.MaxValue, 1024000, mockOffsetGetter) {
+    val bp = new BrokerProxy("host", 423, "system", "clientID", 
doNothingMetrics, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, 
mockOffsetGetter) {
       val messageSink: MessageSink = mockMessageSink
 
       override def createSimpleConsumer() = {

Reply via email to