RYA-377 Implement the JoinProcessor for KafkaStreams.

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3471cb7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3471cb7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3471cb7e

Branch: refs/heads/master
Commit: 3471cb7eedaadde5352c584cfa4135a4dbd66520
Parents: c4966ff
Author: kchilton2 <[email protected]>
Authored: Fri Nov 10 13:44:25 2017 -0500
Committer: caleb <[email protected]>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 extras/rya.streams/kafka/pom.xml                |   5 +-
 .../StatementPatternProcessorSupplier.java      |   5 +
 .../processors/join/CloseableIterator.java      |  32 ++
 .../processors/join/JoinProcessorSupplier.java  | 191 +++++++
 .../kafka/processors/join/JoinStateStore.java   |  11 +-
 .../processors/join/KeyValueJoinStateStore.java | 259 ++++++++++
 .../apache/rya/streams/kafka/KafkaTestUtil.java |  82 +++
 .../processors/StatementPatternProcessorIT.java | 240 +++++++--
 .../kafka/processors/join/JoinProcessorIT.java  | 507 +++++++++++++++++++
 .../kafka/src/test/resources/log4j.properties   |  29 ++
 10 files changed, 1296 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 33cc985..d3f6891 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -57,11 +57,12 @@ under the License.
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
         </dependency>
-       <dependency>
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-streams</artifactId>
         </dependency>
  
+        <!-- Misc. dependencies -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
@@ -79,4 +80,4 @@ under the License.
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
index 6991783..386fe98 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
@@ -30,6 +30,8 @@ import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.StatementPattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -70,6 +72,8 @@ public class StatementPatternProcessorSupplier implements 
ProcessorSupplier<Stri
     @DefaultAnnotation(NonNull.class)
     public static final class StatementPatternProcessor implements 
