Repository: samza Updated Branches: refs/heads/master 40ffe4ea5 -> e8e67ab15
Fix SAMZA-1018. Check error code from metadata fetch in getSystemStreamPartitionCounts to avoid returning no data for newly created topics. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e8e67ab1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e8e67ab1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e8e67ab1 Branch: refs/heads/master Commit: e8e67ab156967eaaad1aed304fd26ef81efea127 Parents: 40ffe4e Author: Tommy Becker <[email protected]> Authored: Mon Sep 19 08:21:12 2016 -0400 Committer: Jacob Maes <[email protected]> Committed: Fri Sep 23 14:02:50 2016 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/kafka/KafkaSystemAdmin.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e8e67ab1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index ba8de5c..5927cca 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -51,7 +51,7 @@ object KafkaSystemAdmin extends Logging { val streamPartitionMetadata = systemStreamPartitions .map(systemStreamPartition => { val partitionMetadata = new SystemStreamPartitionMetadata( - // If the topic/partition is empty then oldest and newest will + // If the topic/partition is empty then oldest and newest will // be stripped of their offsets, so default to null. oldestOffsets.getOrElse(systemStreamPartition, null), newestOffsets.getOrElse(systemStreamPartition, null), @@ -157,6 +157,7 @@ class KafkaSystemAdmin( metadataTTL) val result = metadata.map { case (topic, topicMetadata) => { + KafkaUtil.maybeThrowException(topicMetadata.errorCode) val partitionsMap = topicMetadata.partitionsMetadata.map { pm => new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "") @@ -183,8 +184,8 @@ class KafkaSystemAdmin( * SystemStreamPartition that was passed in. */ override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { - // This is safe to do with Kafka, even if a topic is key-deduped. If the - // offset doesn't exist on a compacted topic, Kafka will return the first + // This is safe to do with Kafka, even if a topic is key-deduped. If the + // offset doesn't exist on a compacted topic, Kafka will return the first // message AFTER the offset that was specified in the fetch request. offsets.mapValues(offset => (offset.toLong + 1).toString) } @@ -376,7 +377,7 @@ class KafkaSystemAdmin( private def getTopicsAndPartitionsByBroker(metadata: Map[String, TopicMetadata]) = { val brokersToTopicPartitions = metadata .values - // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] + // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] .flatMap(topicMetadata => { KafkaUtil.maybeThrowException(topicMetadata.errorCode) topicMetadata
