RYA-462 Updated the Kafka topic name for StreamsQueries to include the Rya Instance name.

Conflicts:
        extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e07390d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e07390d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e07390d3

Branch: refs/heads/master
Commit: e07390d38e17df13d49eb41a81f0391b92e1e27f
Parents: e453694
Author: kchilton2 <kevin.e.chil...@gmail.com>
Authored: Thu Feb 8 17:21:10 2018 -0500
Committer: Valiyil <puja.vali...@parsons.com>
Committed: Fri Mar 9 13:00:02 2018 -0500

----------------------------------------------------------------------
 .../api/interactor/GetQueryResultStream.java    |  6 +-
 .../streams/client/command/RunQueryCommand.java |  4 +-
 .../client/command/StreamResultsCommand.java    | 60 ++++----------------
 .../client/command/RunQueryCommandIT.java       |  2 +-
 .../kafka/processors/filter/GeoFilterIT.java    |  2 +-
 .../aggregation/AggregationProcessorIT.java     | 17 +++---
 .../processors/filter/FilterProcessorIT.java    |  2 +-
 .../processors/filter/TemporalFilterIT.java     |  8 +--
 .../kafka/processors/join/JoinProcessorIT.java  | 10 ++--
 .../projection/MultiProjectionProcessorIT.java  |  3 +-
 .../projection/ProjectionProcessorIT.java       |  3 +-
 .../sp/StatementPatternProcessorIT.java         |  9 ++-
 .../apache/rya/streams/kafka/KafkaTopics.java   | 28 ++++++++-
 .../kafka/SingleThreadKafkaStreamsFactory.java  |  2 +-
 .../interactor/KafkaGetQueryResultStream.java   | 12 ++--
 .../rya/streams/kafka/KafkaTopicsTest.java      |  4 +-
 .../interactor/KafkaGetQueryResultStreamIT.java | 18 +++---
 .../kafka/interactor/KafkaRunQueryIT.java       |  2 +-
 .../kafka/KafkaQueryChangeLogSource.java        |  4 +-
 .../querymanager/kafka/LocalQueryExecutor.java  |  2 +-
 .../kafka/LocalQueryExecutorIT.java             |  2 +-
 21 files changed, 93 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
index 951d060..0e70391 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
@@ -37,18 +37,20 @@ public interface GetQueryResultStream<T> {
     /**
      * Stream all of the results that have been produced by a query.
      *
+     * @param ryaInstance - The name of the Rya Instance the query is for. 
(not null)
      * @param queryId - Indicates which query results to stream. (not null)
      * @return A {@link QueryResultStream} that starts with the first result 
that was ever produced.
      * @throws RyaStreamsException Could not create the result stream.
      */
-    public QueryResultStream<T> fromStart(UUID queryId) throws 
RyaStreamsException;
+    public QueryResultStream<T> fromStart(String ryaInstance, UUID queryId) 
throws RyaStreamsException;
 
     /**
      * Stream results that have been produced by a query after this method was 
invoked.
      *
+     * @param ryaInstance - The name of the Rya Instance the query is for. 
(not null)
      * @param queryId - Indicates which query results to stream. (not null)
      * @return A {@link QueryResultStream} that only returns results that were 
produced after this method is invoked.
      * @throws RyaStreamsException Could not create the result stream.
      */
-    public QueryResultStream<T> fromNow(UUID queryId) throws 
RyaStreamsException;
+    public QueryResultStream<T> fromNow(String ryaInstance, UUID queryId) 
throws RyaStreamsException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index 7b311f6..f9a9458 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -135,7 +135,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
                 // Make sure the topics required by the application exists for 
the specified Rya instances.
                 final Set<String> topics = new HashSet<>();
                 topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
-                topics.add( KafkaTopics.queryResultsTopic(queryId) );
+                topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, 
queryId) );
                 KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 
1);
 
                 // Run the query that uses those topics.
@@ -143,7 +143,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
                         params.kafkaIP,
                         params.kafkaPort,
                         KafkaTopics.statementsTopic(params.ryaInstance),
