Repository: kafka
Updated Branches:
  refs/heads/trunk 4e0b0b83a -> 925310aac


KAFKA-4275: Check of State-Store-assignment to Processor-Nodes is not enabled

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #1992 from mjsax/kafka-4275-stateStoreCheck


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/925310aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/925310aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/925310aa

Branch: refs/heads/trunk
Commit: 925310aac0b6a0fb32e3e2d614198ffc78f34f96
Parents: 4e0b0b8
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Mon Oct 17 21:48:40 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Oct 17 21:48:40 2016 -0700

----------------------------------------------------------------------
 ...ractKTableKTableJoinValueGetterSupplier.java | 45 +++++++++++++
 .../kstream/internals/KStreamAggregate.java     |  4 ++
 .../streams/kstream/internals/KStreamImpl.java  |  5 +-
 .../kstream/internals/KStreamReduce.java        |  4 ++
 .../internals/KStreamWindowAggregate.java       |  6 +-
 .../kstream/internals/KStreamWindowReduce.java  |  4 ++
 .../kstream/internals/KTableAggregate.java      |  4 ++
 .../streams/kstream/internals/KTableFilter.java |  4 ++
 .../streams/kstream/internals/KTableImpl.java   |  8 ++-
 .../kstream/internals/KTableKTableJoin.java     | 15 +++--
 .../kstream/internals/KTableKTableLeftJoin.java | 16 +++--
 .../internals/KTableKTableOuterJoin.java        | 21 ++++---
 .../internals/KTableKTableRightJoin.java        | 15 +++--
 .../kstream/internals/KTableMapValues.java      |  4 ++
 .../streams/kstream/internals/KTableReduce.java |  4 ++
 .../kstream/internals/KTableRepartitionMap.java |  5 ++
 .../KTableSourceValueGetterSupplier.java        |  5 ++
 .../internals/KTableValueGetterSupplier.java    |  1 +
 .../internals/ProcessorContextImpl.java         | 10 +--
 .../streams/processor/TopologyBuilderTest.java  | 66 ++++++++++++++++++++
 20 files changed, 214 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
new file mode 100644
index 0000000..fa6d2aa
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.ArrayList;
+
+/**
+ * Base class for {@link KTableValueGetterSupplier}s that are built on top of two
+ * other value-getter suppliers. Subclasses provide {@code get()}; this class
+ * reports the store names of both inputs.
+ *
+ * @param <K>  key type shared by both inputs and by this supplier
+ * @param <R>  value type produced by this supplier
+ * @param <V1> value type of the first input
+ * @param <V2> value type of the second input
+ */
+public abstract class AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> implements KTableValueGetterSupplier<K, R> {
+    protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+
+    /**
+     * @param valueGetterSupplier1 supplier for the first input
+     * @param valueGetterSupplier2 supplier for the second input
+     */
+    public AbstractKTableKTableJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
+                                                       final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        this.valueGetterSupplier1 = valueGetterSupplier1;
+        this.valueGetterSupplier2 = valueGetterSupplier2;
+    }
+
+    /**
+     * Collects the store names of both inputs.
+     *
+     * @return the first supplier's store names followed by the second supplier's
+     */
+    @Override
+    public String[] storeNames() {
+        final String[] first = valueGetterSupplier1.storeNames();
+        final String[] second = valueGetterSupplier2.storeNames();
+        final ArrayList<String> combined = new ArrayList<>(first.length + second.length);
+        appendAll(combined, first);
+        appendAll(combined, second);
+        return combined.toArray(new String[combined.size()]);
+    }
+
+    /** Appends every element of {@code names} to {@code target}, preserving order. */
+    private static void appendAll(final ArrayList<String> target, final String[] names) {
+        for (int i = 0; i < names.length; i++) {
+            target.add(names[i]);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 428c513..d596d5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -94,6 +94,10 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
                 return new KStreamAggregateValueGetter();
             }
 
+            /**
+             * @return a new single-element array containing {@code storeName}
+             */
+            @Override
+            public String[] storeNames() {
+                final String[] names = {storeName};
+                return names;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bf345e1..b6c3401 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -584,6 +584,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
         String name = topology.newName(LEFTJOIN_NAME);
 
         topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, 
?, V1>) other, joiner), this.name);
+        topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) 
other).valueGetterSupplier().storeNames());
         topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) 
