Repository: kafka Updated Branches: refs/heads/trunk a93ef199b -> e52a6181b
kafka-1851; OffsetFetchRequest returns extra partitions when input only contains unknown partitions; patched by Jun Rao; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e52a6181 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e52a6181 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e52a6181 Branch: refs/heads/trunk Commit: e52a6181bf0969f315ac0f0d325eac34d2b4a6ee Parents: a93ef19 Author: Jun Rao <jun...@gmail.com> Authored: Fri Jan 9 11:33:48 2015 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Fri Jan 9 11:33:48 2015 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++++- .../src/test/scala/unit/kafka/server/OffsetCommitTest.scala | 9 ++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e52a6181/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2a1c032..c011a1b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -396,7 +396,11 @@ class KafkaApis(val requestChannel: RequestChannel, metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty ) val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap - val knownStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap + val knownStatus = + if (knownTopicPartitions.size > 0) + offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap + else + Map.empty[TopicAndPartition, OffsetMetadataAndError] val status = unknownStatus ++ knownStatus val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) http://git-wip-us.apache.org/repos/asf/kafka/blob/e52a6181/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 8c5364f..4a3a5b2 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -79,7 +79,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { // create the topic createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server)) - val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L))) + val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L))) val commitResponse = simpleConsumer.commitOffsets(commitRequest) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get) @@ -109,6 +109,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata) assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset) + // Fetch an unknown topic and verify + val unknownTopicAndPartition = TopicAndPartition("unknownTopic", 0) + val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition)) + val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2) + + assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get) + assertEquals(1, fetchResponse2.requestInfo.size) } @Test