Repository: incubator-samza Updated Branches: refs/heads/master 10c6338a0 -> 4d0ad620b
SAMZA-213; update message chooser even when backing off polling of consumers Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4d0ad620 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4d0ad620 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4d0ad620 Branch: refs/heads/master Commit: 4d0ad620b7879ae6d051a34766a65cd8e5554b63 Parents: 10c6338 Author: Anh Thu Vu <[email protected]> Authored: Tue Apr 1 10:16:45 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Apr 1 10:16:45 2014 -0700 ---------------------------------------------------------------------- build.gradle | 2 +- .../apache/samza/system/SystemConsumers.scala | 28 +++++++++++--------- 2 files changed, 17 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4d0ad620/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 8e369b8..4775087 100644 --- a/build.gradle +++ b/build.gradle @@ -96,7 +96,7 @@ project(":samza-kafka_$scalaVersion") { test { // Bump up the heap so we can start ZooKeeper and Kafka brokers. - maxHeapSize = "1024m" + maxHeapSize = "2048m" } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4d0ad620/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index b715937..bbbacb5 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -197,18 +197,7 @@ class SystemConsumers( debug("Refreshing chooser with new messages.") // Poll every system for new messages. - val receivedNewMessages = consumers.keys.map(poll(_)).contains(true) - - // Update the chooser. - neededByChooser.foreach(systemStreamPartition => - // If we have messages for a stream that the chooser needs, then update. - if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) { - chooser.update(unprocessedMessages(systemStreamPartition).dequeue) - updateFetchMap(systemStreamPartition) - neededByChooser -= systemStreamPartition - }) - - receivedNewMessages + consumers.keys.map(poll(_)).contains(true) } } @@ -237,6 +226,7 @@ class SystemConsumers( } refresh.maybeCall() + updateMessageChooser envelopeFromChooser } @@ -303,4 +293,18 @@ class SystemConsumers( fetchMap += systemStreamPartition -> fetchSize systemFetchMapCache += systemName -> systemFetchMap } + + /** + * A helper method that updates MessageChooser. This should be called in + * "choose" method after we try to consume a message from MessageChooser. + */ + private def updateMessageChooser { + neededByChooser.foreach(systemStreamPartition => + // If we have messages for a stream that the chooser needs, then update. + if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) { + chooser.update(unprocessedMessages(systemStreamPartition).dequeue) + updateFetchMap(systemStreamPartition) + neededByChooser -= systemStreamPartition + }) + } }