other).name);
 
         return new KStreamImpl<>(topology, name, allSourceNodes, false);
@@ -703,8 +704,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
             topology.addProcessor(joinThisName, joinThis, 
thisWindowStreamName);
             topology.addProcessor(joinOtherName, joinOther, 
otherWindowStreamName);
             topology.addProcessor(joinMergeName, joinMerge, joinThisName, 
joinOtherName);
-            topology.addStateStore(thisWindow, thisWindowStreamName, 
otherWindowStreamName);
-            topology.addStateStore(otherWindow, thisWindowStreamName, 
otherWindowStreamName);
+            topology.addStateStore(thisWindow, thisWindowStreamName, 
joinOtherName);
+            topology.addStateStore(otherWindow, otherWindowStreamName, 
joinThisName);
 
             Set<String> allSourceNodes = new HashSet<>(((AbstractStream) 
lhs).sourceNodes);
             allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 6d24284..1408169 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -93,6 +93,10 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, K, V,
                 return new KStreamReduceValueGetter();
             }
 
+            /**
+             * @return a new single-element array containing {@code storeName}
+             */
+            @Override
+            public String[] storeNames() {
+                final String[] names = {storeName};
+                return names;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 437d304..718e52b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -134,6 +134,10 @@ public class KStreamWindowAggregate<K, V, T, W extends 
Window> implements KStrea
                 return new KStreamWindowAggregateValueGetter();
             }
 
+            /**
+             * @return a new single-element array containing {@code storeName}
+             */
+            @Override
+            public String[] storeNames() {
+                final String[] names = {storeName};
+                return names;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 2a47f72..0b93468 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -130,6 +130,10 @@ public class KStreamWindowReduce<K, V, W extends Window> 
implements KStreamAggPr
                 return new KStreamWindowReduceValueGetter();
             }
 
+            /**
+             * @return a new single-element array containing {@code storeName}
+             */
+            @Override
+            public String[] storeNames() {
+                final String[] names = {storeName};
+                return names;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 3f2ab97..2ef4709 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -105,6 +105,10 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
                 return new KTableAggregateValueGetter();
             }
 
+            /**
+             * @return a new single-element array containing {@code storeName}
+             */
+            @Override
+            public String[] storeNames() {
+                final String[] names = {storeName};
+                return names;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index ff0c67f..059a36f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -52,6 +52,10 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
                 return new 
KTableFilterValueGetter(parentValueGetterSupplier.get());
             }
 
+            /**
+             * Delegates to the parent supplier.
+             *
+             * @return whatever {@code parentValueGetterSupplier.storeNames()} returns
+             */
+            @Override
+            public String[] storeNames() {
+                final String[] parentStoreNames = parentValueGetterSupplier.storeNames();
+                return parentStoreNames;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ebe00d8..c53e761 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
@@ -32,7 +33,6 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 import java.io.FileNotFoundException;
@@ -301,6 +301,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) 
other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, 
joinOtherName);
+        topology.connectProcessorAndStateStores(joinThisName, 
other.getStoreName());
+        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, 
allSourceNodes, null);
     }
@@ -327,6 +329,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) 
other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, 
joinOtherName);
+        topology.connectProcessorAndStateStores(joinThisName, 
other.getStoreName());
+        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, 
allSourceNodes, null);
     }
@@ -352,6 +356,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) 
other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, 
joinOtherName);
+        topology.connectProcessorAndStateStores(joinThisName, 
other.getStoreName());
+        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, 
allSourceNodes, null);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 36424d1..cbd626d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -36,13 +36,18 @@ class KTableKTableJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R, V1,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableValueGetterSupplier<K, R>() {
+        return new KTableKTableJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+    }
 
