Updated Branches: refs/heads/master bf6a0eb37 -> 388b992fd
SAMZA-126; don't send empty fetch requests to kafka brokers. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/388b992f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/388b992f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/388b992f Branch: refs/heads/master Commit: 388b992fd358f52258afc04ac67e894db24add41 Parents: bf6a0eb Author: Chris Riccomini <[email protected]> Authored: Thu Jan 9 13:34:58 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Jan 9 13:34:58 2014 -0800 ---------------------------------------------------------------------- .../apache/samza/system/kafka/BrokerProxy.scala | 26 ++++++++++++++------ .../kafka/KafkaSystemConsumerMetrics.scala | 2 ++ .../samza/system/kafka/TestBrokerProxy.scala | 12 +++++++++ 3 files changed, 32 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 124700e..5e3b7cb 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -131,17 +131,27 @@ abstract class BrokerProxy( }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID)) private def fetchMessages(): Unit = { - metrics.brokerReads(host, port).inc - val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*) - firstCall = false - firstCallBarrier.countDown() + val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList - // Split response into errors and non errors, processing the errors first - val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError) + if (topicAndPartitionsToFetch.size > 0) { + metrics.brokerReads(host, port).inc + val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*) + firstCall = false + firstCallBarrier.countDown() - handleErrors(errorResponses, response) + // Split response into errors and non errors, processing the errors first + val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError) - nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) } + handleErrors(errorResponses, response) + + nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) } + } else { + debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions)) + + metrics.brokerSkippedReads(host, port).inc + + Thread.sleep(sleepMSWhileNoTopicPartitions) + } } def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index 1012d58..143be68 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -40,6 +40,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr val reconnects = new ConcurrentHashMap[(String, Int), Counter] val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter] val brokerReads = new ConcurrentHashMap[(String, Int), Counter] + val brokerSkippedReads = new ConcurrentHashMap[(String, Int), Counter] val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]] def registerTopicAndPartition(tp: TopicAndPartition) = { @@ -55,6 +56,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port))) brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port))) brokerReads.put((host, port), newCounter("%s-%s-reads" format (host, port))) + brokerSkippedReads.put((host, port), newCounter("%s-%s-skipped-reads" format (host, port))) topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0)) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index e25cc4f..36445df 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -169,6 +169,18 @@ class TestBrokerProxy extends Logging { assertEquals(84, sink.receivedMessages.get(1)._2.offset) } + @Test def brokerProxySkipsFetchForEmptyRequests() = { + val (bp, tp, sink) = getMockBrokerProxy() + + bp.start + // Only add tp2, which should never receive messages since sink disables it. + bp.addTopicPartition(tp2, Option("0")) + Thread.sleep(1000) + assertEquals(0, sink.receivedMessages.size) + assertTrue(bp.metrics.brokerSkippedReads(bp.host, bp.port).getCount > 0) + assertTrue(bp.metrics.brokerReads(bp.host, bp.port).getCount == 0) + } + @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { val (bp, tp, _) = getMockBrokerProxy() bp.start
