This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 09fe51f  KAFKA-6648; Fetcher.getTopicMetadata() should return all partitions for each requested topic
09fe51f is described below

commit 09fe51f3eb7e3ddee54cfb210d8a22327d1b0773
Author: radai-rosenblatt <[email protected]>
AuthorDate: Fri Aug 3 10:38:46 2018 -0700

    KAFKA-6648; Fetcher.getTopicMetadata() should return all partitions for each requested topic
    
    Currently Fetcher.getTopicMetadata() will not include offline partitions. Thus
    KafkaConsumer.partitionsFor(topic) will not return all partitions of a topic
    if any partition of the topic is offline. This causes problems if a user
    tries to query the total number of partitions of the given topic.
    
    Author: radai-rosenblatt <[email protected]>
    
    Reviewers: Jason Gustafson <[email protected]>, Rajini Sivaram 
<[email protected]>
    
    Closes #4679 from radai-rosenblatt/partition_shenanigans
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  2 +-
 .../clients/consumer/internals/FetcherTest.java    | 46 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index fd52cb6..dd412ab 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -313,7 +313,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
                 if (!shouldRetry) {
                     HashMap<String, List<PartitionInfo>> topicsPartitionInfos 
= new HashMap<>();
                     for (String topic : cluster.topics())
-                        topicsPartitionInfos.put(topic, 
cluster.availablePartitionsForTopic(topic));
+                        topicsPartitionInfos.put(topic, 
cluster.partitionsForTopic(topic));
                     return topicsPartitionInfos;
                 }
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4169550..f97c266 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -84,6 +84,7 @@ import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -1388,6 +1389,51 @@ public class FetcherTest {
         assertTrue(topicMetadata.containsKey(topicName));
     }
 
+    @Test
+    public void testGetTopicMetadataOfflinePartitions() {
+        MetadataResponse originalResponse = newMetadataResponse(topicName, 
Errors.NONE); //baseline ok response
+
+        //create a response based on the above one with all partitions being 
leaderless
+        List<MetadataResponse.TopicMetadata> altTopics = new ArrayList<>();
+        for (MetadataResponse.TopicMetadata item : 
originalResponse.topicMetadata()) {
+            List<MetadataResponse.PartitionMetadata> partitions = 
item.partitionMetadata();
+            List<MetadataResponse.PartitionMetadata> altPartitions = new 
ArrayList<>();
+            for (MetadataResponse.PartitionMetadata p : partitions) {
+                altPartitions.add(new MetadataResponse.PartitionMetadata(
+                    p.error(),
+                    p.partition(),
+                    null, //no leader
+                    p.replicas(),
+                    p.isr(),
+                    p.offlineReplicas())
+                );
+            }
+            MetadataResponse.TopicMetadata alteredTopic = new 
MetadataResponse.TopicMetadata(
+                item.error(),
+                item.topic(),
+                item.isInternal(),
+                altPartitions
+            );
+            altTopics.add(alteredTopic);
+        }
+        Node controller = originalResponse.controller();
+        MetadataResponse altered = new MetadataResponse(
+            (List<Node>) originalResponse.brokers(),
+            originalResponse.clusterId(),
+            controller != null ? controller.id() : 
MetadataResponse.NO_CONTROLLER_ID,
+            altTopics);
+
+        client.prepareResponse(altered);
+
+        Map<String, List<PartitionInfo>> topicMetadata =
+            fetcher.getTopicMetadata(new 
MetadataRequest.Builder(Collections.singletonList(topicName), false), 5000L);
+
+        Assert.assertNotNull(topicMetadata);
+        Assert.assertNotNull(topicMetadata.get(topicName));
+        //noinspection ConstantConditions
+        Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), 
topicMetadata.get(topicName).size());
+    }
+
     /*
      * Send multiple requests. Verify that the client side quota metrics have 
the right values
      */

Reply via email to