Repository: incubator-samza Updated Branches: refs/heads/master 12594fb71 -> 38e828832
SAMZA-169; retry on topic metadata refresh failure in Kafka system admin. Previously, topic-level metadata errors were ignored, so the topic's metadata was silently lost. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/38e82883 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/38e82883 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/38e82883 Branch: refs/heads/master Commit: 38e82883236de40a544d9f7a18cf49c7238dc258 Parents: 12594fb Author: Martin Kleppmann <[email protected]> Authored: Thu Mar 6 13:10:37 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Mar 6 13:10:37 2014 -0800 ---------------------------------------------------------------------- .../util/TestExponentialSleepStrategy.scala | 1 - .../samza/system/kafka/KafkaSystemAdmin.scala | 34 +++++++------ .../system/kafka/TestKafkaSystemAdmin.scala | 50 ++++++++++++++++++-- 3 files changed, 66 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala index 962ca40..3036da9 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala @@ -23,7 +23,6 @@ package org.apache.samza.util import org.junit.Assert._ import org.junit.Test -import org.apache.samza.util.ExponentialSleepStrategy class TestExponentialSleepStrategy { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index ad5f2fa..5325549 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -109,6 +109,9 @@ class KafkaSystemAdmin( import KafkaSystemAdmin._ + def getSystemStreamMetadata(streams: java.util.Set[String]) = + getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)) + /** * Given a set of stream names (topics), fetch metadata from Kafka for each * stream, and return a map from stream name to SystemStreamMetadata for @@ -116,14 +119,13 @@ class KafkaSystemAdmin( * if a given SystemStreamPartition is empty. This method will block and * retry indefinitely until it gets a successful response from Kafka. */ - def getSystemStreamMetadata(streams: java.util.Set[String]) = { + def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = { var partitions = Map[String, Set[Partition]]() var oldestOffsets = Map[SystemStreamPartition, String]() var newestOffsets = Map[SystemStreamPartition, String]() var upcomingOffsets = Map[SystemStreamPartition, String]() var done = false var consumer: SimpleConsumer = null - val retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 500) debug("Fetching offsets for: %s" format streams) @@ -189,7 +191,7 @@ class KafkaSystemAdmin( * Helper method to use topic metadata cache when fetching metadata, so we * don't hammer Kafka more than we need to. 
*/ - private def getTopicMetadata(topics: Set[String]) = { + protected def getTopicMetadata(topics: Set[String]) = { new ClientUtilTopicMetadataStore(brokerListString, clientId) .getTopicInfo(topics) } @@ -202,17 +204,21 @@ class KafkaSystemAdmin( val brokersToTopicPartitions = metadata .values // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] - .flatMap(topicMetadata => topicMetadata - .partitionsMetadata - // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)] - .map(partitionMetadata => { - ErrorMapping.maybeThrowException(partitionMetadata.errorCode) - val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId) - val leader = partitionMetadata - .leader - .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition)) - (leader, topicAndPartition) - })) + .flatMap(topicMetadata => { + ErrorMapping.maybeThrowException(topicMetadata.errorCode) + topicMetadata + .partitionsMetadata + // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)] + .map(partitionMetadata => { + ErrorMapping.maybeThrowException(partitionMetadata.errorCode) + val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId) + val leader = partitionMetadata + .leader + .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. 
No leader available for TopicAndPartition: %s" format topicAndPartition)) + (leader, topicAndPartition) + }) + }) + // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]] .groupBy(_._1) // Convert to a Map[Broker, Set[TopicAndPartition]] http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index aaf11d0..eaa9e53 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -46,7 +46,9 @@ import kafka.consumer.Consumer import kafka.consumer.ConsumerConfig import java.util.Properties import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.util.ExponentialSleepStrategy object TestKafkaSystemAdmin { val TOPIC = "input" @@ -205,10 +207,6 @@ class TestKafkaSystemAdmin { val systemName = "test" val systemAdmin = new KafkaSystemAdmin(systemName, brokers) - // Get a non-existent topic. - val initialInputOffsets = systemAdmin.getSystemStreamMetadata(Set("foo")) - assertEquals(0, initialInputOffsets.size) - // Create an empty topic with 50 partitions, but with no offsets. 
createTopic validateTopic(50) @@ -269,4 +267,48 @@ class TestKafkaSystemAdmin { assertEquals(sspMetadata.get(new Partition(48)).getNewestOffset, message.offset.toString) assertEquals("val2", text) } + + @Test + def testNonExistentTopic { + val systemAdmin = new KafkaSystemAdmin("test", brokers) + val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic")) + val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata")) + assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map( + new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0") + ))) + } + + class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) { + import kafka.api.{TopicMetadata, TopicMetadataResponse} + + // Simulate Kafka telling us that the leader for the topic is not available + override def getTopicMetadata(topics: Set[String]) = { + val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode) + Map("quux" -> topicMetadata) + } + } + + class CallLimitReached extends Exception + + class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy { + var countCalls = 0 + + override def sleep() = { + if (countCalls >= maxCalls) throw new CallLimitReached + countCalls += 1 + } + } + + @Test + def testShouldRetryOnTopicMetadataError { + val systemAdmin = new KafkaSystemAdminWithTopicMetadataError + val retryBackoff = new MockSleepStrategy(maxCalls = 3) + try { + systemAdmin.getSystemStreamMetadata(Set("quux"), retryBackoff) + } catch { + case e: CallLimitReached => () // this would be less ugly if we were using scalatest + case e: Throwable => throw e + } + assertEquals(retryBackoff.countCalls, 3) + } } \ No newline at end of file
