Repository: kafka Updated Branches: refs/heads/trunk 1b902b4ed -> ca06862a7
KAFKA-2358: Cluster collection returning methods never return null See https://issues.apache.org/jira/browse/KAFKA-2358 Author: Stevo Slavic <[email protected]> Reviewers: Jason Gustafson, Guozhang Wang Closes #96 from sslavic/feature/KAFKA-2358 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca06862a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca06862a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca06862a Branch: refs/heads/trunk Commit: ca06862a7005ca476f900bd9c2373021422d695b Parents: 1b902b4 Author: Stevo Slavic <[email protected]> Authored: Thu Mar 2 13:12:59 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Mar 2 13:12:59 2017 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 +- .../org/apache/kafka/clients/producer/MockProducer.java | 2 +- clients/src/main/java/org/apache/kafka/common/Cluster.java | 9 ++++++--- .../test/java/org/apache/kafka/clients/MetadataTest.java | 2 +- .../kafka/streams/processor/DefaultPartitionGrouper.java | 4 ++-- .../processor/internals/StreamPartitionAssignor.java | 2 +- .../streams/processor/internals/StreamsMetadataState.java | 2 +- 8 files changed, 14 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 65da330..87e5862 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -329,7 +329,7 @@ public final class Metadata { for (String topic : this.topics.keySet()) { List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic); - if (partitionInfoList != null) { + if (!partitionInfoList.isEmpty()) { partitionInfos.addAll(partitionInfoList); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 63a39fa..1e37497 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1342,7 +1342,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { try { Cluster cluster = this.metadata.fetch(); List<PartitionInfo> parts = cluster.partitionsForTopic(topic); - if (parts != null) + if (!parts.isEmpty()) return parts; Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata( http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index bafb048..35f5d94 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -114,7 +114,7 @@ public class MockProducer<K, V> implements Producer<K, V> { @Override public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { int partition = 0; - if (this.cluster.partitionsForTopic(record.topic()) != null) + if (!this.cluster.partitionsForTopic(record.topic()).isEmpty()) partition = partition(record, this.cluster); TopicPartition topicPartition = new TopicPartition(record.topic(), partition); ProduceRequestResult result = new ProduceRequestResult(topicPartition); http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/common/Cluster.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index b7408e3..ba1d2af 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -207,7 +207,8 @@ public final class Cluster { * @return A list of partitions */ public List<PartitionInfo> partitionsForTopic(String topic) { - return this.partitionsByTopic.get(topic); + List<PartitionInfo> parts = this.partitionsByTopic.get(topic); + return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts; } /** @@ -226,7 +227,8 @@ public final class Cluster { * @return A list of partitions */ public List<PartitionInfo> availablePartitionsForTopic(String topic) { - return this.availablePartitionsByTopic.get(topic); + List<PartitionInfo> parts = this.availablePartitionsByTopic.get(topic); + return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts; } /** @@ -235,7 +237,8 @@ public final class Cluster { * @return A list of partitions */ public List<PartitionInfo> partitionsForNode(int nodeId) { - return this.partitionsByNode.get(nodeId); + List<PartitionInfo> parts = this.partitionsByNode.get(nodeId); + return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 1a05abc..084ccd8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -412,7 +412,7 @@ public class MetadataTest { private Thread asyncFetch(final String topic, final long maxWaitMs) { Thread thread = new Thread() { public void run() { - while (metadata.fetch().partitionsForTopic(topic) == null) { + while (metadata.fetch().partitionsForTopic(topic).isEmpty()) { try { metadata.awaitUpdate(metadata.requestUpdate(), maxWaitMs); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 5e4da4b..19e4809 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -62,7 +62,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper { for (String topic : topicGroup) { List<PartitionInfo> partitions = metadata.partitionsForTopic(topic); - if (partitions != null && partitionId < partitions.size()) { + if (partitionId < partitions.size()) { group.add(new TopicPartition(topic, partitionId)); } } @@ -81,7 +81,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper { for (String topic : topics) { List<PartitionInfo> partitions = metadata.partitionsForTopic(topic); - if (partitions == null) { + if (partitions.isEmpty()) { log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic); return StreamPartitionAssignor.NOT_AVAILABLE; } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 21b9109..e3f6698 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -406,7 +406,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } for (String topic : allSourceTopics) { List<PartitionInfo> partitionInfoList = metadataWithInternalTopics.partitionsForTopic(topic); - if (partitionInfoList != null) { + if (!partitionInfoList.isEmpty()) { for (PartitionInfo partitionInfo : partitionInfoList) { TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); if (!allAssignedPartitions.contains(partition)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index 67a26bf..bb74b48 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -279,7 +279,7 @@ public class StreamsMetadataState { this.sourceTopics = sourceTopics; for (String topic : sourceTopics) { final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic); - if (partitions != null && partitions.size() > maxPartitions) { + if (partitions.size() > maxPartitions) { maxPartitions = partitions.size(); topicWithMostPartitions = partitions.get(0).topic(); }
