Repository: kafka Updated Branches: refs/heads/0.10.1 f287efaff -> 9f8dc1301
KAFKA-4275: Check of State-Store-assignment to Processor-Nodes is not enabled Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #1992 from mjsax/kafka-4275-stateStoreCheck (cherry picked from commit 925310aac0b6a0fb32e3e2d614198ffc78f34f96) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f8dc130 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f8dc130 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f8dc130 Branch: refs/heads/0.10.1 Commit: 9f8dc1301d598d9686a2a4818fa4ab456bb935d6 Parents: f287efa Author: Matthias J. Sax <[email protected]> Authored: Mon Oct 17 21:48:40 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon Oct 17 21:48:50 2016 -0700 ---------------------------------------------------------------------- ...ractKTableKTableJoinValueGetterSupplier.java | 45 +++++++++++++ .../kstream/internals/KStreamAggregate.java | 4 ++ .../streams/kstream/internals/KStreamImpl.java | 5 +- .../kstream/internals/KStreamReduce.java | 4 ++ .../internals/KStreamWindowAggregate.java | 6 +- .../kstream/internals/KStreamWindowReduce.java | 4 ++ .../kstream/internals/KTableAggregate.java | 4 ++ .../streams/kstream/internals/KTableFilter.java | 4 ++ .../streams/kstream/internals/KTableImpl.java | 8 ++- .../kstream/internals/KTableKTableJoin.java | 15 +++-- .../kstream/internals/KTableKTableLeftJoin.java | 16 +++-- .../internals/KTableKTableOuterJoin.java | 21 ++++--- .../internals/KTableKTableRightJoin.java | 15 +++-- .../kstream/internals/KTableMapValues.java | 4 ++ .../streams/kstream/internals/KTableReduce.java | 4 ++ .../kstream/internals/KTableRepartitionMap.java | 5 ++ .../KTableSourceValueGetterSupplier.java | 5 ++ .../internals/KTableValueGetterSupplier.java | 1 + .../internals/ProcessorContextImpl.java | 10 +-- .../streams/processor/TopologyBuilderTest.java | 66 ++++++++++++++++++++ 20 files changed, 214 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java new file mode 100644 index 0000000..fa6d2aa --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import java.util.ArrayList; + +public abstract class AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> implements KTableValueGetterSupplier<K, R> { + final protected KTableValueGetterSupplier<K, V1> valueGetterSupplier1; + final protected KTableValueGetterSupplier<K, V2> valueGetterSupplier2; + + public AbstractKTableKTableJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1, + final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + this.valueGetterSupplier1 = valueGetterSupplier1; + this.valueGetterSupplier2 = valueGetterSupplier2; + } + + @Override + public String[] storeNames() { + final String[] storeNames1 = valueGetterSupplier1.storeNames(); + final String[] storeNames2 = valueGetterSupplier2.storeNames(); + final ArrayList<String> stores = new ArrayList<>(storeNames1.length + storeNames2.length); + for (final String storeName : storeNames1) { + stores.add(storeName); + } + for (final String storeName : storeNames2) { + stores.add(storeName); + } + return stores.toArray(new String[stores.size()]); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 428c513..d596d5e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -94,6 +94,10 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, return new KStreamAggregateValueGetter(); } + @Override + public String[] storeNames() { + return new String[]{storeName}; + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index bf345e1..b6c3401 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -584,6 +584,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V String name = topology.newName(LEFTJOIN_NAME); topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name); + topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).valueGetterSupplier().storeNames()); topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name); return new KStreamImpl<>(topology, name, allSourceNodes, false); @@ -703,8 +704,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V topology.addProcessor(joinThisName, joinThis, thisWindowStreamName); topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName); topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); - topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName); - topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName); + topology.addStateStore(thisWindow, thisWindowStreamName, joinOtherName); + topology.addStateStore(otherWindow, otherWindowStreamName, joinThisName); Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes); allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes); http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index 6d24284..1408169 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -93,6 +93,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, return new KStreamReduceValueGetter(); } + @Override + public String[] storeNames() { + return new String[]{storeName}; + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 437d304..718e52b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -17,8 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; @@ -134,6 +134,10 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea return new KStreamWindowAggregateValueGetter(); } + @Override + public String[] storeNames() { + return new String[]{storeName}; + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java index 2a47f72..0b93468 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java @@ -130,6 +130,10 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr return new KStreamWindowReduceValueGetter(); } + @Override + public String[] storeNames() { + return new String[]{storeName}; + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index 3f2ab97..2ef4709 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -105,6 +105,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T return new KTableAggregateValueGetter(); } + @Override + public String[] storeNames() { + return new String[]{storeName}; + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index ff0c67f..059a36f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -52,6 +52,10 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { return new KTableFilterValueGetter(parentValueGetterSupplier.get()); } + @Override + public String[] storeNames() { + return parentValueGetterSupplier.storeNames(); + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index ebe00d8..c53e761 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; @@ -32,7 +33,6 @@ import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import java.io.FileNotFoundException; @@ -301,6 +301,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, topology.addProcessor(joinThisName, joinThis, this.name); topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name); topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); + topology.connectProcessorAndStateStores(joinThisName, other.getStoreName()); + topology.connectProcessorAndStateStores(joinOtherName, getStoreName()); return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null); } @@ -327,6 +329,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, topology.addProcessor(joinThisName, joinThis, this.name); topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name); topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); + topology.connectProcessorAndStateStores(joinThisName, other.getStoreName()); + topology.connectProcessorAndStateStores(joinOtherName, getStoreName()); return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null); } @@ -352,6 +356,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, topology.addProcessor(joinThisName, joinThis, this.name); topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name); topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); + topology.connectProcessorAndStateStores(joinThisName, other.getStoreName()); + topology.connectProcessorAndStateStores(joinOtherName, getStoreName()); return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java index 36424d1..cbd626d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java @@ -36,13 +36,18 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, @Override public KTableValueGetterSupplier<K, R> view() { - return new KTableValueGetterSupplier<K, R>() { + return new KTableKTableJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + } - public KTableValueGetter<K, R> get() { - return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); - } + private class KTableKTableJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> { - }; + public KTableKTableJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + super(valueGetterSupplier1, valueGetterSupplier2); + } + + public KTableValueGetter<K, R> get() { + return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); + } } private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>> { http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java index 996ebc3..4bee38c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java @@ -36,15 +36,21 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, @Override public KTableValueGetterSupplier<K, R> view() { - return new KTableValueGetterSupplier<K, R>() { + return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + } - public KTableValueGetter<K, R> get() { - return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); - } + private class KTableKTableLeftJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> { + + public KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + super(valueGetterSupplier1, valueGetterSupplier2); + } - }; + public KTableValueGetter<K, R> get() { + return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); + } } + private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<V1>> { private final KTableValueGetter<K, V2> valueGetter; http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java index 2a0d477..ad7dbde 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -36,13 +36,18 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, @Override public KTableValueGetterSupplier<K, R> view() { - return new KTableValueGetterSupplier<K, R>() { + return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + } - public KTableValueGetter<K, R> get() { - return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); - } + private class KTableKTableOuterJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> { - }; + public KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + super(valueGetterSupplier1, valueGetterSupplier2); + } + + public KTableValueGetter<K, R> get() { + return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); + } } private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change<V1>> { http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java index fa41ed3..80aadaa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java @@ -37,13 +37,18 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, @Override public KTableValueGetterSupplier<K, R> view() { - return new KTableValueGetterSupplier<K, R>() { + return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + } - public KTableValueGetter<K, R> get() { - return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); - } + private class KTableKTableRightJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> { - }; + public KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + super(valueGetterSupplier1, valueGetterSupplier2); + } + + public KTableValueGetter<K, R> get() { + return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); + } } private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change<V1>> { http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index 244d8ba..daabb00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -50,6 +50,10 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { return new KTableMapValuesValueGetter(parentValueGetterSupplier.get()); } + @Override + public String[] storeNames() { + return parentValueGetterSupplier.storeNames(); + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java index a5457a5..8c2e5f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -103,6 +103,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { return new KTableAggregateValueGetter(); } + @Override + public String[] storeNames() { + return new String[]{storeName}; + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index 939a1df..42bb13a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -53,6 +53,11 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli public KTableValueGetter<K, KeyValue<K1, V1>> get() { return new KTableMapValueGetter(parentValueGetterSupplier.get()); } + + @Override + public String[] storeNames() { + throw new StreamsException("Underlying state store not accessible due to repartitioning."); + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java index fe41fa0..e59a3eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java @@ -32,6 +32,11 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS return new KTableSourceValueGetter(); } + @Override + public String[] storeNames() { + return new String[]{storeName}; + } + private class KTableSourceValueGetter implements KTableValueGetter<K, V> { KeyValueStore<K, V> store = null; http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java index 1ab6ba6..2423bf0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java @@ -21,4 +21,5 @@ public interface KTableValueGetterSupplier<K, V> { KTableValueGetter<K, V> get(); + String[] storeNames(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index f4d4e83..195e5a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -18,11 +18,11 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -133,9 +133,9 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol if (node == null) throw new TopologyBuilderException("Accessing from an unknown node"); - // TODO: restore this once we fix the ValueGetter initialization issue - //if (!node.stateStores.contains(name)) - // throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name); + if (!node.stateStores.contains(name)) { + throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name); + } return stateMgr.getStore(name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8dc130/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 7fe5170..3f45967 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.processor; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.InternalTopicConfig; @@ -24,9 +26,11 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; +import org.apache.kafka.test.ProcessorTopologyTestDriver; import org.junit.Test; import java.util.Arrays; @@ -517,4 +521,66 @@ public class TopologyBuilderTest { assertEquals(1, properties.size()); } + @Test(expected = TopologyBuilderException.class) + public void shouldThroughOnUnassignedStateStoreAccess() { + final String sourceNodeName = "source"; + final String goodNodeName = "goodGuy"; + final String badNodeName = "badGuy"; + + final Properties config = new Properties(); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + final StreamsConfig streamsConfig = new StreamsConfig(config); + + try { + final TopologyBuilder builder = new TopologyBuilder(); + builder + .addSource(sourceNodeName, "topic") + .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName) + .addStateStore( + Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), + goodNodeName) + .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); + + final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME); + driver.process("topic", null, null); + + } catch (final StreamsException e) { + final Throwable cause = e.getCause(); + if (cause != null + && cause instanceof TopologyBuilderException + && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { + throw (TopologyBuilderException) cause; + } else { + throw new RuntimeException("Did expect different exception. Did catch:", e); + } + } + } + + private static class LocalMockProcessorSupplier implements ProcessorSupplier { + final static String STORE_NAME = "store"; + + @Override + public Processor get() { + return new Processor() { + @Override + public void init(ProcessorContext context) { + context.getStateStore(STORE_NAME); + } + + @Override + public void process(Object key, Object value) { + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + } + }
