Repository: kafka
Updated Branches:
refs/heads/trunk 859113786 -> 448f194c7
KAFKA-4532: StateStores can be connected to the wrong source topic resulting in
incorrect metadata returned from Interactive Queries
When building a topology with tables and StateStores, the StateStores are
mapped to the source topic names. This map is retrieved via
TopologyBuilder.stateStoreNameToSourceTopics() and is used in Interactive
Queries to find the source topics and partitions when resolving the partitions
that particular keys will be in.
There is an issue whereby this mapping, for a table that is originally created
with builder.table("topic", "table") and is subsequently used in a join, is
changed to the internal repartition topic. This is because the mapping is
updated during the call to topology.connectProcessorAndStateStores(..).
If the stateStoreNameToSourceTopics Map already has a value for the state
store name, the Map should not be updated.
Author: Damian Guy <[email protected]>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #2250 from dguy/kafka-4532
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/448f194c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/448f194c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/448f194c
Branch: refs/heads/trunk
Commit: 448f194c70e7a66ae2f1a7e89c822032359b14c9
Parents: 8591137
Author: Damian Guy <[email protected]>
Authored: Tue Dec 13 12:34:46 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Tue Dec 13 12:34:46 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/processor/TopologyBuilder.java | 7 +++++++
.../streams/kstream/KStreamBuilderTest.java | 19 +++++++++++++++++++
2 files changed, 26 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/448f194c/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ecac8c9..74fea9c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -692,6 +692,13 @@ public class TopologyBuilder {
private void connectStateStoreNameToSourceTopics(final String
stateStoreName,
final
ProcessorNodeFactory processorNodeFactory) {
+ // we should never update the mapping from state store names to source
topics if the store name already exists
+ // in the map; this scenario is possible, for example, that a state
store underlying a source KTable is
+ // connecting to a join operator whose source topic is not the
original KTable's source topic but an internal repartition topic.
+ if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) {
+ return;
+ }
+
final Set<String> sourceTopics =
findSourceTopicsForProcessorParents(processorNodeFactory.parents);
if (sourceTopics.isEmpty()) {
throw new TopologyBuilderException("can't find source topic for
state store " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/448f194c/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index b951743..52decf4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
import org.junit.After;
import org.junit.Test;
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -150,4 +153,20 @@ public class KStreamBuilderTest {
new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null,
null);
}
+ @Test
+ public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.setApplicationId("app-id");
+
+ final KStream<String, String> playEvents = builder.stream("events");
+
+ final KTable<String, String> table = builder.table("table-topic",
"table-store");
+ assertEquals(Collections.singleton("table-topic"),
builder.stateStoreNameToSourceTopics().get("table-store"));
+
+ final KStream<String, String> mapped =
playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+ mapped.leftJoin(table,
MockValueJoiner.STRING_JOINER).groupByKey().count("count");
+ assertEquals(Collections.singleton("table-topic"),
builder.stateStoreNameToSourceTopics().get("table-store"));
+
assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"),
builder.stateStoreNameToSourceTopics().get("count"));
+ }
+
}