Repository: kafka
Updated Branches:
  refs/heads/trunk 268cff704 -> 8f3462552


KAFKA-4104: Queryable state metadata is sometimes invalid

If the thread or process is not the coordinator, the Cluster instance in
StreamPartitionAssignor will always be null. This change builds an instance of
the Cluster from the metadata associated with the Assignment.

Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1804 from dguy/kafka-4104


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f346255
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f346255
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f346255

Branch: refs/heads/trunk
Commit: 8f3462552fa4d6a6d70a837c2ef7439bba512657
Parents: 268cff7
Author: Damian Guy <[email protected]>
Authored: Thu Sep 1 21:21:42 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Thu Sep 1 21:21:42 2016 -0700

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 18 +++++++++++
 .../internals/StreamPartitionAssignorTest.java  | 32 ++++++++++++++++++++
 2 files changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8f346255/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index fd70a01..09e192d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -532,6 +532,21 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
         }
         this.partitionToTaskIds = partitionToTaskIds;
         this.partitionsByHostState = info.partitionsByHostState;
+        // only need to build when not coordinator
+        if (metadataWithInternalTopics == null) {
+            final Collection<Set<TopicPartition>> values = 
partitionsByHostState.values();
+            final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = 
new HashMap<>();
+            for (Set<TopicPartition> value : values) {
+                for (TopicPartition topicPartition : value) {
+                    topicToPartitionInfo.put(topicPartition, new 
PartitionInfo(topicPartition.topic(),
+                                                                               
topicPartition.partition(),
+                                                                               
null,
+                                                                               
new Node[0],
+                                                                               
new Node[0]));
+                }
+            }
+            metadataWithInternalTopics = 
Cluster.empty().withPartitions(topicToPartitionInfo);
+        }
     }
 
     public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
@@ -542,6 +557,9 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
     }
 
     public Cluster clusterMetadata() {
+        if (metadataWithInternalTopics == null) {
+            return Cluster.empty();
+        }
         return metadataWithInternalTopics;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f346255/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 9d261bb..e300966 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -52,6 +52,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class StreamPartitionAssignorTest {
 
@@ -691,6 +692,37 @@ public class StreamPartitionAssignorTest {
         assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
     }
 
+    @Test
+    public void shouldSetClusterMetadataOnAssignment() throws Exception {
+        final StreamPartitionAssignor partitionAssignor = new 
StreamPartitionAssignor();
+
+        final List<TopicPartition> topic = Arrays.asList(new 
TopicPartition("topic", 0));
+        final Map<HostInfo, Set<TopicPartition>> hostState =
+                Collections.singletonMap(new HostInfo("localhost", 80),
+                                         Collections.singleton(new 
TopicPartition("topic", 0)));
+        final AssignmentInfo assignmentInfo = new 
AssignmentInfo(Collections.singletonList(new TaskId(0, 0)),
+                                                                 
Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+                                                                 hostState);
+
+
+        partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, 
assignmentInfo.encode()));
+        final Cluster cluster = partitionAssignor.clusterMetadata();
+        final List<PartitionInfo> partitionInfos = 
cluster.partitionsForTopic("topic");
+        final PartitionInfo partitionInfo = partitionInfos.get(0);
+        assertEquals(1, partitionInfos.size());
+        assertEquals("topic", partitionInfo.topic());
+        assertEquals(0, partitionInfo.partition());
+    }
+
+    @Test
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws 
Exception {
+        final StreamPartitionAssignor partitionAssignor = new 
StreamPartitionAssignor();
+        final Cluster cluster = partitionAssignor.clusterMetadata();
+        assertNotNull(cluster);
+
+    }
+
+
     private class MockInternalTopicManager extends InternalTopicManager {
 
         public Map<String, Integer> readyTopics = new HashMap<>();

Reply via email to