Repository: kafka
Updated Branches:
  refs/heads/trunk 859113786 -> 448f194c7


KAFKA-4532: StateStores can be connected to the wrong source topic resulting in 
incorrect metadata returned from Interactive Queries

When building a topology with tables and StateStores, the StateStores are 
mapped to the source topic names. This map is retrieved via 
TopologyBuilder.stateStoreNameToSourceTopics() and is used in Interactive 
Queries to find the source topics and partitions when resolving the partitions 
that particular keys will be in.
There is an issue whereby this mapping, for a table that is originally created 
with builder.table("topic", "table") and then subsequently used in a join, 
is changed to the internal repartition topic. This is because the mapping is 
updated during the call to topology.connectProcessorAndStateStores(..).
If the stateStoreNameToSourceTopics Map already has a value for the state 
store name, the Map should not be updated.

Author: Damian Guy <[email protected]>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2250 from dguy/kafka-4532


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/448f194c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/448f194c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/448f194c

Branch: refs/heads/trunk
Commit: 448f194c70e7a66ae2f1a7e89c822032359b14c9
Parents: 8591137
Author: Damian Guy <[email protected]>
Authored: Tue Dec 13 12:34:46 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Tue Dec 13 12:34:46 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/TopologyBuilder.java |  7 +++++++
 .../streams/kstream/KStreamBuilderTest.java      | 19 +++++++++++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/448f194c/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ecac8c9..74fea9c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -692,6 +692,13 @@ public class TopologyBuilder {
     private void connectStateStoreNameToSourceTopics(final String 
stateStoreName,
                                                      final 
ProcessorNodeFactory processorNodeFactory) {
 
+        // we should never update the mapping from state store names to source 
topics if the store name already exists
+        // in the map; this scenario is possible, for example, that a state 
store underlying a source KTable is
+        // connecting to a join operator whose source topic is not the 
original KTable's source topic but an internal repartition topic.
+        if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) {
+            return;
+        }
+
         final Set<String> sourceTopics = 
findSourceTopicsForProcessorParents(processorNodeFactory.parents);
         if (sourceTopics.isEmpty()) {
             throw new TopologyBuilderException("can't find source topic for 
state store " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/448f194c/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index b951743..52decf4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
 import org.junit.After;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
@@ -150,4 +153,20 @@ public class KStreamBuilderTest {
         new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, 
null);
     }
 
+    @Test
+    public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId("app-id");
+
+        final KStream<String, String> playEvents = builder.stream("events");
+
+        final KTable<String, String> table = builder.table("table-topic", 
"table-store");
+        assertEquals(Collections.singleton("table-topic"), 
builder.stateStoreNameToSourceTopics().get("table-store"));
+
+        final KStream<String, String> mapped = 
playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+        mapped.leftJoin(table, 
MockValueJoiner.STRING_JOINER).groupByKey().count("count");
+        assertEquals(Collections.singleton("table-topic"), 
builder.stateStoreNameToSourceTopics().get("table-store"));
+        
assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"),
 builder.stateStoreNameToSourceTopics().get("count"));
+    }
+
 }

Reply via email to