Repository: incubator-samza
Updated Branches:
  refs/heads/master 429c1edb3 -> 87f19fcd0


SAMZA-203; fix changelog restore performance by increasing fetchThreshold in kafka system consumer


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/87f19fcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/87f19fcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/87f19fcd

Branch: refs/heads/master
Commit: 87f19fcd0ba38124fd231ac314d056180643931b
Parents: 429c1ed
Author: Chris Riccomini <[email protected]>
Authored: Tue Mar 25 09:19:14 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Tue Mar 25 09:19:14 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/KafkaConfig.scala   |  2 +-
 .../apache/samza/system/kafka/BrokerProxy.scala | 70 +++++++++++---------
 .../system/kafka/KafkaSystemConsumer.scala      | 43 +++++++++---
 .../samza/system/kafka/KafkaSystemFactory.scala |  4 +-
 .../system/kafka/TestKafkaSystemConsumer.scala  | 19 ++++++
 5 files changed, 92 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 978620a..4deabd3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -40,7 +40,7 @@ object KafkaConfig {
    * Defines how low a queue can get for a single system/stream/partition
    * combination before trying to fetch more messages for it.
    */
-  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + ".samza.fetch.threshold"
+  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
 
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
 }

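Note on the one-character fix above: SystemConfig.SYSTEM_PREFIX evidently
already ends in a dot (otherwise the fix would be dropping the separator), so
the old suffix ".samza.fetch.threshold" produced a doubled dot in the resolved
key. A quick sketch of the difference; the prefix value shown is an assumption
for illustration, not quoted from SystemConfig:

    // Assumed value of SystemConfig.SYSTEM_PREFIX, for illustration only.
    val prefix = "systems.%s."
    val oldKey = prefix + ".samza.fetch.threshold" // systems.%s..samza.fetch.threshold (doubled dot)
    val newKey = prefix + "samza.fetch.threshold"  // systems.%s.samza.fetch.threshold
    println(newKey format "kafka")                 // systems.kafka.samza.fetch.threshold
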
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index bca2f86..88817ef 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -38,7 +38,7 @@ import org.apache.samza.util.ExponentialSleepStrategy
  *  Companion object for class JvmMetrics encapsulating various constants
  */
 object BrokerProxy {
-  val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY"
+  val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-"
 }
 
 /**
@@ -62,7 +62,7 @@ class BrokerProxy(
   /**
   * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview
    */
-  val sleepMSWhileNoTopicPartitions = 1000
+  val sleepMSWhileNoTopicPartitions = 100
 
   /** What's the next offset for a particular partition? **/
  val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
@@ -125,36 +125,42 @@ class BrokerProxy(
   val thread = new Thread(new Runnable {
     def run {
       var reconnect = false
-      (new ExponentialSleepStrategy).run(
-        loop => {
-          if (reconnect) {
-            metrics.reconnects(host, port).inc
-            simpleConsumer.close()
-            simpleConsumer = createSimpleConsumer()
-          }
-
-          while (!Thread.currentThread.isInterrupted) {
-            if (nextOffsets.size == 0) {
-              debug("No TopicPartitions to fetch. Sleeping.")
-              Thread.sleep(sleepMSWhileNoTopicPartitions)
-            } else {
-              fetchMessages
-
-              // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
-              // In that case, reset the loop delay, so that the next time an error occurs,
-              // we start with a short retry delay.
-              loop.reset
+
+      try {
+        (new ExponentialSleepStrategy).run(
+          loop => {
+            if (reconnect) {
+              metrics.reconnects(host, port).inc
+              simpleConsumer.close()
+              simpleConsumer = createSimpleConsumer()
+            }
+
+            while (!Thread.currentThread.isInterrupted) {
+              if (nextOffsets.size == 0) {
+                debug("No TopicPartitions to fetch. Sleeping.")
+                Thread.sleep(sleepMSWhileNoTopicPartitions)
+              } else {
+                fetchMessages
+
+                // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
+                // In that case, reset the loop delay, so that the next time an error occurs,
+                // we start with a short retry delay.
+                loop.reset
+              }
             }
-          }
-        },
-
-        (exception, loop) => {
-          warn("Restarting consumer due to %s. Turn on debugging to get a full 
stack trace." format exception)
-          debug(exception)
-          reconnect = true
-        }
-      )
-      if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt")
+          },
+
+          (exception, loop) => {
+            warn("Restarting consumer due to %s. Turn on debugging to get a 
full stack trace." format exception)
+            debug(exception)
+            reconnect = true
+          })
+      } catch {
+        case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
+        case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
+      }
+
+      if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
     }
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, 
clientID))
 
@@ -263,7 +269,7 @@ class BrokerProxy(
     info("Starting " + toString)
 
     thread.setDaemon(true)
-    thread.setName(SAMZA_THREAD_NAME_PREFIX+BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX)
+    thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
     thread.start
   }
 

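Two things are going on in the BrokerProxy hunks above: the fetch loop is now
wrapped in a try/catch so that an interrupt during shutdown exits the thread
cleanly instead of being handed to the retry callback as an ordinary fetch
failure, and each fetcher thread now gets a unique name (the JVM's default
thread name is appended to the prefix). A condensed sketch of the retry
pattern, driving the ExponentialSleepStrategy API exactly as the hunk does;
doWork is a stand-in for fetchMessages:

    // Condensed sketch of the loop above; doWork stands in for fetchMessages.
    try {
      (new ExponentialSleepStrategy).run(
        loop => {
          while (!Thread.currentThread.isInterrupted) {
            doWork()
            loop.reset // success, so the next failure retries from the shortest delay
          }
        },
        (exception, loop) => warn("Retrying due to %s" format exception))
    } catch {
      // A shutdown interrupt should end the thread, not trigger a reconnect.
      case e: InterruptedException => info("Shutting down.")
    }
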
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 8ad97df..e1ea2ff 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -34,6 +34,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import kafka.consumer.ConsumerConfig
 import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.SamzaException
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -54,10 +55,24 @@ private[kafka] class KafkaSystemConsumer(
   clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
   timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
   bufferSize: Int = ConsumerConfig.SocketBufferSize,
-  fetchSize:Int = ConsumerConfig.MaxFetchSize,
-  consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
-  consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
-  fetchThreshold: Int = 0,
+  fetchSize: Int = ConsumerConfig.MaxFetchSize,
+  consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
+  consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
+
+  /**
+   * Defines a low water mark for how many messages we buffer before we start
+   * executing fetch requests against brokers to get more messages. This value
+   * is divided equally among all registered SystemStreamPartitions. For
+   * example, if fetchThreshold is set to 50000, and there are 50
+   * SystemStreamPartitions registered, then the per-partition threshold is
+   * 1000. As soon as a SystemStreamPartition's buffered message count drops
+   * below 1000, a fetch request will be executed to get more data for it.
+   *
+   * Increasing this parameter will decrease the latency between when a queue
+   * is drained of messages and when new messages are enqueued, but also leads
+   * to an increase in memory usage since more messages will be held in memory.
+   */
+  fetchThreshold: Int = 50000,
   offsetGetter: GetOffset = new GetOffset("fail"),
  deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
  keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
@@ -69,8 +84,15 @@ private[kafka] class KafkaSystemConsumer(
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
   var nextOffsets = Map[SystemStreamPartition, String]()
+  var perPartitionFetchThreshold = fetchThreshold
 
   def start() {
+    if (nextOffsets.size <= 0) {
+      throw new SamzaException("No SystemStreamPartitions registered. Must register at least one SystemStreamPartition before starting the consumer.")
+    }
+
+    perPartitionFetchThreshold = fetchThreshold / nextOffsets.size
+
     val topicPartitionsAndOffsets = nextOffsets.map {
       case (systemStreamPartition, offset) =>
        val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
@@ -106,16 +128,16 @@ private[kafka] class KafkaSystemConsumer(
 
        // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
         // This avoids trying to re-add the same topic partition repeatedly
-        def refresh(tp:List[TopicAndPartition]) = {
+        def refresh(tp: List[TopicAndPartition]) = {
           val head :: rest = tpToRefresh
           val nextOffset = topicPartitionsAndOffsets.get(head).get
           // Whatever we do, we can't say Broker, even though we're
          // manipulating it here. Broker is a private type and Scala doesn't seem
          // to care about that as long as you don't explicitly declare its type.
           val brokerOption = partitionMetadata(head.topic)
-                             .partitionsMetadata
-                             .find(_.partitionId == head.partition)
-                             .flatMap(_.leader)
+            .partitionsMetadata
+            .find(_.partitionId == head.partition)
+            .flatMap(_.leader)
 
           brokerOption match {
             case Some(broker) =>
@@ -138,8 +160,7 @@ private[kafka] class KafkaSystemConsumer(
       (loop, exception) => {
         warn("While refreshing brokers for %s: %s. Retrying." format 
(tpToRefresh.head, exception))
         debug(exception)
-      }
-    )
+      })
   }
 
   val sink = new MessageSink {
@@ -148,7 +169,7 @@ private[kafka] class KafkaSystemConsumer(
     }
 
     def needsMoreMessages(tp: TopicAndPartition) = {
-      getNumMessagesInQueue(toSystemStreamPartition(tp)) <= fetchThreshold
+      getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold
     }
 
    def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {

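The consumer changes above do two things: start() now fails fast when no
SystemStreamPartitions have been registered (the division would otherwise be
by zero), and the configured fetchThreshold is split evenly across whatever
partitions are registered. The arithmetic is plain integer division, using
the numbers from the doc comment:

    // The division performed in start(); values mirror the doc comment above.
    val fetchThreshold = 50000
    val registeredPartitions = 50 // stands in for nextOffsets.size
    val perPartitionFetchThreshold = fetchThreshold / registeredPartitions // 1000
    // The sink then refetches once a partition's buffered count is <= 1000.
    // Being integer division, registering more partitions than fetchThreshold
    // messages would truncate the per-partition value to 0.
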
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index feecc58..d6e3a52 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -47,9 +47,9 @@ class KafkaSystemFactory extends SystemFactory {
     val consumerMaxWait = consumerConfig.fetchWaitMaxMs
     val autoOffsetResetDefault = consumerConfig.autoOffsetReset
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
-    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt
+    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
    val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
-   
+
     new KafkaSystemConsumer(
       systemName = systemName,
       brokerListString = brokerListString,

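With the factory's fallback raised from "0" to "50000" to match the consumer
default, jobs only need to set the key to deviate from it. A hypothetical
job-config line ("kafka" is a placeholder system name, and the key format
assumes the SYSTEM_PREFIX value sketched earlier):

    systems.kafka.samza.fetch.threshold=100000
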
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
new file mode 100644
index 0000000..8bd51a1
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -0,0 +1,19 @@
+package org.apache.samza.system.kafka
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+
+class TestKafkaSystemConsumer {
+  @Test
+  def testFetchThresholdShouldDivideEvenlyAmongPartitions {
+    val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, fetchThreshold = 50000)
+
+    for (i <- 0 until 50) {
+    consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    assertEquals(1000, consumer.perPartitionFetchThreshold)
+  }
+}
\ No newline at end of file
