Repository: kafka Updated Branches: refs/heads/trunk 268cff704 -> 8f3462552
KAFKA-4104: Queryable state metadata is sometimes invalid If the thread or process is not the coordinator the Cluster instance in StreamPartitionAssignor will always be null. This builds an instance of the Cluster with the metadata associated with the Assignment Author: Damian Guy <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1804 from dguy/kafka-4104 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f346255 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f346255 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f346255 Branch: refs/heads/trunk Commit: 8f3462552fa4d6a6d70a837c2ef7439bba512657 Parents: 268cff7 Author: Damian Guy <[email protected]> Authored: Thu Sep 1 21:21:42 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Sep 1 21:21:42 2016 -0700 ---------------------------------------------------------------------- .../internals/StreamPartitionAssignor.java | 18 +++++++++++ .../internals/StreamPartitionAssignorTest.java | 32 ++++++++++++++++++++ 2 files changed, 50 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8f346255/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index fd70a01..09e192d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -532,6 +532,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } this.partitionToTaskIds = partitionToTaskIds; this.partitionsByHostState = info.partitionsByHostState; + // only need to build when not coordinator + if (metadataWithInternalTopics == null) { + final Collection<Set<TopicPartition>> values = partitionsByHostState.values(); + final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>(); + for (Set<TopicPartition> value : values) { + for (TopicPartition topicPartition : value) { + topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(), + topicPartition.partition(), + null, + new Node[0], + new Node[0])); + } + } + metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo); + } } public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() { @@ -542,6 +557,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } public Cluster clusterMetadata() { + if (metadataWithInternalTopics == null) { + return Cluster.empty(); + } return metadataWithInternalTopics; } http://git-wip-us.apache.org/repos/asf/kafka/blob/8f346255/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 9d261bb..e300966 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -52,6 +52,7 @@ import java.util.Set; import java.util.UUID; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class StreamPartitionAssignorTest { @@ -691,6 +692,37 @@ public class StreamPartitionAssignorTest { assertEquals(hostState, partitionAssignor.getPartitionsByHostState()); } + @Test + public void shouldSetClusterMetadataOnAssignment() throws Exception { + final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + + final List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0)); + final Map<HostInfo, Set<TopicPartition>> hostState = + Collections.singletonMap(new HostInfo("localhost", 80), + Collections.singleton(new TopicPartition("topic", 0))); + final AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)), + Collections.<TaskId, Set<TopicPartition>>emptyMap(), + hostState); + + + partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode())); + final Cluster cluster = partitionAssignor.clusterMetadata(); + final List<PartitionInfo> partitionInfos = cluster.partitionsForTopic("topic"); + final PartitionInfo partitionInfo = partitionInfos.get(0); + assertEquals(1, partitionInfos.size()); + assertEquals("topic", partitionInfo.topic()); + assertEquals(0, partitionInfo.partition()); + } + + @Test + public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception { + final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + final Cluster cluster = partitionAssignor.clusterMetadata(); + assertNotNull(cluster); + + } + + private class MockInternalTopicManager extends InternalTopicManager { public Map<String, Integer> readyTopics = new HashMap<>();
