This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 09fe51f KAFKA-6648; Fetcher.getTopicMetadata() should return all
partitions for each requested topic
09fe51f is described below
commit 09fe51f3eb7e3ddee54cfb210d8a22327d1b0773
Author: radai-rosenblatt <[email protected]>
AuthorDate: Fri Aug 3 10:38:46 2018 -0700
KAFKA-6648; Fetcher.getTopicMetadata() should return all partitions for
each requested topic
Currently Fetcher.getTopicMetadata() does not include offline partitions.
Thus KafkaConsumer.partitionsFor(topic) will not return all partitions of a
topic if any partition of the topic is offline. This causes a problem if the
user tries to query the total number of partitions of the given topic.
Author: radai-rosenblatt <[email protected]>
Reviewers: Jason Gustafson <[email protected]>, Rajini Sivaram
<[email protected]>
Closes #4679 from radai-rosenblatt/partition_shenanigans
---
.../kafka/clients/consumer/internals/Fetcher.java | 2 +-
.../clients/consumer/internals/FetcherTest.java | 46 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index fd52cb6..dd412ab 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -313,7 +313,7 @@ public class Fetcher<K, V> implements
SubscriptionState.Listener, Closeable {
if (!shouldRetry) {
HashMap<String, List<PartitionInfo>> topicsPartitionInfos
= new HashMap<>();
for (String topic : cluster.topics())
- topicsPartitionInfos.put(topic,
cluster.availablePartitionsForTopic(topic));
+ topicsPartitionInfos.put(topic,
cluster.partitionsForTopic(topic));
return topicsPartitionInfos;
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4169550..f97c266 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -84,6 +84,7 @@ import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -1388,6 +1389,51 @@ public class FetcherTest {
assertTrue(topicMetadata.containsKey(topicName));
}
+ @Test
+ public void testGetTopicMetadataOfflinePartitions() {
+ MetadataResponse originalResponse = newMetadataResponse(topicName,
Errors.NONE); //baseline ok response
+
+ //create a response based on the above one with all partitions being
leaderless
+ List<MetadataResponse.TopicMetadata> altTopics = new ArrayList<>();
+ for (MetadataResponse.TopicMetadata item :
originalResponse.topicMetadata()) {
+ List<MetadataResponse.PartitionMetadata> partitions =
item.partitionMetadata();
+ List<MetadataResponse.PartitionMetadata> altPartitions = new
ArrayList<>();
+ for (MetadataResponse.PartitionMetadata p : partitions) {
+ altPartitions.add(new MetadataResponse.PartitionMetadata(
+ p.error(),
+ p.partition(),
+ null, //no leader
+ p.replicas(),
+ p.isr(),
+ p.offlineReplicas())
+ );
+ }
+ MetadataResponse.TopicMetadata alteredTopic = new
MetadataResponse.TopicMetadata(
+ item.error(),
+ item.topic(),
+ item.isInternal(),
+ altPartitions
+ );
+ altTopics.add(alteredTopic);
+ }
+ Node controller = originalResponse.controller();
+ MetadataResponse altered = new MetadataResponse(
+ (List<Node>) originalResponse.brokers(),
+ originalResponse.clusterId(),
+ controller != null ? controller.id() :
MetadataResponse.NO_CONTROLLER_ID,
+ altTopics);
+
+ client.prepareResponse(altered);
+
+ Map<String, List<PartitionInfo>> topicMetadata =
+ fetcher.getTopicMetadata(new
MetadataRequest.Builder(Collections.singletonList(topicName), false), 5000L);
+
+ Assert.assertNotNull(topicMetadata);
+ Assert.assertNotNull(topicMetadata.get(topicName));
+ //noinspection ConstantConditions
+ Assert.assertEquals((int) cluster.partitionCountForTopic(topicName),
topicMetadata.get(topicName).size());
+ }
+
/*
* Send multiple requests. Verify that the client side quota metrics have
the right values
*/