Repository: kafka
Updated Branches:
  refs/heads/0.8.2 f71933ef1 -> f88db16d1


kafka-1851; OffsetFetchRequest returns extra partitions when input only contains unknown partitions; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f88db16d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f88db16d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f88db16d

Branch: refs/heads/0.8.2
Commit: f88db16d15ff8a1883b7aed3c60eefa64faa764c
Parents: f71933e
Author: Jun Rao <jun...@gmail.com>
Authored: Fri Jan 9 11:31:47 2015 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Fri Jan 9 11:31:47 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala            | 6 +++++-
 .../src/test/scala/unit/kafka/server/OffsetCommitTest.scala | 9 ++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f88db16d/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2f00992..9a61fcb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -508,7 +508,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       metadataCache.getPartitionInfo(topicAndPartition.topic, 
topicAndPartition.partition).isEmpty
     )
     val unknownStatus = unknownTopicPartitions.map(topicAndPartition => 
(topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
-    val knownStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, 
knownTopicPartitions).toMap
+    val knownStatus =
+      if (knownTopicPartitions.size > 0)
+        offsetManager.getOffsets(offsetFetchRequest.groupId, 
knownTopicPartitions).toMap
+      else
+        Map.empty[TopicAndPartition, OffsetMetadataAndError]
     val status = unknownStatus ++ knownStatus
 
     val response = OffsetFetchResponse(status, 
offsetFetchRequest.correlationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88db16d/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 8c5364f..4a3a5b2 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -79,7 +79,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     createTopic(zkClient, topic, partitionReplicaAssignment = 
expectedReplicaAssignment, servers = Seq(server))
 
-    val commitRequest = OffsetCommitRequest("test-group", 
immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
+    val commitRequest = OffsetCommitRequest(group, 
immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
     assertEquals(ErrorMapping.NoError, 
commitResponse.commitStatus.get(topicAndPartition).get)
@@ -109,6 +109,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals("some metadata", 
fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(100L, 
fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
 
+    // Fetch an unknown topic and verify
+    val unknownTopicAndPartition = TopicAndPartition("unknownTopic", 0)
+    val fetchRequest2 = OffsetFetchRequest(group, 
Seq(unknownTopicAndPartition))
+    val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2)
+
+    assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, 
fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
+    assertEquals(1, fetchResponse2.requestInfo.size)
   }
 
   @Test

Reply via email to