Processor<String, VisibilityStatement> {
 
+        private static final Logger log = 
LoggerFactory.getLogger(StatementPatternProcessor.class);
+
         private final StatementPatternMatcher spMatcher;
         private final ProcessorResultFactory resultFactory;
 
@@ -104,6 +108,7 @@ public class StatementPatternProcessorSupplier implements 
ProcessorSupplier<Stri
 
                 // Wrap the binding set as a result and forward it to the 
downstream processor.
                 final ProcessorResult resultValue = resultFactory.make(visBs);
+                log.debug("\nOUTPUT:\n{}", visBs);
                 context.forward(key, resultValue);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
new file mode 100644
index 0000000..9ea927d
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import java.util.Iterator;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An {@link Iterator} that is also {@link AutoCloseable}.
+ * <p>
+ * The interface declares no members of its own; it only combines the two parent contracts so that an
+ * iterator can be used as the resource of a try-with-resources statement. Because {@code close()} is
+ * inherited unchanged from {@link AutoCloseable}, it is declared to throw {@link Exception}.
+ *
+ * @param <T> - The type of elements that will be iterated over.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
new file mode 100644
index 0000000..9ed2363
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.function.join.IterativeJoin;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import 
org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link JoinProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier {
+
+    private final String stateStoreName;
+    private final IterativeJoin join;
+    private final List<String> joinVars;
+    private final List<String> allVars;
+
+    /**
+     * Constructs an instance of {@link JoinProcessorSupplier}.
+     *
+     * @param stateStoreName - The name of the state store the processor will use. (not null)
+     * @param join - The join function the supplied processor will use. (not null)
+     * @param joinVars - The variables that the supplied processor will join over. (not null)
+     * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets.
+     *   This list must lead with the same variables and order as {@code joinVars}. (not null)
+     * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+     * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars},
+     *   including the case where {@code joinVars} is longer than {@code allVars}.
+     */
+    public JoinProcessorSupplier(
+            final String stateStoreName,
+            final IterativeJoin join,
+            final List<String> joinVars,
+            final List<String> allVars,
+            final ProcessorResultFactory resultFactory) throws IllegalArgumentException {
+        super(resultFactory);
+        this.stateStoreName = requireNonNull(stateStoreName);
+        this.join = requireNonNull(join);
+        this.joinVars = requireNonNull(joinVars);
+        this.allVars = requireNonNull(allVars);
+
+        // Compare the sizes first: subList(0, n) throws IndexOutOfBoundsException when n exceeds the size of
+        // allVars, which would bypass the IllegalArgumentException this constructor documents.
+        final boolean leadsWithJoinVars = joinVars.size() <= allVars.size() &&
+                allVars.subList(0, joinVars.size()).equals(joinVars);
+        if(!leadsWithJoinVars) {
+            throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
+                    "Join Vars: " + joinVars + ", All Vars: " + allVars);
+        }
+    }
+
+    /**
+     * Creates a new {@link JoinProcessor} configured with this supplier's state store name, join function,
+     * variable lists and result factory.
+     *
+     * @return A new processor instance.
+     */
+    @Override
+    public Processor<Object, ProcessorResult> get() {
+        final ProcessorResultFactory factory = super.getResultFactory();
+        return new JoinProcessor(stateStoreName, join, joinVars, allVars, factory);
+    }
+
+    /**
+     * Joins {@link VisibilityBindingSet}s against all binding sets that were 
emitted on the other side. This function
+     * does not have an age off policy, so it will match everything that could 
have ever possibly matched, however this
+     * may become prohibitive for joins that match a large volume of binding 
sets since this will indefinitely grow
+     * within the state store.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static class JoinProcessor extends RyaStreamsProcessor {
+
+        private static final Logger log = 
LoggerFactory.getLogger(JoinProcessor.class);
+
+        private final String stateStoreName;
+        private final IterativeJoin join;
+        private final List<String> joinVars;
+        private final List<String> allVars;
+        private final ProcessorResultFactory resultFactory;
+
+        private ProcessorContext context;
+        private JoinStateStore joinStateStore;
+
+        /**
+         * Constructs an instance of {@link JoinProcessor}.
+         *
+         * @param stateStoreName - The name of the state store the processor will use. (not null)
+         * @param join - The join function that the processor will use. (not null)
+         * @param joinVars - The variables that the processor will join over. (not null)
+         * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets.
+         *   This list must lead with the same variables and order as {@code joinVars}. (not null)
+         * @param resultFactory - The factory that will format this processor's final results
+         *   for the downstream processor. (not null)
+         * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars},
+         *   including the case where {@code joinVars} is longer than {@code allVars}.
+         */
+        public JoinProcessor(
+                final String stateStoreName,
+                final IterativeJoin join,
+                final List<String> joinVars,
+                final List<String> allVars,
+                final ProcessorResultFactory resultFactory) {
+            super(resultFactory);
+            this.stateStoreName = requireNonNull(stateStoreName);
+            this.join = requireNonNull(join);
+            this.joinVars = requireNonNull(joinVars);
+            this.allVars = requireNonNull(allVars);
+            this.resultFactory = requireNonNull(resultFactory);
+
+            // Compare the sizes first: subList(0, n) throws IndexOutOfBoundsException when n exceeds the size
+            // of allVars, so a too-long joinVars list would otherwise surface as the wrong exception type.
+            final boolean leadsWithJoinVars = joinVars.size() <= allVars.size() &&
+                    allVars.subList(0, joinVars.size()).equals(joinVars);
+            if(!leadsWithJoinVars) {
+                throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
+                        "Join Vars: " + joinVars + ", All Vars: " + allVars);
+            }
+        }
+
+        /**
+         * Captures the processor context and wraps the named state store in a {@link KeyValueJoinStateStore}.
+         *
+         * @param context - The context this processor runs in; also used to forward results.
+         */
+        @Override
+        public void init(final ProcessorContext context) {
+            // Keep the context around; process() uses it to forward join results downstream.
+            this.context = context;
+
+            // Look up the backing key/value store by name and wrap it so that values can be stored
+            // and fetched by side and join variable values.
+            final KeyValueStore<String, VisibilityBindingSet> backingStore =
+                    (KeyValueStore<String, VisibilityBindingSet>) context.getStateStore(stateStoreName);
+            this.joinStateStore = new KeyValueJoinStateStore(backingStore, joinVars, allVars);
+        }
+
+        /**
+         * Stores the incoming binary result, joins it against everything previously stored for the other
+         * side, and forwards each join result to the downstream processor under the same key.
+         *
+         * @param key - The key of the incoming record; reused as the key of every forwarded result.
+         * @param value - The incoming result; its binary form is read via {@code getBinary()}.
+         * @throws RuntimeException If anything fails while fetching, joining or forwarding the values.
+         */
+        @Override
+        public void process(final Object key, final ProcessorResult value) {
+            // Pull out the binary result once: the side it arrived on and the binding set it carries.
+            final BinaryResult binary = value.getBinary();
+            final Side side = binary.getSide();
+            final VisibilityBindingSet newBindingSet = binary.getResult();
+            log.debug("\nINPUT:\nSide: {}\nBinding Set: {}", side, newBindingSet);
+
+            // Remember the new value first so that later arrivals on the other side can join with it.
+            joinStateStore.store(binary);
+
+            // Fetch what the other side has stored for the same join values. The iterator is closeable,
+            // so it is opened in try-with-resources.
+            try(final CloseableIterator<VisibilityBindingSet> otherSide = joinStateStore.getJoinedValues(binary)) {
+                // The argument order of the join function depends on the side the new value arrived on.
+                final Iterator<VisibilityBindingSet> joinResults;
+                if(side == Side.LEFT) {
+                    joinResults = join.newLeftResult(newBindingSet, otherSide);
+                } else {
+                    joinResults = join.newRightResult(otherSide, newBindingSet);
+                }
+
+                // Wrap every joined binding set as a result and send it downstream.
+                while(joinResults.hasNext()) {
+                    final VisibilityBindingSet joined = joinResults.next();
+                    final ProcessorResult forwarded = resultFactory.make(joined);
+                    log.debug("\nOUTPUT:\n{}", joined);
+                    context.forward(key, forwarded);
+                }
+            } catch (final Exception e) {
+                final String msg = "Problem encountered while iterating over the other side's values within the state store.";
+                log.error(msg, e);
+                throw new RuntimeException(msg, e);
+            }
+        }
+
+        /**
+         * Does nothing. This processor forwards its results from {@code process()} as inputs arrive and
+         * performs no work in this callback.
+         *
+         * @param timestamp - Ignored.
+         */
+        @Override
+        public void punctuate(final long timestamp) {
+            // Nothing to do.
+        }
+
+        /**
+         * Does nothing. The only closeable this processor opens itself, the iterator in
+         * {@code process()}, is closed there by try-with-resources.
+         */
+        @Override
+        public void close() {
+            // Nothing to do.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
index e53ec68..17a6ebb 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
@@ -18,9 +18,6 @@
  */
 package org.apache.rya.streams.kafka.processors.join;
 
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
 
@@ -37,17 +34,15 @@ public interface JoinStateStore {
     /**
      * Store a {@link VisibilityBindingSet} based on the side it was emitted 
from.
      *
-     * @param joinVars - An ordered list of the variables that are being 
joined over. (not null)
      * @param result - The result whose value will be stored. (not null)
      */
-    public void store(List<String> joinVars, BinaryResult result);
+    public void store(BinaryResult result);
 
     /**
      * Get the previously stored {@link VisibilityBindingSet}s that join with 
the provided result.
      *
-     * @param joinVars - An ordered list of the variables to join over. (not 
null)
-     * @param result - Defines the values that will be used to join. (not null)
+     * @param result - The value that will be joined with. (not null)
      * @return The {@link VisibilityBinidngSet}s that join with {@code result}.
      */
-    public Iterator<VisibilityBindingSet> getJoinedValues(List<String> 
joinVars, BinaryResult result);
+    public CloseableIterator<VisibilityBindingSet> 
getJoinedValues(BinaryResult result);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
new file mode 100644
index 0000000..d73b40e
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import 
org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link KeyValueStore} implementation of {@link JoinStateStore}.
+ * <p>
+ * This is a key/value store, so we need to store the {@link VisibilityBindingSet}s using keys that allow us to fetch
+ * all binding sets that join from a specific side. We use the following pattern to accomplish this:
+ * <pre>
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]
+ * </pre>
+ * This will group all binding sets that have been emitted from a specific side and that have the same join variable
+ * values next to each other within the store. This isn't enough information to fetch that group, though. We must
+ * provide a start and end key to bound the range that is fetched back. To accomplish this, we place a start of range
+ * marker as the first key for all unique [side]/[join values] groups, and an end of range marker as the last key for
+ * each of those groups.
+ * <p>
+ * The rows follow this pattern:
+ * <pre>
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]0x00
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value],[remainingBindingValues]
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]0xFF
+ * </pre>
+ * <p>
+ * When an iterator over the results is returned, it skips over the start and end of range markers.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KeyValueJoinStateStore implements JoinStateStore {
+
+    private static final Logger log = 
LoggerFactory.getLogger(KeyValueJoinStateStore.class);
+
+    /**
+     * Suffix appended to a join key prefix to form the start of range marker key: the NUL character
+     * (U+0000), which sorts before every other character.
+     * <p>
+     * Written as a literal instead of being decoded from a byte array so that the value cannot vary
+     * with the platform's default charset.
+     */
+    private static final String START_RANGE_SUFFIX = "\u0000";
+
+    /**
+     * Suffix appended to a join key prefix to form the end of range marker key: U+FFFD.
+     * <p>
+     * This used to be built by decoding the single byte 0xFF with the platform's default charset. 0xFF is
+     * not a valid UTF-8 byte, so on a UTF-8 platform that decoding yields the replacement character
+     * U+FFFD, while other default charsets yield a different character. The literal pins the UTF-8 result
+     * so that keys are the same on every platform. Note that U+FFFD is a high code point but not the
+     * highest one: U+FFFE, U+FFFF and supplementary characters compare greater than it.
+     */
+    private static final String END_RANGE_SUFFIX = "\uFFFD";
+
+    /**
+     * A default empty value that is stored for a start of range or end of 
range marker.
+     */
+    private static final VisibilityBindingSet RANGE_MARKER_VALUE = new 
VisibilityBindingSet(new MapBindingSet(), "");
+
+    private final KeyValueStore<String, VisibilityBindingSet> store;
+    private final List<String> joinVars;
+    private final List<String> allVars;
+
+    /**
+     * Constructs an instance of {@link KeyValueJoinStateStore}.
+     *
+     * @param store - The state store that will be used. (not null)
+     * @param joinVars - The variables that are used to build grouping keys. (not null)
+     * @param allVars - The variables that are used to build full value keys. (not null)
+     * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars},
+     *   including the case where {@code joinVars} is longer than {@code allVars}.
+     */
+    public KeyValueJoinStateStore(
+            final KeyValueStore<String, VisibilityBindingSet> store,
+            final List<String> joinVars,
+            final List<String> allVars) throws IllegalArgumentException {
+        this.store = requireNonNull(store);
+        this.joinVars = requireNonNull(joinVars);
+        this.allVars = requireNonNull(allVars);
+
+        // Start from the size comparison: without it, allVars.get(i) throws IndexOutOfBoundsException when
+        // joinVars is the longer list, instead of the IllegalArgumentException documented above.
+        boolean leadsWithJoinVars = joinVars.size() <= allVars.size();
+        for(int i = 0; leadsWithJoinVars && i < joinVars.size(); i++) {
+            leadsWithJoinVars = joinVars.get(i).equals(allVars.get(i));
+        }
+
+        if(!leadsWithJoinVars) {
+            throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
+                    "Join Vars: " + joinVars + ", All Vars: " + allVars);
+        }
+    }
+
+    /**
+     * Writes three entries for the result in a single {@code putAll}: a start of range marker, the
+     * binding set itself, and an end of range marker. Both markers are keyed by the result's side and
+     * join variable values; the binding set is keyed by its side and the values of all variables.
+     *
+     * @param result - The result whose value will be stored. (not null)
+     */
+    @Override
+    public void store(final BinaryResult result) {
+        requireNonNull(result);
+
+        final Side side = result.getSide();
+        final VisibilityBindingSet bindingSet = result.getResult();
+
+        // Every row for one side and one combination of join variable values shares this prefix.
+        final String groupPrefix = makeCommaDelimitedValues(side, joinVars, bindingSet);
+
+        // The two markers bound the group for range scans; the value key carries the values of all variables.
+        final String startKey = groupPrefix + START_RANGE_SUFFIX;
+        final String valueKey = makeCommaDelimitedValues(side, allVars, bindingSet);
+        final String endKey = groupPrefix + END_RANGE_SUFFIX;
+
+        final List<KeyValue<String, VisibilityBindingSet>> entries = new ArrayList<>();
+        entries.add( new KeyValue<>(startKey, RANGE_MARKER_VALUE) );
+        entries.add( new KeyValue<>(valueKey, bindingSet) );
+        entries.add( new KeyValue<>(endKey, RANGE_MARKER_VALUE) );
+
+        // Write all three pairs to the store.
+        log.debug("\nStoring the following values: {}\n", entries);
+        store.putAll( entries );
+    }
+
+    /**
+     * Opens a range scan over the entries stored for the side opposite to {@code result}'s side, bounded by
+     * the start and end of range marker keys built from {@code result}'s join variable values, and returns
+     * an iterator over the values found between those markers.
+     * <p>
+     * The returned iterator wraps the store's range iterator; closing it closes that range iterator.
+     *
+     * @param result - The value that will be joined with. (not null)
+     * @return An iterator over the stored values of the other side that fall inside the scanned range. Its
+     *   methods throw {@link IllegalStateException} if the scanned entries do not have the expected
+     *   marker layout, or if {@code next()} is invoked when nothing is left.
+     */
+    @Override
+    public CloseableIterator<VisibilityBindingSet> getJoinedValues(final 
BinaryResult result) {
+        requireNonNull(result);
+
+        // Get an iterator over the values that start with the join variables for the other side.
+        final Side otherSide = result.getSide() == Side.LEFT ? Side.RIGHT : 
Side.LEFT;
+        final VisibilityBindingSet bs = result.getResult();
+        final String joinKeyPrefix = makeCommaDelimitedValues(otherSide, 
joinVars, bs);
+
+        final String startKey = joinKeyPrefix + START_RANGE_SUFFIX;
+        final String endKey = joinKeyPrefix + END_RANGE_SUFFIX;
+        final KeyValueIterator<String, VisibilityBindingSet> rangeIt = 
store.range(startKey, endKey);
+
+        // Return a CloseableIterator over the range's value fields, skipping the start and end entry.
+        return new CloseableIterator<VisibilityBindingSet>() {
+
+            // Three states: null = nothing has been read yet, empty = the range is exhausted,
+            // present = the value that the next call to next() will return.
+            private Optional<VisibilityBindingSet> next = null;
+
+            @Override
+            public boolean hasNext() {
+                // If the iterator has not been initialized yet, read a value in.
+                if(next == null) {
+                    next = readNext();
+                }
+
+                // Return true if there is a next value, otherwise false.
+                return next.isPresent();
+            }
+
+            @Override
+            public VisibilityBindingSet next() {
+                // If the iterator has not been initialized yet, read a value in.
+                if(next == null) {
+                    next = readNext();
+                }
+
+                // It's illegal to call next() when there is no next value.
+                if(!next.isPresent()) {
+                    throw new IllegalStateException("May not invoke next() 
when there is nothing left in the Iterator.");
+                }
+
+                // Update and return the next value.
+                final VisibilityBindingSet ret = next.get();
+                log.debug("\nReturning: {}", ret);
+                next = readNext();
+                return ret;
+            }
+
+            // Reads ahead one entry from the range iterator. Returns empty when the range has no more
+            // entries or when the end of range marker is reached; otherwise returns the entry's value.
+            private Optional<VisibilityBindingSet> readNext() {
+                // Check to see if there's anything left in the iterator.
+                if(!rangeIt.hasNext()) {
+                    return Optional.empty();
+                }
+
+                // Read a candidate key/value pair from the iterator.
+                KeyValue<String, VisibilityBindingSet> candidate = 
rangeIt.next();
+
+                // If we are initializing, then the first thing we must read is a start of range marker.
+                // The enclosing 'next' field is still null here only during the very first read.
+                if(next == null) {
+                    if(!candidate.key.endsWith(START_RANGE_SUFFIX)) {
+                        throw new IllegalStateException("The first key 
encountered must be a start of range key.");
+                    }
+                    log.debug("Read the start of range markers.\n");
+
+                    // Read a new candidate to skip this one.
+                    if(!rangeIt.hasNext()) {
+                        throw new IllegalStateException("There must be another 
entry after the start of range key.");
+                    }
+                    candidate = rangeIt.next();
+                }
+
+                // If that value is an end of range key, then we are finished. Otherwise, return it.
+                // NOTE(review): this is an 'else if' chained to the initialization branch above, so the
+                // entry read immediately after the start marker is never tested against the end marker.
+                // If a scanned range holds only the two markers, the end marker's empty value would be
+                // returned as a result. That looks possible when joinVars and allVars are identical, since
+                // store() would then key the value by the bare prefix, which sorts before the start
+                // marker. Confirm and decide on the intended behavior before changing this.
+                else if(candidate.key.endsWith(END_RANGE_SUFFIX)) {
+                    log.debug("Read the end of range marker.\n");
+
+                    // If there are more messages, that's a problem.
+                    if(rangeIt.hasNext()) {
+                        throw new IllegalStateException("The end of range 
marker must be the last key in the iterator.");
+                    }
+
+                    return Optional.empty();
+                }
+
+                // Otherwise we found a new value.
+                return Optional.of( candidate.value );
+            }
+
+            // Releases the underlying range iterator obtained from the store.
+            @Override
+            public void close() throws Exception {
+                rangeIt.close();
+            }
+        };
+    }
+
+    /**
+     * Builds a key for {@link KeyValueJoinStateStore}: the side, followed by the string form of the binding
+     * set's value for each requested variable, in order, all joined with commas. A variable the binding set
+     * has no binding for contributes an empty string.
+     *
+     * @param side - The side value for the key. (not null)
+     * @param vars - Which variables within the binding set to use for the key's values. (not null)
+     * @param bindingSet - The binding set the key is being constructed from. (not null)
+     * @return A comma delimited list of the binding values, leading with the side.
+     */
+    private static String makeCommaDelimitedValues(final Side side, final List<String> vars, final VisibilityBindingSet bindingSet) {
+        requireNonNull(side);
+        requireNonNull(vars);
+        requireNonNull(bindingSet);
+
+        // The side always comes first, then one entry per variable in the order given.
+        final List<String> parts = new ArrayList<>();
+        parts.add(side.toString());
+        for(final String varName : vars) {
+            if(bindingSet.hasBinding(varName)) {
+                parts.add( bindingSet.getBinding(varName).getValue().toString() );
+            } else {
+                parts.add("");
+            }
+        }
+
+        // Glue the pieces together with commas.
+        return Joiner.on(",").join(parts);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
index bff4fdb..0a1a8a4 100644
--- 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
@@ -19,10 +19,13 @@
 package org.apache.rya.streams.kafka;
 
 import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.kafka.clients.consumer.Consumer;
@@ -34,8 +37,20 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 
+import com.google.common.collect.Sets;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
@@ -124,4 +139,71 @@ public final class KafkaTestUtil {
 
         return values;
     }
+
+    /**
+     * Runs a Kafka Streams topology, loads statements into the input topic, 
reads the binding sets that come out of
+     * the results topic, and ensures the expected results match the read 
results.
+     *
+     * @param kafka - The embedded kafka instance that is being tested with. 
(not null)
+     * @param statementsTopic - The topic statements will be written to. (not 
null)
+     * @param resultsTopic - The topic results will be read from. (not null)
+     * @param builder - The streams topology that will be executed. (not null)
+     * @param startupMs - How long to wait, in milliseconds, for the topology to start before 
writing the statements.
+     * @param statements - The statements that will be loaded into the topic. 
(not null)
+     * @param expected - The expected results. (not null)
+     * @throws Exception If any exception was thrown while running the test.
+     */
+    public static void runStreamProcessingTest(
+            final KafkaTestInstanceRule kafka,
+            final String statementsTopic,
+            final String resultsTopic,
+            final TopologyBuilder builder,
+            final int startupMs,
+            final List<VisibilityStatement> statements,
+            final Set<VisibilityBindingSet> expected) throws Exception {
+        requireNonNull(kafka);
+        requireNonNull(statementsTopic);
+        requireNonNull(resultsTopic);
+        requireNonNull(builder);
+        requireNonNull(statements);
+        requireNonNull(expected);
+
+        // Explicitly create the topics that are being used.
+        kafka.createTopic(statementsTopic);
+        kafka.createTopic(resultsTopic);
+
+        // Start the streams program. NOTE(review): the application ID is hard coded to one test class's name even though this helper is shared.
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"StatementPatternProcessorIT");
+
+        final KafkaStreams streams = new KafkaStreams(builder, new 
StreamsConfig(props));
+        streams.cleanUp();
+        try {
+            streams.start();
+
+            // Wait for the streams application to start. Streams only see 
data after their consumers are connected.
+            Thread.sleep(startupMs);
+
+            // Load the statements into the input topic.
+            try(Producer<String, VisibilityStatement> producer = 
KafkaTestUtil.makeProducer(
+                    kafka, StringSerializer.class, 
VisibilityStatementSerializer.class)) {
+                new KafkaLoadStatements(statementsTopic, 
producer).fromCollection(statements);
+            }
+
+            // Wait for the final results to appear in the output topic and 
verify the expected Binding Sets were found.
+            try(Consumer<String, VisibilityBindingSet> consumer = 
KafkaTestUtil.fromStartConsumer(
+                    kafka, StringDeserializer.class, 
VisibilityBindingSetDeserializer.class)) {
+                // Register the topic.
+                consumer.subscribe(Arrays.asList(resultsTopic));
+
+                // Poll for the results; they are collected into a Set, so duplicate binding sets collapse before comparison.
+                final Set<VisibilityBindingSet> results = Sets.newHashSet( 
KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+
+                // Show the job produced exactly the expected binding sets.
+                assertEquals(expected, results);
+            }
+        } finally {
+            streams.close();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index 1b58b42..371fd0b 100644
--- 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -18,34 +18,25 @@
  */
 package org.apache.rya.streams.kafka.processors;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
 import 
org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter;
 import 
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
-import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -63,7 +54,50 @@ public class StatementPatternProcessorIT {
     public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     @Test
-    public void statementPatternMatches() throws Exception {
+    public void singlePattern_singleStatement() throws Exception {
+        // Enumerate the topics that will be used; random IDs give this test its own topic names.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { 
?person <urn:talksTo> ?otherPerson }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that evaluates the statement pattern and wraps each result as a UnaryResult.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, 
result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"SP1");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create a statement that generates an SP result.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Build the binding set the job is expected to emit; it carries the same visibility as the statement.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test, giving the topology 2000 ms to start before the statement is loaded.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void singlePattern_manyStatements() throws Exception {
         // Enumerate some topics that will be re-used
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
@@ -88,48 +122,144 @@ public class StatementPatternProcessorIT {
         // Add a sink that writes the data out to a new Kafka topic.
         builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
 
-        // Start the streams program.
-        final Properties props = kafka.createBootstrapServerConfig();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"StatementPatternProcessorIT");
-
-        final KafkaStreams streams = new KafkaStreams(builder, new 
StreamsConfig(props));
-        streams.cleanUp();
-        try {
-            streams.start();
-
-            // Wait for the streams application to start. Streams only see 
data after their consumers are connected.
-            Thread.sleep(2000);
-
-            // Load some data into the input topic.
-            final ValueFactory vf = new ValueFactoryImpl();
-            final List<VisibilityStatement> statements = new ArrayList<>();
-            statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-
-            try(Producer<String, VisibilityStatement> producer = 
KafkaTestUtil.makeProducer(
-                    kafka, StringSerializer.class, 
VisibilityStatementSerializer.class)) {
-                new KafkaLoadStatements(statementsTopic, 
producer).fromCollection(statements);
-            }
-
-            // Wait for the final results to appear in the output topic and 
verify the expected Binding Set was found.
-            try(Consumer<String, VisibilityBindingSet> consumer = 
KafkaTestUtil.fromStartConsumer(
-                    kafka, StringDeserializer.class, 
VisibilityBindingSetDeserializer.class)) {
-                // Register the topic.
-                consumer.subscribe(Arrays.asList(resultsTopic));
-
-                // Poll for the result.
-                final List<VisibilityBindingSet> results = 
KafkaTestUtil.pollForResults(500, 6, 1, consumer);
-
-                // Show the correct binding set results from the job.
-                final QueryBindingSet bs = new QueryBindingSet();
-                bs.addBinding("person", vf.createURI("urn:Alice"));
-                bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-                final VisibilityBindingSet expected = new 
VisibilityBindingSet(bs, "a");
-
-                final VisibilityBindingSet result = results.iterator().next();
-                assertEquals(expected, result);
-            }
-        } finally {
-            streams.close();
-        }
+        // Create some statements where some generates SP results and others 
do not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") );
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") );
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Alice"));
+        expected.add( new VisibilityBindingSet(bs, "a|b") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void multiplePatterns_singleStatement() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern objects that will be evaluated.
+        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { 
?person <urn:talksTo> ?otherPerson }");
+        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { 
?person ?action <urn:Bob> }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, 
result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, 
result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output; it is fed by both SP processors.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"SP1", "SP2");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create the input. Only one statement is loaded and it matches both SPs; other tests add statements that 
do not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Build the expected results: one binding set per statement pattern, both carrying the statement's visibility.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void multiplePatterns_multipleStatements() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern objects that will be evaluated.
+        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { 
?person <urn:talksTo> ?otherPerson }");
+        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { 
?person ?action <urn:Bob> }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, 
result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, 
result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output; it is fed by both SP processors.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"SP1", "SP2");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements where some match both statement patterns and others 
do not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") );
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), 
vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") );
+
+        // Build the expected results: one binding set for every pairing of a statement with a pattern it matches.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Charlie"));
+        expected.add( new VisibilityBindingSet(bs, "a|b") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Charlie"));
+        bs.addBinding("action", vf.createURI("urn:walksWith"));
+        expected.add( new VisibilityBindingSet(bs, "b") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
new file mode 100644
index 0000000..14a559f
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.rya.api.function.join.LeftOuterJoin;
+import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import 
org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import 
org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter;
+import 
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import 
org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link JoinProcessor}.
+ */
+public class JoinProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badAllVars() throws IllegalArgumentException {
+        new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("person", "employee", "business"), // NOTE(review): presumably rejected because the join var "employee" is not listed first, as it is in the passing tests - confirm
+                result -> ProcessorResult.make( new UnaryResult(result) ));
+    }
+
+    @Test
+    public void newLeftResult() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated.
+        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { 
?person <urn:talksTo> ?employee }");
+        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { 
?employee <urn:worksAt> ?business }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern and tags its results as the LEFT side.
+        builder.addProcessor("LEFT_SP", new 
StatementPatternProcessorSupplier(leftSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern and tags its results as the RIGHT side.
+        builder.addProcessor("RIGHT_SP", new 
StatementPatternProcessorSupplier(rightSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over the SPs using the shared "employee" variable.
+        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), 
"LEFT_SP", "RIGHT_SP");
+
+        // Add a state store for the join processor. Its name matches the name handed to the JoinProcessorSupplier.
+        final StateStoreSupplier joinStoreSupplier =
+                Stores.create( "NATURAL_JOIN" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"NATURAL_JOIN");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements that generate a bunch of right SP results.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Add a statement that will generate a left result that joins with 
some of those right results.
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+
+        // Make the expected results: only Bob's two worksAt results join with the left result, and the visibilities are combined.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void newRightResult() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated.
+        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { 
?person <urn:talksTo> ?employee }");
+        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { 
?employee <urn:worksAt> ?business }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("LEFT_SP", new 
StatementPatternProcessorSupplier(leftSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("RIGHT_SP", new 
StatementPatternProcessorSupplier(rightSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles a natrual join over the SPs.
+        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), 
"LEFT_SP", "RIGHT_SP");
+
+        // Add a state store for the join processor.
+        final StateStoreSupplier joinStoreSupplier =
+                Stores.create( "NATURAL_JOIN" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"NATURAL_JOIN");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements that generate a bunch of right SP results.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Add a statement that will generate a left result that joins with 
some of those right results.
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void newResultsBothSides() throws Exception {
+        // This test feeds a natural join new results on both its left and right sides. Enumerate some topics that will be re-used.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated. Both bind ?employee, which is the variable the join below is given.
+        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { 
?person <urn:talksTo> ?employee }");
+        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { 
?employee <urn:worksAt> ?business }");
+
+        // Set up a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern and tags its results as the LEFT side.
+        builder.addProcessor("LEFT_SP", new 
StatementPatternProcessorSupplier(leftSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern and tags its results as the RIGHT side.
+        builder.addProcessor("RIGHT_SP", new 
StatementPatternProcessorSupplier(rightSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over the SPs; it is fed by both SP processors.
+        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), 
"LEFT_SP", "RIGHT_SP");
+
+        // Add an in-memory state store for the join processor and connect it to the NATURAL_JOIN node.
+        final StateStoreSupplier joinStoreSupplier =
+                Stores.create( "NATURAL_JOIN" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"NATURAL_JOIN");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create statements that interleave right SP (worksAt) matches with left SP (talksTo) matches.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") );
+
+        // Make the expected results: one binding set for each talksTo/worksAt pair that shares an employee (Eve has no match).
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("employee", vf.createURI("urn:Charlie"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "a&c") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void manyJoins() throws Exception {
+        // This test chains two natural joins, the first one feeding the second. Enumerate some topics that will be re-used.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated. All three bind ?employee.
+        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { 
?person <urn:talksTo> ?employee }");
+        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { 
?employee <urn:worksAt> ?business }");
+        final StatementPattern sp3 = RdfTestUtil.getSp("SELECT * WHERE { 
?employee <urn:hourlyWage> ?wage }");
+
+        // Set up a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern and tags its results as the LEFT side.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern and tags its results as the RIGHT side.
+        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over SPs 1 and 2. Its output is tagged LEFT because it feeds JOIN2.
+        builder.addProcessor("JOIN1", new JoinProcessorSupplier(
+                "JOIN1",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, 
result) )), "SP1", "SP2");
+
+        // Add a processor that handles the third statement pattern and tags its results as the RIGHT side.
+        builder.addProcessor("SP3", new StatementPatternProcessorSupplier(sp3,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over JOIN1 and SP3. NOTE(review): the variable list below omits "person" although the expected result carries it - confirm this is intended.
+        builder.addProcessor("JOIN2", new JoinProcessorSupplier(
+                "JOIN2",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "business", "wage"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), 
"JOIN1", "SP3");
+
+        // Set up the join state suppliers: one in-memory store per join processor, each connected to its own node.
+        final StateStoreSupplier join1StoreSupplier =
+                Stores.create( "JOIN1" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(join1StoreSupplier, "JOIN1");
+
+        final StateStoreSupplier join2StoreSupplier =
+                Stores.create( "JOIN2" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(join2StoreSupplier, "JOIN2");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"JOIN2");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create one statement matching each of the three statement patterns, all about the same employee.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:hourlyWage"), vf.createLiteral(7.25)), "a") );
+
+        // Make the expected results: a single binding set that combines the bindings of all three patterns.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        bs.addBinding("wage", vf.createLiteral(7.25));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 3000, statements, expected);
+    }
+
+    @Test
+    public void leftJoin() throws Exception {
+        // This test runs a left outer join where the right (optional) side may be missing. Enumerate some topics that will be re-used.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated. Both bind ?employee.
+        final StatementPattern requiredSp = RdfTestUtil.getSp("SELECT * WHERE 
{ ?person <urn:talksTo> ?employee }");
+        final StatementPattern optionalSp = RdfTestUtil.getSp("SELECT * WHERE 
{ ?employee <urn:worksAt> ?business }");
+
+        // Set up a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the required statement pattern and tags its results as the LEFT side.
+        builder.addProcessor("REQUIRED_SP", new 
StatementPatternProcessorSupplier(requiredSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles the optional statement pattern and tags its results as the RIGHT side.
+        builder.addProcessor("OPTIONAL_SP", new 
StatementPatternProcessorSupplier(optionalSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, 
result) )), "STATEMENTS");
+
+        // Add a processor that handles a left outer join over the SPs.
+        builder.addProcessor("LEFT_JOIN", new JoinProcessorSupplier(
+                "LEFT_JOIN",
+                new LeftOuterJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), 
"REQUIRED_SP", "OPTIONAL_SP");
+
+        // Add an in-memory state store for the join processor and connect it to the LEFT_JOIN node.
+        final StateStoreSupplier joinStoreSupplier =
+                Stores.create( "LEFT_JOIN" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(joinStoreSupplier, "LEFT_JOIN");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"LEFT_JOIN");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements that generate a result that includes the 
optional value as well as one that does not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:David"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "d") );
+
+        // Make the expected results: Alice/Bob both without and with the optional business, and Bob/Charlie without it. David's worksAt statement has no required match.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("employee", vf.createURI("urn:Charlie"));
+        expected.add( new VisibilityBindingSet(bs, "c") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/resources/log4j.properties 
b/extras/rya.streams/kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..05e7be5
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger option
+log4j.rootLogger=ERROR, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p 
%c{1}:%L - %m%n
+
+log4j.logger.org.apache.rya.streams.kafka.processors=debug

Reply via email to