-            public KTableValueGetter<K, R> get() {
-                return new 
KTableKTableJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
-            }
+    private class KTableKTableJoinValueGetterSupplier extends 
AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
 
-        };
+        public 
KTableKTableJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+            super(valueGetterSupplier1, valueGetterSupplier2);
+        }
+
+        public KTableValueGetter<K, R> get() {
+            return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
+        }
     }
 
     private class KTableKTableJoinProcessor extends AbstractProcessor<K, 
Change<V1>> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 996ebc3..4bee38c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -36,15 +36,21 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableValueGetterSupplier<K, R>() {
+        return new 
KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+    }
 
-            public KTableValueGetter<K, R> get() {
-                return new 
KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
-            }
+    private class KTableKTableLeftJoinValueGetterSupplier extends 
AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
+
+        public 
KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+            super(valueGetterSupplier1, valueGetterSupplier2);
+        }
 
-        };
+        public KTableValueGetter<K, R> get() {
+            return new 
KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
+        }
     }
 
+
     private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, 
Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 2a0d477..ad7dbde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -36,13 +36,18 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableValueGetterSupplier<K, R>() {
+        return new 
KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+    }
 
-            public KTableValueGetter<K, R> get() {
-                return new 
KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
-            }
+    private class KTableKTableOuterJoinValueGetterSupplier extends 
AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
 
-        };
+        public 
KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+            super(valueGetterSupplier1, valueGetterSupplier2);
+        }
+
+        public KTableValueGetter<K, R> get() {
+            return new 
KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
+        }
     }
 
     private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, 
Change<V1>> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index fa41ed3..80aadaa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -37,13 +37,18 @@ class KTableKTableRightJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableValueGetterSupplier<K, R>() {
+        return new 
KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+    }
 
-            public KTableValueGetter<K, R> get() {
-                return new 
KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
-            }
+    private class KTableKTableRightJoinValueGetterSupplier extends 
AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
 
-        };
+        public 
KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+            super(valueGetterSupplier1, valueGetterSupplier2);
+        }
+
+        public KTableValueGetter<K, R> get() {
+            return new 
KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
+        }
     }
 
     private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, 
Change<V1>> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 244d8ba..daabb00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -50,6 +50,10 @@ class KTableMapValues<K, V, V1> implements 
KTableProcessorSupplier<K, V, V1> {
                 return new 
KTableMapValuesValueGetter(parentValueGetterSupplier.get());
             }
 
+            /**
+             * Delegates to the parent supplier.
+             *
+             * @return whatever {@code parentValueGetterSupplier.storeNames()} returns
+             */
+            @Override
+            public String[] storeNames() {
+                final String[] parentStoreNames = parentValueGetterSupplier.storeNames();
+                return parentStoreNames;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index a5457a5..8c2e5f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -103,6 +103,10 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
                 return new KTableAggregateValueGetter();
             }
 