-                        KafkaTopics.queryResultsTopic(queryId),
+                        KafkaTopics.queryResultsTopic(params.ryaInstance, 
queryId),
                         queryRepo,
                         new TopologyFactory());
                 runQuery.run(queryId);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 8ae4e08..783aedc 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,18 +20,11 @@ package org.apache.rya.streams.client.command;
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.api.utils.QueryInvestigator;
 import org.apache.rya.streams.api.entity.QueryResultStream;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.interactor.GetQueryResultStream;
@@ -39,13 +32,13 @@ import 
org.apache.rya.streams.api.queries.InMemoryQueryRepository;
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
-import org.apache.rya.streams.client.util.QueryResultsOutputUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Reduced;
 import org.openrdf.query.algebra.TupleExpr;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 
@@ -72,20 +65,14 @@ public class StreamResultsCommand implements 
RyaStreamsCommand {
         @Parameter(names = {"--queryId", "-q"}, required = true, description = 
"The query whose results will be streamed to the console.")
         private String queryId;
 
-        @Parameter(names = {"--file", "-f"}, required = false, description = 
"If provided, the output file the results will stream into.")
-        private String outputPath;
-
         @Override
         public String toString() {
             final StringBuilder parameters = new StringBuilder();
             parameters.append(super.toString());
 
             if (!Strings.isNullOrEmpty(queryId)) {
-                parameters.append("\tQuery ID: " + queryId + "\n");
-            }
-
-            if(!Strings.isNullOrEmpty(outputPath)) {
-                parameters.append("\tOutput Path: " + outputPath + "\n");
+                parameters.append("\tQuery ID: " + queryId);
+                parameters.append("\n");
             }
 
             return parameters.toString();
@@ -173,12 +160,10 @@ public class StreamResultsCommand implements 
RyaStreamsCommand {
         });
 
         // Build the interactor based on the type of result the query produces.
-        boolean isStatementResults = false;
-
         final GetQueryResultStream<?> getQueryResultStream;
         try {
-            isStatementResults = QueryInvestigator.isConstruct(sparql) | 
QueryInvestigator.isInsertWhere(sparql);
-            if(isStatementResults) {
+            final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, 
null).getTupleExpr();
+            if(tupleExpr instanceof Reduced) {
                 getQueryResultStream = new 
KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, 
VisibilityStatementDeserializer.class);
             } else {
                 getQueryResultStream = new 
KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, 
VisibilityBindingSetDeserializer.class);
@@ -187,22 +172,12 @@ public class StreamResultsCommand implements 
RyaStreamsCommand {
             throw new ExecutionException("Could not parse the SPARQL for the 
query: " + sparql, e);
         }
 
-        // Iterate through the results and print them to the configured output 
mechanism.
-        try (final QueryResultStream<?> resultsStream = 
getQueryResultStream.fromStart(queryId)) {
-            final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, 
null).getTupleExpr();
-            if(params.outputPath != null) {
-                final Path file = Paths.get(params.outputPath);
-                try (final OutputStream out = Files.newOutputStream(file)) {
-                    if(isStatementResults) {
-                        final QueryResultStream<VisibilityStatement> 
stmtStream = (QueryResultStream<VisibilityStatement>) resultsStream;
-                        QueryResultsOutputUtil.toNtriplesFile(out, stmtStream, 
finished);
-                    } else {
-                        final QueryResultStream<VisibilityBindingSet> bsStream 
= (QueryResultStream<VisibilityBindingSet>) resultsStream;
-                        QueryResultsOutputUtil.toBindingSetJSONFile(out, 
tupleExpr, bsStream, finished);
-                    }
+        // Iterate through the results and print them to the console until the 
program or the stream ends.
+        try (final QueryResultStream<?> stream = 
getQueryResultStream.fromStart(params.ryaInstance, queryId)) {
+            while(!finished.get()) {
+                for(final Object result : stream.poll(1000)) {
+                    System.out.println(result);
                 }
-            } else {
-                streamToSystemOut(resultsStream, finished);
             }
         } catch (final Exception e) {
             System.err.println("Error while reading the results from the 
stream.");
@@ -210,15 +185,4 @@ public class StreamResultsCommand implements 
RyaStreamsCommand {
             System.exit(1);
         }
     }
-
-    private static void streamToSystemOut(final QueryResultStream<?> stream, 
final AtomicBoolean shutdownSignal) throws Exception {
-        requireNonNull(stream);
-        requireNonNull(shutdownSignal);
-
-        while(!shutdownSignal.get()) {
-            for(final Object result : stream.poll(1000)) {
-                System.out.println(result);
-            }
-        }
-    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 5d63f32..176b920 100644
--- 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -184,7 +184,7 @@ public class RunQueryCommandIT {
             loadStatements.fromCollection(statements);
 
             // Read the output of the streams program.
-            final String resultsTopic = 
KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+            final String resultsTopic = 
KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
             resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
             results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
 
b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
index c090afa..17a290a 100644
--- 
a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
+++ 
b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -88,7 +88,7 @@ public class GeoFilterIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
index a4e4a3e..2a1e760 100644
--- 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
+++ 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -34,7 +34,6 @@ import 
org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorS
 import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
@@ -90,7 +89,7 @@ public class AggregationProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, 
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -137,7 +136,7 @@ public class AggregationProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, 
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -183,7 +182,7 @@ public class AggregationProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, 
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -233,7 +232,7 @@ public class AggregationProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, 
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -283,7 +282,7 @@ public class AggregationProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, 
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -371,7 +370,7 @@ public class AggregationProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, 
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -380,9 +379,7 @@ public class AggregationProcessorIT {
         RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, statements, expected, 
VisibilityBindingSetDeserializer.class);
     }
 
-    // Ignored because this test is kind of flakey.
     @Test
-    @Ignore
     public void multipleAggregations() throws Exception {
         // A query that figures out what the youngest and oldest ages are 
across all people.
         final String sparql =
@@ -436,7 +433,7 @@ public class AggregationProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, 
statementsTopic, resultsTopic, new RandomUUIDFactory());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
index aaa67ea..7fb228a 100644
--- 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -54,7 +54,7 @@ public class FilterProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
index 6e27669..11637b7 100644
--- 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
+++ 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
@@ -85,7 +85,7 @@ public class TemporalFilterIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
@@ -119,7 +119,7 @@ public class TemporalFilterIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
@@ -153,7 +153,7 @@ public class TemporalFilterIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
@@ -187,7 +187,7 @@ public class TemporalFilterIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index bdb9be6..5f09372 100644
--- 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -69,7 +69,7 @@ public class JoinProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final String query =
@@ -120,7 +120,7 @@ public class JoinProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final String query =
@@ -171,7 +171,7 @@ public class JoinProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final String query =
@@ -228,7 +228,7 @@ public class JoinProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final String query =
@@ -269,7 +269,7 @@ public class JoinProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Setup a topology.
         final String query =

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index a560294..c6fd1cf 100644
--- 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import 
org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
 import 
org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -54,7 +53,7 @@ public class MultiProjectionProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, 
queryId);
 
         // Create a topology for the Query that will be tested.
         final String sparql =

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index 322bba9..f53f2c4 100644
--- 
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ 
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -30,7 +30,6 @@ import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import 
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
 import 
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -57,7 +56,7 @@ public class ProjectionProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Create a topology for the Query that will be tested.
         final String sparql =

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
index ba11e57..fd0a48d 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
@@ -30,7 +30,6 @@ import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -55,7 +54,7 @@ public class StatementPatternProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
@@ -85,7 +84,7 @@ public class StatementPatternProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
@@ -123,7 +122,7 @@ public class StatementPatternProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query = "SELECT * WHERE { "
@@ -157,7 +156,7 @@ public class StatementPatternProcessorIT {
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query = "SELECT * WHERE { "

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 989799a..f0cc842 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -62,7 +62,7 @@ public class KafkaTopics {
      * @param changeLogTopic - The topic to evaluate. (not null)
      * @return If the topic is well formatted, then the Rya instance name that was part of the topic name.
      */
-    public static Optional<String> getRyaInstance(final String changeLogTopic) {
+    public static Optional<String> getRyaInstanceFromQueryChangeLog(final String changeLogTopic) {
         requireNonNull(changeLogTopic);
 
         // Return absent if the provided topic does not represent a query change log topic.
@@ -93,9 +93,31 @@ public class KafkaTopics {
      * @param queryId - The id of the query the topic is for.
      * @return The name of the Kafka topic.
      */
-    public static String queryResultsTopic(final UUID queryId) {
+    public static String queryResultsTopic(final String ryaInstance, final UUID queryId) {
         requireNonNull(queryId);
-        return "QueryResults-" + queryId.toString();
+        return ryaInstance + "-QueryResults-" + queryId.toString();
+    }
+
+    /**
+     * TODO doc
+     *
+     * @param queryResultsTopic
+     * @return
+     */
+    public static String getRyaInstanceFromQueryResultsTopic(final String queryResultsTopic) {
+        // TODO
+        return "";
+    }
+
+    /**
+     * TODO doc
+     *
+     * @param queryResultsTopic
+     * @return
+     */
+    public static UUID getQueryIdFromQueryResultsTopic(final String queryResultsTopic) {
+        // TODO
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
index 8093951..63d64b9 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -78,7 +78,7 @@ public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory {
 
         // Setup the topology that processes the Query.
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId());
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId());
 
         try {
             final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
index 529b493..d91edeb 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
@@ -66,22 +66,22 @@ public class KafkaGetQueryResultStream<T> implements GetQueryResultStream<T> {
     }
 
     @Override
-    public QueryResultStream<T> fromStart(final UUID queryId) throws RyaStreamsException {
+    public QueryResultStream<T> fromStart(final String ryaInstance, final UUID queryId) throws RyaStreamsException {
         requireNonNull(queryId);
 
         // Always start at the earliest point within the topic.
-        return makeStream(queryId, "earliest");
+        return makeStream(ryaInstance, queryId, "earliest");
     }
 
     @Override
-    public QueryResultStream<T> fromNow(final UUID queryId) throws RyaStreamsException {
+    public QueryResultStream<T> fromNow(final String ryaInstance, final UUID queryId) throws RyaStreamsException {
         requireNonNull(queryId);
 
         // Always start at the latest point within the topic.
-        return makeStream(queryId, "latest");
+        return makeStream(ryaInstance, queryId, "latest");
     }
 
-    private QueryResultStream<T> makeStream(final UUID queryId, final String autoOffsetResetConfig) {
+    private QueryResultStream<T> makeStream(final String ryaInstance, final UUID queryId, final String autoOffsetResetConfig) {
         // Configure which instance of Kafka to connect to.
         final Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -106,7 +106,7 @@ public class KafkaGetQueryResultStream<T> implements GetQueryResultStream<T> {
         final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(props);
 
         // Register the consumer for the query's results.
-        final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
         consumer.subscribe(Arrays.asList(resultTopic));
 
         // Return the result stream.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
index a057de7..8eaf080 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
@@ -37,7 +37,7 @@ public class KafkaTopicsTest {
         final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance);
 
         // Show the rya instance name is able to be extracted from the topic.
-        final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstance(topicName);
+        final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstanceFromQueryChangeLog(topicName);
         assertEquals(ryaInstance, resolvedRyaInstance.get());
     }
 
@@ -47,7 +47,7 @@ public class KafkaTopicsTest {
         final String invalidTopic = "thisIsABadTopicName";
 
         // Show there is no Rya Instance name in it.
-        final Optional<String> ryaInstance = KafkaTopics.getRyaInstance(invalidTopic);
+        final Optional<String> ryaInstance = KafkaTopics.getRyaInstanceFromQueryChangeLog(invalidTopic);
         assertFalse( ryaInstance.isPresent() );
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index 8882753..59c08b7 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -85,6 +85,7 @@ public class KafkaGetQueryResultStreamIT {
     @Test
     public void fromStart() throws Exception {
         // Create an ID for the query.
+        final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
 
         // Create a list of test VisibilityBindingSets.
@@ -106,7 +107,7 @@ public class KafkaGetQueryResultStreamIT {
         // Write some entries to the query result topic in Kafka.
         try(final Producer<?, VisibilityBindingSet> producer =
                 KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) {
-            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+            final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
             for(final VisibilityBindingSet visBs : original) {
                 producer.send(new ProducerRecord<>(resultTopic, visBs));
             }
@@ -115,7 +116,7 @@ public class KafkaGetQueryResultStreamIT {
         // Use the interactor that is being tested to read all of the visibility binding sets.
         final GetQueryResultStream<VisibilityBindingSet> interactor =
                 new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
-        final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
+        final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(ryaInstance, queryId));
 
         // Show the fetched binding sets match the original, as well as their order.
         assertEquals(original, read);
@@ -124,11 +125,12 @@ public class KafkaGetQueryResultStreamIT {
     @Test
     public void fromNow() throws Exception {
         // Create an ID for the query.
+        final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
 
         try(final Producer<?, VisibilityBindingSet> producer =
                 KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) {
-            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+            final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
             // Write a single visibility binding set to the query's result topic. This will not appear in the expected results.
             final ValueFactory vf = new ValueFactoryImpl();
@@ -140,7 +142,7 @@ public class KafkaGetQueryResultStreamIT {
             // Use the interactor that is being tested to read all of the visibility binding sets that appear after this point.
             final GetQueryResultStream<VisibilityBindingSet> interactor =
                     new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
-            try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(queryId)) {
+            try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(ryaInstance, queryId)) {
                 // Read results from the stream.
                 List<VisibilityBindingSet> read = new ArrayList<>();
                 for(final VisibilityBindingSet visBs : results.poll(500)) {
@@ -178,12 +180,13 @@ public class KafkaGetQueryResultStreamIT {
     @Test(expected = IllegalStateException.class)
     public void pollClosedStream() throws Exception {
         // Create an ID for the query.
+        final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
 
         // Use the interactor that is being tested to create a result stream and immediately close it.
         final GetQueryResultStream<VisibilityBindingSet> interactor =
                 new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
-        final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(queryId);
+        final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(ryaInstance, queryId);
         results.close();
 
         // Try to poll the closed stream.
@@ -193,6 +196,7 @@ public class KafkaGetQueryResultStreamIT {
     @Test
     public void fromStart_visibilityStatements() throws Exception {
         // Create an ID for the query.
+        final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
 
         // Create some statements that will be written to the result topic.
@@ -205,7 +209,7 @@ public class KafkaGetQueryResultStreamIT {
         // Write the entries to the query result topic in Kafka.
         try(final Producer<?, VisibilityStatement> producer =
                 KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
-            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+            final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
             for(final VisibilityStatement visStmt : original) {
                 producer.send(new ProducerRecord<>(resultTopic, visStmt));
             }
@@ -214,7 +218,7 @@ public class KafkaGetQueryResultStreamIT {
         // Use the interactor that is being tested to read all of the visibility binding sets.
         final GetQueryResultStream<VisibilityStatement> interactor =
                 new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityStatementDeserializer.class);
-        final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
+        final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(ryaInstance, queryId));
 
         // Show the fetched binding sets match the original, as well as their order.
         assertEquals(original, read);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 5dbd27f..c9abb41 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -90,7 +90,7 @@ public class KafkaRunQueryIT {
         // Add the query to the query repository.
         final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
         final UUID queryId = sQuery.getQueryId();
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // The thread that will run the tested interactor.
         final Thread testThread = new Thread() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
index e746baf..2aa7054 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -171,11 +171,11 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
             final Set<String> changeLogTopics = new HashSet<>( listTopicsConsumer.listTopics().keySet() );
 
             // Remove all topics that are not valid Rya Query Change Log topic names.
-            changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstance(topic).isPresent() );
+            changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).isPresent() );
 
             // Extract the Rya instance names from the change log topics.
             final Set<String> ryaInstances = changeLogTopics.stream()
-                    .map(topic -> KafkaTopics.getRyaInstance(topic).get() )
+                    .map(topic -> KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).get() )
                     .collect(Collectors.toSet());
 
             // Any Rya instances that are in the old set of topics, but not the new one, have been deleted.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 3a59636..4bd022a 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -128,7 +128,7 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec
             // Make sure the Statements topic exists for the query.
             final Set<String> topics = Sets.newHashSet(
                     KafkaTopics.statementsTopic(ryaInstance),
-                    KafkaTopics.queryResultsTopic(query.getQueryId()));
+                    KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId()));
 
             // Make sure the Query Results topic exists for the query.
             createKafkaTopic.createTopics(topics, 1, 1);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index f9c8a03..6358104 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -138,7 +138,7 @@ public class LocalQueryExecutorIT {
             loadStatements.fromCollection(statements);
 
             // Read the output of the streams program.
-            final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+            final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
             resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
             final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
             assertEquals(expected, results);

Reply via email to