Updated Branches: refs/heads/master ba8ed30d5 -> 1df6792e9
SAMZA-21: Change KafkaSystemConsumer and BrokerProxy consumer defaults. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/1df6792e Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/1df6792e Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/1df6792e Branch: refs/heads/master Commit: 1df6792e92754c2480d5e0a6cf507db6eda70c63 Parents: ba8ed30 Author: Jakob Glen Homan <[email protected]> Authored: Wed Jan 8 13:36:06 2014 -0800 Committer: Jakob Glen Homan <[email protected]> Committed: Wed Jan 8 13:36:06 2014 -0800 ---------------------------------------------------------------------- .../apache/samza/system/kafka/BrokerProxy.scala | 13 +++++----- .../kafka/DefaultFetchSimpleConsumer.scala | 12 ++++----- .../system/kafka/KafkaSystemConsumer.scala | 13 +++++++--- .../samza/system/kafka/KafkaSystemFactory.scala | 27 +++++--------------- .../samza/system/kafka/TestBrokerProxy.scala | 6 ++--- 5 files changed, 30 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 53b2e22..124700e 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -30,6 +30,7 @@ import grizzled.slf4j.Logging import java.nio.channels.ClosedByInterruptException import java.util.Map.Entry import scala.collection.mutable +import kafka.consumer.ConsumerConfig /** * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing @@ -41,8 +42,11 @@ abstract class BrokerProxy( val system: String, val clientID: String, val metrics: KafkaSystemConsumerMetrics, - val timeout: Int = Int.MaxValue, - val bufferSize: Int = 1024000, + val timeout: Int = ConsumerConfig.SocketTimeout, + val bufferSize: Int = ConsumerConfig.SocketBufferSize, + val fetchSize:Int = ConsumerConfig.FetchSize, + val consumerMinSize:Int = ConsumerConfig.MinFetchBytes, + val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging { val messageSink: MessageSink @@ -70,10 +74,7 @@ abstract class BrokerProxy( val hostString = "%s:%d" format (host, port) info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system)) - val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID) { - val fetchSize: Int = 256 * 1024 - } - + val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait) sc } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala index 80ea3ea..d90ca78 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala @@ -24,17 +24,15 @@ package org.apache.samza.system.kafka import kafka.consumer.SimpleConsumer import kafka.api._ import kafka.common.TopicAndPartition -import kafka.common.TopicAndPartition +import kafka.consumer.ConsumerConfig -abstract class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int, clientId: scala.Predef.String) +class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int, + clientId: scala.Predef.String, fetchSize: Int = ConsumerConfig.FetchSize, + minBytes:Int = ConsumerConfig.MinFetchBytes, maxWait:Int = ConsumerConfig.MaxFetchWaitMs) extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) { - val maxWait:Int = Int.MaxValue - val minBytes:Int = 1 - val fetchSize:Int - def defaultFetch(fetches:(TopicAndPartition, Long)*) = { - val fbr = new FetchRequestBuilder().maxWait(1000) + val fbr = new FetchRequestBuilder().maxWait(maxWait) .minBytes(minBytes) .clientId(clientId) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 33826d2..577563e 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -32,6 +32,7 @@ import kafka.serializer.Decoder import org.apache.samza.util.BlockingEnvelopeMap import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.IncomingMessageEnvelope +import kafka.consumer.ConsumerConfig object KafkaSystemConsumer { def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { @@ -50,8 +51,11 @@ private[kafka] class KafkaSystemConsumer( brokerListString: String, metrics: KafkaSystemConsumerMetrics, clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString, - timeout: Int = Int.MaxValue, - bufferSize: Int = 1024000, + timeout: Int = ConsumerConfig.ConsumerTimeoutMs, + bufferSize: Int = ConsumerConfig.SocketBufferSize, + fetchSize:Int = ConsumerConfig.MaxFetchSize, + consumerMinSize:Int = ConsumerConfig.MinFetchBytes, + consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, brokerMetadataFailureRefreshMs: Long = 10000, fetchThreshold: Int = 0, offsetGetter: GetOffset = new GetOffset("fail"), @@ -114,9 +118,10 @@ private[kafka] class KafkaSystemConsumer( brokerOption match { case Some(broker) => - val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, offsetGetter) { + val brokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) { val messageSink: MessageSink = sink - }) + } + brokerProxies.getOrElseUpdate((broker.host, broker.port), brokerProxy) brokerProxy.addTopicPartition(head, Option(lastOffset)) case None => warn("No such topic-partition: %s, dropping." format head) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index a11a72a..b09ade2 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -22,20 +22,9 @@ package org.apache.samza.system.kafka import org.apache.samza.util.KafkaUtil import org.apache.samza.config.Config import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.config.KafkaConfig import org.apache.samza.config.KafkaConfig.Config2Kafka -import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.util.ClientUtilTopicMetadataStore import org.apache.samza.SamzaException -import scala.collection.JavaConversions._ -import java.util.Properties import kafka.producer.Producer -import kafka.producer.async.DefaultEventHandler -import kafka.utils.Utils -import org.apache.samza.util.Util -import kafka.serializer.Decoder -import kafka.serializer.DefaultDecoder import org.apache.samza.system.SystemFactory class KafkaSystemFactory extends SystemFactory { @@ -52,19 +41,14 @@ class KafkaSystemFactory extends SystemFactory { // TODO could add stream-level overrides for timeout and buffer size val timeout = consumerConfig.socketTimeoutMs val bufferSize = consumerConfig.socketReceiveBufferBytes + val fetchSize = consumerConfig.fetchMessageMaxBytes + val consumerMinSize = consumerConfig.fetchMinBytes + val consumerMaxWait = consumerConfig.fetchWaitMaxMs val autoOffsetResetDefault = consumerConfig.autoOffsetReset val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName) val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics) - val deserializer = config.getConsumerMsgDeserializerClass(systemName) match { - case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass) - case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]] - } - val keyDeserializer = config.getConsumerKeyDeserializerClass(systemName) match { - case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass) - case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]] - } - + new KafkaSystemConsumer( systemName = systemName, brokerListString = brokerListString, @@ -72,6 +56,9 @@ class KafkaSystemFactory extends SystemFactory { clientId = clientId, timeout = timeout, bufferSize = bufferSize, + fetchSize = fetchSize, + consumerMinSize = consumerMinSize, + consumerMaxWait = consumerMaxWait, fetchThreshold = fetchThreshold, offsetGetter = offsetGetter) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/1df6792e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index 9a3a29e..e25cc4f 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -87,9 +87,7 @@ class TestBrokerProxy extends Logging { } alreadyCreatedConsumer = true - new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b") { - val fetchSize: Int = 42 - + new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", 42) { val sc = Mockito.mock(classOf[SimpleConsumer]) val mockOffsetResponse = { val offsetResponse = Mockito.mock(classOf[OffsetResponse]) @@ -243,7 +241,7 @@ class TestBrokerProxy extends Logging { // So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset - val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, Int.MaxValue, 1024000, mockOffsetGetter) { + val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) { val messageSink: MessageSink = mockMessageSink override def createSimpleConsumer() = {