+            /**
+             * @return a new single-element array containing {@code storeName}
+             */
+            @Override
+            public String[] storeNames() {
+                final String[] names = {storeName};
+                return names;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 939a1df..42bb13a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -53,6 +53,11 @@ public class KTableRepartitionMap<K, V, K1, V1> implements 
KTableProcessorSuppli
             public KTableValueGetter<K, KeyValue<K1, V1>> get() {
                 return new 
KTableMapValueGetter(parentValueGetterSupplier.get());
             }
+
+            /**
+             * Never returns normally: this supplier does not expose any store names.
+             *
+             * @throws StreamsException on every call
+             */
+            @Override
+            public String[] storeNames() {
+                final String reason = "Underlying state store not accessible due to repartitioning.";
+                throw new StreamsException(reason);
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index fe41fa0..e59a3eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -32,6 +32,11 @@ public class KTableSourceValueGetterSupplier<K, V> 
implements KTableValueGetterS
         return new KTableSourceValueGetter();
     }
 
+    /**
+     * @return a new single-element array containing {@code storeName}
+     */
+    @Override
+    public String[] storeNames() {
+        final String[] names = {storeName};
+        return names;
+    }
+
     private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
 
         KeyValueStore<K, V> store = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
index 1ab6ba6..2423bf0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
@@ -21,4 +21,5 @@ public interface KTableValueGetterSupplier<K, V> {
 
     KTableValueGetter<K, V> get();
 
+    String[] storeNames();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index f4d4e83..195e5a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -18,11 +18,11 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -133,9 +133,9 @@ public class ProcessorContextImpl implements 
InternalProcessorContext, RecordCol
         if (node == null)
             throw new TopologyBuilderException("Accessing from an unknown 
node");
 
-        // TODO: restore this once we fix the ValueGetter initialization issue
-        //if (!node.stateStores.contains(name))
-        //    throw new TopologyBuilderException("Processor " + node.name() + 
" has no access to StateStore " + name);
+        if (!node.stateStores.contains(name)) {
+            throw new TopologyBuilderException("Processor " + node.name() + " 
has no access to StateStore " + name);
+        }
 
         return stateMgr.getStore(name);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 7fe5170..3f45967 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
@@ -24,9 +26,11 @@ import 
org.apache.kafka.streams.processor.internals.InternalTopicManager;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -517,4 +521,66 @@ public class TopologyBuilderTest {
         assertEquals(1, properties.size());
     }
 
+    /**
+     * Checks that a processor which was never connected to a state store is
+     * rejected when it tries to fetch that store.
+     * <p>
+     * Both processors come from {@link LocalMockProcessorSupplier} and look the
+     * store up during {@code init}; only "goodGuy" is connected to it. The
+     * {@link TopologyBuilderException} is expected to arrive wrapped in a
+     * {@link StreamsException}, so it is unwrapped and rethrown to satisfy
+     * {@code expected}. Any other {@link StreamsException} is reported as a
+     * {@link RuntimeException}.
+     */
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThroughOnUnassignedStateStoreAccess() {
+        final String sourceNodeName = "source";
+        final String goodNodeName = "goodGuy";
+        final String badNodeName = "badGuy";
+        final String storeName = LocalMockProcessorSupplier.STORE_NAME;
+
+        final Properties props = new Properties();
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+        try {
+            final TopologyBuilder builder = new TopologyBuilder();
+            builder
+                .addSource(sourceNodeName, "topic")
+                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+                .addStateStore(
+                    Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
+                    goodNodeName)
+                .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
+            final ProcessorTopologyTestDriver driver =
+                new ProcessorTopologyTestDriver(streamsConfig, builder, storeName);
+            driver.process("topic", null, null);
+
+        } catch (final StreamsException e) {
+            final Throwable cause = e.getCause();
+            final String expectedMessage =
+                "Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + storeName;
+
+            // instanceof is false for null, so no separate null check is needed.
+            if (!(cause instanceof TopologyBuilderException) || !cause.getMessage().equals(expectedMessage)) {
+                throw new RuntimeException("Did expect different exception. Did catch:", e);
+            }
+            throw (TopologyBuilderException) cause;
+        }
+    }
+
+    /**
+     * Supplies processors that look up the store named {@link #STORE_NAME}
+     * during {@code init} and do nothing in every other callback.
+     */
+    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
+        static final String STORE_NAME = "store";
+
+        /** @return a fresh processor instance on every call */
+        @Override
+        public Processor get() {
+            return new StoreLookupProcessor();
+        }
+
+        /** Processor whose only action is fetching the state store on initialization. */
+        private static final class StoreLookupProcessor implements Processor {
+            @Override
+            public void init(final ProcessorContext context) {
+                // The returned store is intentionally ignored; only the lookup matters.
+                context.getStateStore(STORE_NAME);
+            }
+
+            @Override
+            public void process(final Object key, final Object value) {
+            }
+
+            @Override
+            public void punctuate(final long timestamp) {
+            }
+
+            @Override
+            public void close() {
+            }
+        }
+    }
+
 }

Reply via email to