http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
index aef7c58..6a3d749 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
@@ -61,7 +61,7 @@ public class KafkaRunQuery implements RunQuery {
      * @param kafkaHostname - The hostname of the Kafka Broker to connect to. 
(not null)
      * @param kafkaPort - The port of the Kafka Broker to connect to. (not 
null)
      * @param statementsTopic - The name of the topic that statements will be 
read from. (not null)
-     * @param resultsTopic - The name of the topic that query results will be 
writen to. (not null)
+     * @param resultsTopic - The name of the topic that query results will be 
written to. (not null)
      * @param queryRepo - The query repository that holds queries that are 
registered. (not null)
      * @param topologyFactory - Builds Kafka Stream processing topologies from 
SPARQL. (not null)
      */

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
index 124bc76..402061c 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
@@ -58,7 +58,7 @@ public class ProcessorResult {
      * @param unary - The unary result if that is this object's type. (not 
null)
      * @param binary - The binary result if that is this object's type. (not 
null)
      */
-    private  ProcessorResult(
+    private ProcessorResult(
             final ResultType type,
             final Optional<UnaryResult> unary,
             final Optional<BinaryResult> binary) {
@@ -235,7 +235,7 @@ public class ProcessorResult {
         }
 
         /**
-         * A label that is used to by the downstream binary prcoessor to 
distinguish which upstream processor
+         * A label that is used by the downstream binary processor to 
distinguish which upstream processor
          * produced the {@link BinaryResult}.
          */
         public static enum Side {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
index 367ca6f..4ea6959 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
@@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -29,6 +30,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.rya.api.function.join.IterativeJoin;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.utils.CloseableIterator;
+import org.apache.rya.api.utils.UuidUtils;
 import org.apache.rya.streams.kafka.processors.ProcessorResult;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
 import 
org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
@@ -138,13 +140,16 @@ public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier {
 
         @Override
         public void init(final ProcessorContext context) {
-            // Hold onto the context so that we can foward results.
+            // Hold onto the context so that we can forward results.
             this.context = context;
 
+            final String appId = context.applicationId();
+            final UUID queryId = UuidUtils.extractUuidFromStringEnd(appId);
+
             // Get a reference to the state store that keeps track of what can 
be joined with.
             final KeyValueStore<String, VisibilityBindingSet> stateStore =
                     (KeyValueStore<String, VisibilityBindingSet>) 
context.getStateStore( stateStoreName );
-            joinStateStore = new KeyValueJoinStateStore( stateStore, joinVars, 
allVars );
+            joinStateStore = new KeyValueJoinStateStore( stateStore, 
queryId.toString(), joinVars, allVars );
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
index 254f226..8df77c4 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
@@ -77,7 +77,12 @@ public class KeyValueJoinStateStore implements JoinStateStore {
     /**
      * This is the maximum value of a UTF-8 character.
      */
-    private static final String END_RANGE_SUFFIX = new String(new byte[] { 
(byte) 0XFF }, Charsets.UTF_8);
+    private static final String END_RANGE_SUFFIX = new String(new byte[] { 
(byte) 0xFF }, Charsets.UTF_8);
+
+    /**
+     * Indicates where the end of the join variables occurs.
+     */
+    private static final String JOIN_VAR_END_MARKER = new 
String("~!^~".getBytes(Charsets.UTF_8), Charsets.UTF_8);
 
     /**
      * A default empty value that is stored for a start of range or end of 
range marker.
@@ -85,6 +90,7 @@ public class KeyValueJoinStateStore implements JoinStateStore {
     private static final VisibilityBindingSet RANGE_MARKER_VALUE = new 
VisibilityBindingSet(new MapBindingSet(), "");
 
     private final KeyValueStore<String, VisibilityBindingSet> store;
+    private final String id;
     private final List<String> joinVars;
     private final List<String> allVars;
 
@@ -92,15 +98,18 @@ public class KeyValueJoinStateStore implements JoinStateStore {
      * Constructs an instance of {@link KeyValueJoinStateStore}.
      *
      * @param store - The state store that will be used. (not null)
+     * @param id - The ID used for the state store. (not null)
      * @param joinVars - The variables that are used to build grouping keys. 
(not null)
      * @param allVars - The variables that are used to build full value keys. 
(not null)
      * @throws IllegalArgumentException Thrown if {@code allVars} does not 
start with {@code joinVars}.
      */
     public KeyValueJoinStateStore(
             final KeyValueStore<String, VisibilityBindingSet> store,
+            final String id,
             final List<String> joinVars,
             final List<String> allVars) throws IllegalArgumentException {
         this.store = requireNonNull(store);
+        this.id = requireNonNull(id);
         this.joinVars = requireNonNull(joinVars);
         this.allVars = requireNonNull(allVars);
 
@@ -120,16 +129,18 @@ public class KeyValueJoinStateStore implements JoinStateStore {
         // This is a prefix for every row that holds values for a specific set 
of join variable values.
         final Side side = result.getSide();
         final VisibilityBindingSet bs = result.getResult();
-        final String joinKeyPrefix = makeCommaDelimitedValues(side, joinVars, 
bs);
+
+        final String joinKeyPrefix = makeCommaDelimitedValues(side, joinVars, 
bs, joinVars.size());
 
         final List<KeyValue<String, VisibilityBindingSet>> values = new 
ArrayList<>();
 
-        // For each join variable set, we need a start key for scanning,
+        // For each join variable set, we need a start key for scanning.
         final String startKey = joinKeyPrefix + START_RANGE_SUFFIX;
         values.add( new KeyValue<>(startKey, RANGE_MARKER_VALUE) );
 
         // The actual value that was emitted as a result.
-        final String valueKey = makeCommaDelimitedValues(side, allVars, bs);
+        final String valueKey = makeCommaDelimitedValues(side, allVars, bs, 
joinVars.size());
+
         values.add( new KeyValue<>(valueKey, bs) );
 
         // And the end key for scanning.
@@ -148,10 +159,11 @@ public class KeyValueJoinStateStore implements JoinStateStore {
         // Get an iterator over the values that start with the join variables 
for the other side.
         final Side otherSide = result.getSide() == Side.LEFT ? Side.RIGHT : 
Side.LEFT;
         final VisibilityBindingSet bs = result.getResult();
-        final String joinKeyPrefix = makeCommaDelimitedValues(otherSide, 
joinVars, bs);
+        final String joinKeyPrefix = makeCommaDelimitedValues(otherSide, 
joinVars, bs, joinVars.size());
 
         final String startKey = joinKeyPrefix + START_RANGE_SUFFIX;
         final String endKey = joinKeyPrefix + END_RANGE_SUFFIX;
+
         final KeyValueIterator<String, VisibilityBindingSet> rangeIt = 
store.range(startKey, endKey);
 
         // Return a CloseableIterator over the range's value fields, skipping 
the start and end entry.
@@ -241,21 +253,57 @@ public class KeyValueJoinStateStore implements JoinStateStore {
      * @param side - The side value for the key. (not null)
      * @param vars - Which variables within the binding set to use for the 
key's values. (not null)
      * @param bindingSet - The binding set the key is being constructed from. 
(not null)
+     * @param joinVarSize - the number of join variables at the beginning of
+     * {@code vars}.
      * @return A comma delimited list of the binding values, leading with the 
side.
      */
-    private static String makeCommaDelimitedValues(final Side side, final 
List<String> vars, final VisibilityBindingSet bindingSet) {
+    private String makeCommaDelimitedValues(final Side side, final 
List<String> vars, final VisibilityBindingSet bindingSet, final int 
joinVarSize) {
         requireNonNull(side);
         requireNonNull(vars);
         requireNonNull(bindingSet);
 
         // Make a an ordered list of the binding set variables.
         final List<String> values = new ArrayList<>();
+        values.add(id);
         values.add(side.toString());
+        int count = 0;
         for(final String var : vars) {
-            values.add( bindingSet.hasBinding(var) ? 
bindingSet.getBinding(var).getValue().toString() : "" );
+            count++;
+            String value = bindingSet.hasBinding(var) ? 
bindingSet.getBinding(var).getValue().toString() : "";
+            if (count == joinVarSize) {
+                // Place the marker at the end of the last joinVar String (and
+                // before the remaining "allVars")
+                // A marker is needed to indicate where the join vars end so
+                // that a range search from "urn:Student9[0x00]" to 
"urn:Student9[0xFF]"
+                // does not return "urn:Student95,[remainingBindingValues]".
+                value += JOIN_VAR_END_MARKER;
+            }
+            values.add(value);
         }
 
         // Return a comma delimited list of those values.
         return Joiner.on(",").join(values);
     }
+
+    private void printStateStoreRange(final String startKey, final String 
endKey) {
+        final KeyValueIterator<String, VisibilityBindingSet> rangeIt = 
store.range(startKey, endKey);
+        printStateStoreKeyValueIterator(rangeIt);
+    }
+
+    private void printStateStoreAll() {
+        final KeyValueIterator<String, VisibilityBindingSet> rangeIt = 
store.all();
+        printStateStoreKeyValueIterator(rangeIt);
+    }
+
+    private static void printStateStoreKeyValueIterator(final 
KeyValueIterator<String, VisibilityBindingSet> rangeIt) {
+        log.info("----------------");
+        while (rangeIt.hasNext()) {
+            final KeyValue<String, VisibilityBindingSet> keyValue = 
rangeIt.next();
+            log.info(keyValue.key + " :::: " + keyValue.value);
+        }
+        log.info("----------------\n\n");
+        if (rangeIt != null) {
+            rangeIt.close();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
index 666cbb0..c533854 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
@@ -57,8 +57,21 @@ public interface TopologyBuilderFactory {
     public static class TopologyBuilderException extends Exception {
         private static final long serialVersionUID = 1L;
 
+        /**
+         * Creates a new instance of {@link TopologyBuilderException}.
+         * @param message the detailed message.
+         * @param cause the {@link Throwable} cause.
+         */
         public TopologyBuilderException(final String message, final Throwable 
cause) {
             super(message, cause);
         }
+
+        /**
+         * Creates a new instance of {@link TopologyBuilderException}.
+         * @param message the detailed message.
+         */
+        public TopologyBuilderException(final String message) {
+            super(message);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
index 68fbb83..f330fa3 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
@@ -157,6 +157,10 @@ public class TopologyFactory implements TopologyBuilderFactory {
             }
         }
 
+        if (entry == null) {
+            throw new TopologyBuilderException("No valid processor entries 
found.");
+        }
+
         // Add a formatter that converts the ProcessorResults into the output 
format.
         final SinkEntry<?,?> sinkEntry = visitor.getSinkEntry();
         builder.addProcessor("OUTPUT_FORMATTER", 
sinkEntry.getFormatterSupplier(), entry.getID());
@@ -479,7 +483,7 @@ public class TopologyFactory implements TopologyBuilderFactory {
          * @return The {@link Side} the current node is on.
          */
         private Optional<Side> getSide(final QueryModelNode node) {
-            // if query parent is a binary operator, need to determine if its 
left or right.
+            // if query parent is a binary operator, need to determine if it's 
left or right.
             if (node.getParentNode() instanceof BinaryTupleOperator) {
                 final BinaryTupleOperator binary = (BinaryTupleOperator) 
node.getParentNode();
                 if (node.equals(binary.getLeftArg())) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3ae3aed..9cc67e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -299,6 +299,12 @@ under the License.
             </dependency>
             <dependency>
                 <groupId>org.apache.rya</groupId>
+                <artifactId>rya.api</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
                 <artifactId>rya.api.model</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
index fa1e0a0..f509770 100644
--- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
@@ -49,7 +49,7 @@ public final class KafkaTestUtil {
     /**
      * Create a {@link Producer} that is able to write to a topic that is 
hosted within an embedded instance of Kafka.
      *
-     * @param kafka - The Kafka rule used to connect to the embedded Kafkfa 
instance. (not null)
+     * @param kafka - The Kafka rule used to connect to the embedded Kafka 
instance. (not null)
      * @param keySerializerClass - Serializes the keys. (not null)
      * @param valueSerializerClass - Serializes the values. (not null)
      * @return A {@link Producer} that can be used to write records to a topic.
@@ -72,7 +72,7 @@ public final class KafkaTestUtil {
      * Create a {@link Consumer} that has a unique group ID and reads 
everything from a topic that is hosted within an
      * embedded instance of Kafka starting at the earliest point by default.
      *
-     * @param kafka - The Kafka rule used to connect to the embedded Kafkfa 
instance. (not null)
+     * @param kafka - The Kafka rule used to connect to the embedded Kafka 
instance. (not null)
      * @param keyDeserializerClass - Deserializes the keys. (not null)
      * @param valueDeserializerClass - Deserializes the values. (not null)
      * @return A {@link Consumer} that can be used to read records from a 
topic.
@@ -95,14 +95,14 @@ public final class KafkaTestUtil {
     }
 
     /**
-     * Polls a {@link Consumer> until it has either polled too many times 
without hitting the target number
+     * Polls a {@link Consumer} until it has either polled too many times 
without hitting the target number
      * of results, or it hits the target number of results.
      *
      * @param pollMs - How long each poll could take.
      * @param pollIterations - The maximum number of polls that will be 
attempted.
      * @param targetSize - The number of results to read before stopping.
      * @param consumer - The consumer that will be polled.
-     * @return The results that were read frmo the consumer.
+     * @return The results that were read from the consumer.
      * @throws Exception If the poll failed.
      */
     public static <K, V> List<V> pollForResults(

Reply via email to