RYA-462 Updated the Kafka topic name for StreamsQueries to include the Rya
Instance name.
Conflicts:
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e07390d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e07390d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e07390d3
Branch: refs/heads/master
Commit: e07390d38e17df13d49eb41a81f0391b92e1e27f
Parents: e453694
Author: kchilton2 <[email protected]>
Authored: Thu Feb 8 17:21:10 2018 -0500
Committer: Valiyil <[email protected]>
Committed: Fri Mar 9 13:00:02 2018 -0500
----------------------------------------------------------------------
.../api/interactor/GetQueryResultStream.java | 6 +-
.../streams/client/command/RunQueryCommand.java | 4 +-
.../client/command/StreamResultsCommand.java | 60 ++++----------------
.../client/command/RunQueryCommandIT.java | 2 +-
.../kafka/processors/filter/GeoFilterIT.java | 2 +-
.../aggregation/AggregationProcessorIT.java | 17 +++---
.../processors/filter/FilterProcessorIT.java | 2 +-
.../processors/filter/TemporalFilterIT.java | 8 +--
.../kafka/processors/join/JoinProcessorIT.java | 10 ++--
.../projection/MultiProjectionProcessorIT.java | 3 +-
.../projection/ProjectionProcessorIT.java | 3 +-
.../sp/StatementPatternProcessorIT.java | 9 ++-
.../apache/rya/streams/kafka/KafkaTopics.java | 28 ++++++++-
.../kafka/SingleThreadKafkaStreamsFactory.java | 2 +-
.../interactor/KafkaGetQueryResultStream.java | 12 ++--
.../rya/streams/kafka/KafkaTopicsTest.java | 4 +-
.../interactor/KafkaGetQueryResultStreamIT.java | 18 +++---
.../kafka/interactor/KafkaRunQueryIT.java | 2 +-
.../kafka/KafkaQueryChangeLogSource.java | 4 +-
.../querymanager/kafka/LocalQueryExecutor.java | 2 +-
.../kafka/LocalQueryExecutorIT.java | 2 +-
21 files changed, 93 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
index 951d060..0e70391 100644
---
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
+++
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
@@ -37,18 +37,20 @@ public interface GetQueryResultStream<T> {
/**
* Stream all of the results that have been produced by a query.
*
+ * @param ryaInstance - The name of the Rya Instance the query is for.
(not null)
* @param queryId - Indicates which query results to stream. (not null)
* @return A {@link QueryResultStream} that starts with the first result
that was ever produced.
* @throws RyaStreamsException Could not create the result stream.
*/
- public QueryResultStream<T> fromStart(UUID queryId) throws
RyaStreamsException;
+ public QueryResultStream<T> fromStart(String ryaInstance, UUID queryId)
throws RyaStreamsException;
/**
* Stream results that have been produced by a query after this method was
invoked.
*
+ * @param ryaInstance - The name of the Rya Instance the query is for.
(not null)
* @param queryId - Indicates which query results to stream. (not null)
* @return A {@link QueryResultStream} that only returns results that were
produced after this method is invoked.
* @throws RyaStreamsException Could not create the result stream.
*/
- public QueryResultStream<T> fromNow(UUID queryId) throws
RyaStreamsException;
+ public QueryResultStream<T> fromNow(String ryaInstance, UUID queryId)
throws RyaStreamsException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index 7b311f6..f9a9458 100644
---
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -135,7 +135,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
// Make sure the topics required by the application exists for
the specified Rya instances.
final Set<String> topics = new HashSet<>();
topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
- topics.add( KafkaTopics.queryResultsTopic(queryId) );
+ topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance,
queryId) );
KafkaTopics.createTopics(params.zookeeperServers, topics, 1,
1);
// Run the query that uses those topics.
@@ -143,7 +143,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
params.kafkaIP,
params.kafkaPort,
KafkaTopics.statementsTopic(params.ryaInstance),
- KafkaTopics.queryResultsTopic(queryId),
+ KafkaTopics.queryResultsTopic(params.ryaInstance,
queryId),
queryRepo,
new TopologyFactory());
runQuery.run(queryId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 8ae4e08..783aedc 100644
---
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,18 +20,11 @@ package org.apache.rya.streams.client.command;
import static java.util.Objects.requireNonNull;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.api.utils.QueryInvestigator;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
@@ -39,13 +32,13 @@ import
org.apache.rya.streams.api.queries.InMemoryQueryRepository;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
-import org.apache.rya.streams.client.util.QueryResultsOutputUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Reduced;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.parser.sparql.SPARQLParser;
@@ -72,20 +65,14 @@ public class StreamResultsCommand implements
RyaStreamsCommand {
@Parameter(names = {"--queryId", "-q"}, required = true, description =
"The query whose results will be streamed to the console.")
private String queryId;
- @Parameter(names = {"--file", "-f"}, required = false, description =
"If provided, the output file the results will stream into.")
- private String outputPath;
-
@Override
public String toString() {
final StringBuilder parameters = new StringBuilder();
parameters.append(super.toString());
if (!Strings.isNullOrEmpty(queryId)) {
- parameters.append("\tQuery ID: " + queryId + "\n");
- }
-
- if(!Strings.isNullOrEmpty(outputPath)) {
- parameters.append("\tOutput Path: " + outputPath + "\n");
+ parameters.append("\tQuery ID: " + queryId);
+ parameters.append("\n");
}
return parameters.toString();
@@ -173,12 +160,10 @@ public class StreamResultsCommand implements
RyaStreamsCommand {
});
// Build the interactor based on the type of result the query produces.
- boolean isStatementResults = false;
-
final GetQueryResultStream<?> getQueryResultStream;
try {
- isStatementResults = QueryInvestigator.isConstruct(sparql) |
QueryInvestigator.isInsertWhere(sparql);
- if(isStatementResults) {
+ final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql,
null).getTupleExpr();
+ if(tupleExpr instanceof Reduced) {
getQueryResultStream = new
KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort,
VisibilityStatementDeserializer.class);
} else {
getQueryResultStream = new
KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort,
VisibilityBindingSetDeserializer.class);
@@ -187,22 +172,12 @@ public class StreamResultsCommand implements
RyaStreamsCommand {
throw new ExecutionException("Could not parse the SPARQL for the
query: " + sparql, e);
}
- // Iterate through the results and print them to the configured output
mechanism.
- try (final QueryResultStream<?> resultsStream =
getQueryResultStream.fromStart(queryId)) {
- final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql,
null).getTupleExpr();
- if(params.outputPath != null) {
- final Path file = Paths.get(params.outputPath);
- try (final OutputStream out = Files.newOutputStream(file)) {
- if(isStatementResults) {
- final QueryResultStream<VisibilityStatement>
stmtStream = (QueryResultStream<VisibilityStatement>) resultsStream;
- QueryResultsOutputUtil.toNtriplesFile(out, stmtStream,
finished);
- } else {
- final QueryResultStream<VisibilityBindingSet> bsStream
= (QueryResultStream<VisibilityBindingSet>) resultsStream;
- QueryResultsOutputUtil.toBindingSetJSONFile(out,
tupleExpr, bsStream, finished);
- }
+ // Iterate through the results and print them to the console until the
program or the stream ends.
+ try (final QueryResultStream<?> stream =
getQueryResultStream.fromStart(params.ryaInstance, queryId)) {
+ while(!finished.get()) {
+ for(final Object result : stream.poll(1000)) {
+ System.out.println(result);
}
- } else {
- streamToSystemOut(resultsStream, finished);
}
} catch (final Exception e) {
System.err.println("Error while reading the results from the
stream.");
@@ -210,15 +185,4 @@ public class StreamResultsCommand implements
RyaStreamsCommand {
System.exit(1);
}
}
-
- private static void streamToSystemOut(final QueryResultStream<?> stream,
final AtomicBoolean shutdownSignal) throws Exception {
- requireNonNull(stream);
- requireNonNull(shutdownSignal);
-
- while(!shutdownSignal.get()) {
- for(final Object result : stream.poll(1000)) {
- System.out.println(result);
- }
- }
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 5d63f32..176b920 100644
---
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -184,7 +184,7 @@ public class RunQueryCommandIT {
loadStatements.fromCollection(statements);
// Read the output of the streams program.
- final String resultsTopic =
KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+ final String resultsTopic =
KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
index c090afa..17a290a 100644
---
a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
+++
b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -88,7 +88,7 @@ public class GeoFilterIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
index a4e4a3e..2a1e760 100644
---
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
+++
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -34,7 +34,6 @@ import
org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorS
import
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
@@ -90,7 +89,7 @@ public class AggregationProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql,
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -137,7 +136,7 @@ public class AggregationProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql,
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -183,7 +182,7 @@ public class AggregationProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql,
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -233,7 +232,7 @@ public class AggregationProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql,
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -283,7 +282,7 @@ public class AggregationProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql,
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -371,7 +370,7 @@ public class AggregationProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql,
statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -380,9 +379,7 @@ public class AggregationProcessorIT {
RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic,
resultsTopic, builder, statements, expected,
VisibilityBindingSetDeserializer.class);
}
- // Ignored because this test is kind of flakey.
@Test
- @Ignore
public void multipleAggregations() throws Exception {
// A query that figures out what the youngest and oldest ages are
across all people.
final String sparql =
@@ -436,7 +433,7 @@ public class AggregationProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql,
statementsTopic, resultsTopic, new RandomUUIDFactory());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
index aaa67ea..7fb228a 100644
---
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -54,7 +54,7 @@ public class FilterProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
index 6e27669..11637b7 100644
---
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
+++
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
@@ -85,7 +85,7 @@ public class TemporalFilterIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
@@ -119,7 +119,7 @@ public class TemporalFilterIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
@@ -153,7 +153,7 @@ public class TemporalFilterIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
@@ -187,7 +187,7 @@ public class TemporalFilterIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index bdb9be6..5f09372 100644
---
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -69,7 +69,7 @@ public class JoinProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query =
@@ -120,7 +120,7 @@ public class JoinProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query =
@@ -171,7 +171,7 @@ public class JoinProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query =
@@ -228,7 +228,7 @@ public class JoinProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query =
@@ -269,7 +269,7 @@ public class JoinProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query =
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index a560294..c6fd1cf 100644
---
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import
org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
import
org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
import
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -54,7 +53,7 @@ public class MultiProjectionProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Create a topology for the Query that will be tested.
final String sparql =
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index 322bba9..f53f2c4 100644
---
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -30,7 +30,6 @@ import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
import
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
import
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -57,7 +56,7 @@ public class ProjectionProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Create a topology for the Query that will be tested.
final String sparql =
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
index ba11e57..fd0a48d 100644
---
a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
+++
b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
@@ -30,7 +30,6 @@ import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
import
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
import
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -55,7 +54,7 @@ public class StatementPatternProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query = "SELECT * WHERE { ?person <urn:talksTo>
?otherPerson }";
@@ -85,7 +84,7 @@ public class StatementPatternProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query = "SELECT * WHERE { ?person <urn:talksTo>
?otherPerson }";
@@ -123,7 +122,7 @@ public class StatementPatternProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query = "SELECT * WHERE { "
@@ -157,7 +156,7 @@ public class StatementPatternProcessorIT {
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// Setup a topology.
final String query = "SELECT * WHERE { "
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 989799a..f0cc842 100644
---
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -62,7 +62,7 @@ public class KafkaTopics {
* @param changeLogTopic - The topic to evaluate. (not null)
* @return If the topic is well formatted, then the Rya instance name that
was part of the topic name.
*/
- public static Optional<String> getRyaInstance(final String changeLogTopic)
{
+ public static Optional<String> getRyaInstanceFromQueryChangeLog(final
String changeLogTopic) {
requireNonNull(changeLogTopic);
// Return absent if the provided topic does not represent a query
change log topic.
@@ -93,9 +93,31 @@ public class KafkaTopics {
* @param queryId - The id of the query the topic is for.
* @return The name of the Kafka topic.
*/
- public static String queryResultsTopic(final UUID queryId) {
+ public static String queryResultsTopic(final String ryaInstance, final
UUID queryId) {
requireNonNull(queryId);
- return "QueryResults-" + queryId.toString();
+ return ryaInstance + "-QueryResults-" + queryId.toString();
+ }
+
+ /**
+ * TODO doc
+ *
+ * @param queryResultsTopic
+ * @return
+ */
+ public static String getRyaInstanceFromQueryResultsTopic(final String
queryResultsTopic) {
+ // TODO
+ return "";
+ }
+
+ /**
+ * TODO doc
+ *
+ * @param queryResultsTopic
+ * @return
+ */
+ public static UUID getQueryIdFromQueryResultsTopic(final String
queryResultsTopic) {
+ // TODO
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
index 8093951..63d64b9 100644
---
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
+++
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -78,7 +78,7 @@ public class SingleThreadKafkaStreamsFactory implements
KafkaStreamsFactory {
// Setup the topology that processes the Query.
final String statementsTopic =
KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic =
KafkaTopics.queryResultsTopic(query.getQueryId());
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
query.getQueryId());
try {
final TopologyBuilder topologyBuilder =
topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new
RandomUUIDFactory());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
index 529b493..d91edeb 100644
---
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
+++
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
@@ -66,22 +66,22 @@ public class KafkaGetQueryResultStream<T> implements
GetQueryResultStream<T> {
}
@Override
- public QueryResultStream<T> fromStart(final UUID queryId) throws
RyaStreamsException {
+ public QueryResultStream<T> fromStart(final String ryaInstance, final UUID
queryId) throws RyaStreamsException {
requireNonNull(queryId);
// Always start at the earliest point within the topic.
- return makeStream(queryId, "earliest");
+ return makeStream(ryaInstance, queryId, "earliest");
}
@Override
- public QueryResultStream<T> fromNow(final UUID queryId) throws
RyaStreamsException {
+ public QueryResultStream<T> fromNow(final String ryaInstance, final UUID
queryId) throws RyaStreamsException {
requireNonNull(queryId);
// Always start at the latest point within the topic.
- return makeStream(queryId, "latest");
+ return makeStream(ryaInstance, queryId, "latest");
}
- private QueryResultStream<T> makeStream(final UUID queryId, final String
autoOffsetResetConfig) {
+ private QueryResultStream<T> makeStream(final String ryaInstance, final
UUID queryId, final String autoOffsetResetConfig) {
// Configure which instance of Kafka to connect to.
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -106,7 +106,7 @@ public class KafkaGetQueryResultStream<T> implements
GetQueryResultStream<T> {
final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(props);
// Register the consumer for the query's results.
- final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
consumer.subscribe(Arrays.asList(resultTopic));
// Return the result stream.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
index a057de7..8eaf080 100644
---
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
+++
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
@@ -37,7 +37,7 @@ public class KafkaTopicsTest {
final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance);
// Show the rya instance name is able to be extracted from the topic.
- final Optional<String> resolvedRyaInstance =
KafkaTopics.getRyaInstance(topicName);
+ final Optional<String> resolvedRyaInstance =
KafkaTopics.getRyaInstanceFromQueryChangeLog(topicName);
assertEquals(ryaInstance, resolvedRyaInstance.get());
}
@@ -47,7 +47,7 @@ public class KafkaTopicsTest {
final String invalidTopic = "thisIsABadTopicName";
// Show there is no Rya Instance name in it.
- final Optional<String> ryaInstance =
KafkaTopics.getRyaInstance(invalidTopic);
+ final Optional<String> ryaInstance =
KafkaTopics.getRyaInstanceFromQueryChangeLog(invalidTopic);
assertFalse( ryaInstance.isPresent() );
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index 8882753..59c08b7 100644
---
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -85,6 +85,7 @@ public class KafkaGetQueryResultStreamIT {
@Test
public void fromStart() throws Exception {
// Create an ID for the query.
+ final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
// Create a list of test VisibilityBindingSets.
@@ -106,7 +107,7 @@ public class KafkaGetQueryResultStreamIT {
// Write some entries to the query result topic in Kafka.
try(final Producer<?, VisibilityBindingSet> producer =
KafkaTestUtil.makeProducer(kafka, StringSerializer.class,
VisibilityBindingSetSerializer.class)) {
- final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultTopic =
KafkaTopics.queryResultsTopic(ryaInstance, queryId);
for(final VisibilityBindingSet visBs : original) {
producer.send(new ProducerRecord<>(resultTopic, visBs));
}
@@ -115,7 +116,7 @@ public class KafkaGetQueryResultStreamIT {
// Use the interactor that is being tested to read all of the
visibility binding sets.
final GetQueryResultStream<VisibilityBindingSet> interactor =
new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(),
kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
- final List<VisibilityBindingSet> read = pollForResults(500, 3, 3,
interactor.fromStart(queryId));
+ final List<VisibilityBindingSet> read = pollForResults(500, 3, 3,
interactor.fromStart(ryaInstance, queryId));
// Show the fetched binding sets match the original, as well as their
order.
assertEquals(original, read);
@@ -124,11 +125,12 @@ public class KafkaGetQueryResultStreamIT {
@Test
public void fromNow() throws Exception {
// Create an ID for the query.
+ final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
try(final Producer<?, VisibilityBindingSet> producer =
KafkaTestUtil.makeProducer(kafka, StringSerializer.class,
VisibilityBindingSetSerializer.class)) {
- final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultTopic =
KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Write a single visibility binding set to the query's result
topic. This will not appear in the expected results.
final ValueFactory vf = new ValueFactoryImpl();
@@ -140,7 +142,7 @@ public class KafkaGetQueryResultStreamIT {
// Use the interactor that is being tested to read all of the
visibility binding sets that appear after this point.
final GetQueryResultStream<VisibilityBindingSet> interactor =
new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(),
kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
- try(QueryResultStream<VisibilityBindingSet> results =
interactor.fromNow(queryId)) {
+ try(QueryResultStream<VisibilityBindingSet> results =
interactor.fromNow(ryaInstance, queryId)) {
// Read results from the stream.
List<VisibilityBindingSet> read = new ArrayList<>();
for(final VisibilityBindingSet visBs : results.poll(500)) {
@@ -178,12 +180,13 @@ public class KafkaGetQueryResultStreamIT {
@Test(expected = IllegalStateException.class)
public void pollClosedStream() throws Exception {
// Create an ID for the query.
+ final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
// Use the interactor that is being tested to create a result stream
and immediately close it.
final GetQueryResultStream<VisibilityBindingSet> interactor =
new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(),
kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
- final QueryResultStream<VisibilityBindingSet> results =
interactor.fromStart(queryId);
+ final QueryResultStream<VisibilityBindingSet> results =
interactor.fromStart(ryaInstance, queryId);
results.close();
// Try to poll the closed stream.
@@ -193,6 +196,7 @@ public class KafkaGetQueryResultStreamIT {
@Test
public void fromStart_visibilityStatements() throws Exception {
// Create an ID for the query.
+ final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
// Create some statements that will be written to the result topic.
@@ -205,7 +209,7 @@ public class KafkaGetQueryResultStreamIT {
// Write the entries to the query result topic in Kafka.
try(final Producer<?, VisibilityStatement> producer =
KafkaTestUtil.makeProducer(kafka, StringSerializer.class,
VisibilityStatementSerializer.class)) {
- final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultTopic =
KafkaTopics.queryResultsTopic(ryaInstance, queryId);
for(final VisibilityStatement visStmt : original) {
producer.send(new ProducerRecord<>(resultTopic, visStmt));
}
@@ -214,7 +218,7 @@ public class KafkaGetQueryResultStreamIT {
// Use the interactor that is being tested to read all of the
visibility binding sets.
final GetQueryResultStream<VisibilityStatement> interactor =
new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(),
kafka.getKafkaPort(), VisibilityStatementDeserializer.class);
- final List<VisibilityStatement> read = pollForResults(500, 3, 3,
interactor.fromStart(queryId));
+ final List<VisibilityStatement> read = pollForResults(500, 3, 3,
interactor.fromStart(ryaInstance, queryId));
// Show the fetched binding sets match the original, as well as their
order.
assertEquals(original, read);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 5dbd27f..c9abb41 100644
---
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -90,7 +90,7 @@ public class KafkaRunQueryIT {
// Add the query to the query repository.
final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person
<urn:worksAt> ?business . }", true);
final UUID queryId = sQuery.getQueryId();
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance,
queryId);
// The thread that will run the tested interactor.
final Thread testThread = new Thread() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
index e746baf..2aa7054 100644
---
a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
+++
b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -171,11 +171,11 @@ public class KafkaQueryChangeLogSource extends
AbstractScheduledService implemen
final Set<String> changeLogTopics = new HashSet<>(
listTopicsConsumer.listTopics().keySet() );
// Remove all topics that are not valid Rya Query Change Log topic
names.
- changeLogTopics.removeIf( topic ->
!KafkaTopics.getRyaInstance(topic).isPresent() );
+ changeLogTopics.removeIf( topic ->
!KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).isPresent() );
// Extract the Rya instance names from the change log topics.
final Set<String> ryaInstances = changeLogTopics.stream()
- .map(topic -> KafkaTopics.getRyaInstance(topic).get() )
+ .map(topic ->
KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).get() )
.collect(Collectors.toSet());
// Any Rya instances that are in the old set of topics, but not
the new one, have been deleted.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 3a59636..4bd022a 100644
---
a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++
b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -128,7 +128,7 @@ public class LocalQueryExecutor extends AbstractIdleService
implements QueryExec
// Make sure the Statements topic exists for the query.
final Set<String> topics = Sets.newHashSet(
KafkaTopics.statementsTopic(ryaInstance),
- KafkaTopics.queryResultsTopic(query.getQueryId()));
+ KafkaTopics.queryResultsTopic(ryaInstance,
query.getQueryId()));
// Make sure the Query Results topic exists for the query.
createKafkaTopic.createTopics(topics, 1, 1);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
----------------------------------------------------------------------
diff --git
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index f9c8a03..6358104 100644
---
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -138,7 +138,7 @@ public class LocalQueryExecutorIT {
loadStatements.fromCollection(statements);
// Read the output of the streams program.
- final String resultsTopic =
KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+ final String resultsTopic =
KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
final List<VisibilityBindingSet> results =
KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
assertEquals(expected, results);