RYA-462 Updated the Kafka topic name for StreamsQueries to include the Rya Instance name.
Conflicts: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e07390d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e07390d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e07390d3 Branch: refs/heads/master Commit: e07390d38e17df13d49eb41a81f0391b92e1e27f Parents: e453694 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Thu Feb 8 17:21:10 2018 -0500 Committer: Valiyil <puja.vali...@parsons.com> Committed: Fri Mar 9 13:00:02 2018 -0500 ---------------------------------------------------------------------- .../api/interactor/GetQueryResultStream.java | 6 +- .../streams/client/command/RunQueryCommand.java | 4 +- .../client/command/StreamResultsCommand.java | 60 ++++---------------- .../client/command/RunQueryCommandIT.java | 2 +- .../kafka/processors/filter/GeoFilterIT.java | 2 +- .../aggregation/AggregationProcessorIT.java | 17 +++--- .../processors/filter/FilterProcessorIT.java | 2 +- .../processors/filter/TemporalFilterIT.java | 8 +-- .../kafka/processors/join/JoinProcessorIT.java | 10 ++-- .../projection/MultiProjectionProcessorIT.java | 3 +- .../projection/ProjectionProcessorIT.java | 3 +- .../sp/StatementPatternProcessorIT.java | 9 ++- .../apache/rya/streams/kafka/KafkaTopics.java | 28 ++++++++- .../kafka/SingleThreadKafkaStreamsFactory.java | 2 +- .../interactor/KafkaGetQueryResultStream.java | 12 ++-- .../rya/streams/kafka/KafkaTopicsTest.java | 4 +- .../interactor/KafkaGetQueryResultStreamIT.java | 18 +++--- .../kafka/interactor/KafkaRunQueryIT.java | 2 +- .../kafka/KafkaQueryChangeLogSource.java | 4 +- .../querymanager/kafka/LocalQueryExecutor.java | 2 +- .../kafka/LocalQueryExecutorIT.java | 2 +- 21 files changed, 93 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java index 951d060..0e70391 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java @@ -37,18 +37,20 @@ public interface GetQueryResultStream<T> { /** * Stream all of the results that have been produced by a query. * + * @param ryaInstance - The name of the Rya Instance the query is for. (not null) * @param queryId - Indicates which query results to stream. (not null) * @return A {@link QueryResultStream} that starts with the first result that was ever produced. * @throws RyaStreamsException Could not create the result stream. */ - public QueryResultStream<T> fromStart(UUID queryId) throws RyaStreamsException; + public QueryResultStream<T> fromStart(String ryaInstance, UUID queryId) throws RyaStreamsException; /** * Stream results that have been produced by a query after this method was invoked. * + * @param ryaInstance - The name of the Rya Instance the query is for. (not null) * @param queryId - Indicates which query results to stream. (not null) * @return A {@link QueryResultStream} that only returns results that were produced after this method is invoked. * @throws RyaStreamsException Could not create the result stream. */ - public QueryResultStream<T> fromNow(UUID queryId) throws RyaStreamsException; + public QueryResultStream<T> fromNow(String ryaInstance, UUID queryId) throws RyaStreamsException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java index 7b311f6..f9a9458 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java @@ -135,7 +135,7 @@ public class RunQueryCommand implements RyaStreamsCommand { // Make sure the topics required by the application exists for the specified Rya instances. final Set<String> topics = new HashSet<>(); topics.add( KafkaTopics.statementsTopic(params.ryaInstance) ); - topics.add( KafkaTopics.queryResultsTopic(queryId) ); + topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, queryId) ); KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1); // Run the query that uses those topics. @@ -143,7 +143,7 @@ public class RunQueryCommand implements RyaStreamsCommand { params.kafkaIP, params.kafkaPort, KafkaTopics.statementsTopic(params.ryaInstance), - KafkaTopics.queryResultsTopic(queryId), + KafkaTopics.queryResultsTopic(params.ryaInstance, queryId), queryRepo, new TopologyFactory()); runQuery.run(queryId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java index 8ae4e08..783aedc 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,18 +20,11 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.api.utils.QueryInvestigator; import org.apache.rya.streams.api.entity.QueryResultStream; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.interactor.GetQueryResultStream; @@ -39,13 +32,13 @@ import org.apache.rya.streams.api.queries.InMemoryQueryRepository; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; -import org.apache.rya.streams.client.util.QueryResultsOutputUtil; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.Reduced; import org.openrdf.query.algebra.TupleExpr; import org.openrdf.query.parser.sparql.SPARQLParser; @@ -72,20 +65,14 @@ public class StreamResultsCommand implements RyaStreamsCommand { @Parameter(names = {"--queryId", "-q"}, required = true, description = "The query whose results will be streamed to the console.") private String queryId; - @Parameter(names = {"--file", "-f"}, required = false, description = "If provided, the output file the results will stream into.") - private String outputPath; - @Override public String toString() { final StringBuilder parameters = new StringBuilder(); parameters.append(super.toString()); if (!Strings.isNullOrEmpty(queryId)) { - parameters.append("\tQuery ID: " + queryId + "\n"); - } - - if(!Strings.isNullOrEmpty(outputPath)) { - parameters.append("\tOutput Path: " + outputPath + "\n"); + parameters.append("\tQuery ID: " + queryId); + parameters.append("\n"); } return parameters.toString(); @@ -173,12 +160,10 @@ public class StreamResultsCommand implements RyaStreamsCommand { }); // Build the interactor based on the type of result the query produces. - boolean isStatementResults = false; - final GetQueryResultStream<?> getQueryResultStream; try { - isStatementResults = QueryInvestigator.isConstruct(sparql) | QueryInvestigator.isInsertWhere(sparql); - if(isStatementResults) { + final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); + if(tupleExpr instanceof Reduced) { getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class); } else { getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class); @@ -187,22 +172,12 @@ public class StreamResultsCommand implements RyaStreamsCommand { throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e); } - // Iterate through the results and print them to the configured output mechanism. - try (final QueryResultStream<?> resultsStream = getQueryResultStream.fromStart(queryId)) { - final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); - if(params.outputPath != null) { - final Path file = Paths.get(params.outputPath); - try (final OutputStream out = Files.newOutputStream(file)) { - if(isStatementResults) { - final QueryResultStream<VisibilityStatement> stmtStream = (QueryResultStream<VisibilityStatement>) resultsStream; - QueryResultsOutputUtil.toNtriplesFile(out, stmtStream, finished); - } else { - final QueryResultStream<VisibilityBindingSet> bsStream = (QueryResultStream<VisibilityBindingSet>) resultsStream; - QueryResultsOutputUtil.toBindingSetJSONFile(out, tupleExpr, bsStream, finished); - } + // Iterate through the results and print them to the console until the program or the stream ends. + try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(params.ryaInstance, queryId)) { + while(!finished.get()) { + for(final Object result : stream.poll(1000)) { + System.out.println(result); } - } else { - streamToSystemOut(resultsStream, finished); } } catch (final Exception e) { System.err.println("Error while reading the results from the stream."); @@ -210,15 +185,4 @@ public class StreamResultsCommand implements RyaStreamsCommand { System.exit(1); } } - - private static void streamToSystemOut(final QueryResultStream<?> stream, final AtomicBoolean shutdownSignal) throws Exception { - requireNonNull(stream); - requireNonNull(shutdownSignal); - - while(!shutdownSignal.get()) { - for(final Object result : stream.poll(1000)) { - System.out.println(result); - } - } - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java index 5d63f32..176b920 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java @@ -184,7 +184,7 @@ public class RunQueryCommandIT { loadStatements.fromCollection(statements); // Read the output of the streams program. - final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId()); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId()); resultConsumer.subscribe( Lists.newArrayList(resultsTopic) ); results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer); } finally { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java index c090afa..17a290a 100644 --- a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java +++ b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java @@ -88,7 +88,7 @@ public class GeoFilterIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Get the RDF model objects that will be used to build the query. final String sparql = http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java index a4e4a3e..2a1e760 100644 --- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java @@ -34,7 +34,6 @@ import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorS import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; @@ -90,7 +89,7 @@ public class AggregationProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); @@ -137,7 +136,7 @@ public class AggregationProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); @@ -183,7 +182,7 @@ public class AggregationProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); @@ -233,7 +232,7 @@ public class AggregationProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); @@ -283,7 +282,7 @@ public class AggregationProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); @@ -371,7 +370,7 @@ public class AggregationProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); @@ -380,9 +379,7 @@ public class AggregationProcessorIT { RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); } - // Ignored because this test is kind of flakey. @Test - @Ignore public void multipleAggregations() throws Exception { // A query that figures out what the youngest and oldest ages are across all people. final String sparql = @@ -436,7 +433,7 @@ public class AggregationProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java index aaa67ea..7fb228a 100644 --- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java @@ -54,7 +54,7 @@ public class FilterProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Get the RDF model objects that will be used to build the query. final String sparql = http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java index 6e27669..11637b7 100644 --- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java @@ -85,7 +85,7 @@ public class TemporalFilterIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Get the RDF model objects that will be used to build the query. final String sparql = @@ -119,7 +119,7 @@ public class TemporalFilterIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Get the RDF model objects that will be used to build the query. final String sparql = @@ -153,7 +153,7 @@ public class TemporalFilterIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Get the RDF model objects that will be used to build the query. final String sparql = @@ -187,7 +187,7 @@ public class TemporalFilterIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Get the RDF model objects that will be used to build the query. final String sparql = http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java index bdb9be6..5f09372 100644 --- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java @@ -69,7 +69,7 @@ public class JoinProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = @@ -120,7 +120,7 @@ public class JoinProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = @@ -171,7 +171,7 @@ public class JoinProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = @@ -228,7 +228,7 @@ public class JoinProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = @@ -269,7 +269,7 @@ public class JoinProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java index a560294..c6fd1cf 100644 --- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier; import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; @@ -54,7 +53,7 @@ public class MultiProjectionProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Create a topology for the Query that will be tested. final String sparql = http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java index 322bba9..f53f2c4 100644 --- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java @@ -30,7 +30,6 @@ import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; @@ -57,7 +56,7 @@ public class ProjectionProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Create a topology for the Query that will be tested. final String sparql = http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java index ba11e57..fd0a48d 100644 --- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java @@ -30,7 +30,6 @@ import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; @@ -55,7 +54,7 @@ public class StatementPatternProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; @@ -85,7 +84,7 @@ public class StatementPatternProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; @@ -123,7 +122,7 @@ public class StatementPatternProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = "SELECT * WHERE { " @@ -157,7 +156,7 @@ public class StatementPatternProcessorIT { final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Setup a topology. final String query = "SELECT * WHERE { " http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java index 989799a..f0cc842 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java @@ -62,7 +62,7 @@ public class KafkaTopics { * @param changeLogTopic - The topic to evaluate. (not null) * @return If the topic is well formatted, then the Rya instance name that was part of the topic name. */ - public static Optional<String> getRyaInstance(final String changeLogTopic) { + public static Optional<String> getRyaInstanceFromQueryChangeLog(final String changeLogTopic) { requireNonNull(changeLogTopic); // Return absent if the provided topic does not represent a query change log topic. @@ -93,9 +93,31 @@ public class KafkaTopics { * @param queryId - The id of the query the topic is for. * @return The name of the Kafka topic. */ - public static String queryResultsTopic(final UUID queryId) { + public static String queryResultsTopic(final String ryaInstance, final UUID queryId) { requireNonNull(queryId); - return "QueryResults-" + queryId.toString(); + return ryaInstance + "-QueryResults-" + queryId.toString(); + } + + /** + * TODO doc + * + * @param queryResultsTopic + * @return + */ + public static String getRyaInstanceFromQueryResultsTopic(final String queryResultsTopic) { + // TODO + return ""; + } + + /** + * TODO doc + * + * @param queryResultsTopic + * @return + */ + public static UUID getQueryIdFromQueryResultsTopic(final String queryResultsTopic) { + // TODO + return null; } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java index 8093951..63d64b9 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java @@ -78,7 +78,7 @@ public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory { // Setup the topology that processes the Query. final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId()); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId()); try { final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java index 529b493..d91edeb 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java @@ -66,22 +66,22 @@ public class KafkaGetQueryResultStream<T> implements GetQueryResultStream<T> { } @Override - public QueryResultStream<T> fromStart(final UUID queryId) throws RyaStreamsException { + public QueryResultStream<T> fromStart(final String ryaInstance, final UUID queryId) throws RyaStreamsException { requireNonNull(queryId); // Always start at the earliest point within the topic. - return makeStream(queryId, "earliest"); + return makeStream(ryaInstance, queryId, "earliest"); } @Override - public QueryResultStream<T> fromNow(final UUID queryId) throws RyaStreamsException { + public QueryResultStream<T> fromNow(final String ryaInstance, final UUID queryId) throws RyaStreamsException { requireNonNull(queryId); // Always start at the latest point within the topic. - return makeStream(queryId, "latest"); + return makeStream(ryaInstance, queryId, "latest"); } - private QueryResultStream<T> makeStream(final UUID queryId, final String autoOffsetResetConfig) { + private QueryResultStream<T> makeStream(final String ryaInstance, final UUID queryId, final String autoOffsetResetConfig) { // Configure which instance of Kafka to connect to. final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -106,7 +106,7 @@ public class KafkaGetQueryResultStream<T> implements GetQueryResultStream<T> { final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(props); // Register the consumer for the query's results. - final String resultTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); consumer.subscribe(Arrays.asList(resultTopic)); // Return the result stream. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java index a057de7..8eaf080 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java @@ -37,7 +37,7 @@ public class KafkaTopicsTest { final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance); // Show the rya instance name is able to be extracted from the topic. - final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstance(topicName); + final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstanceFromQueryChangeLog(topicName); assertEquals(ryaInstance, resolvedRyaInstance.get()); } @@ -47,7 +47,7 @@ public class KafkaTopicsTest { final String invalidTopic = "thisIsABadTopicName"; // Show there is no Rya Instance name in it. - final Optional<String> ryaInstance = KafkaTopics.getRyaInstance(invalidTopic); + final Optional<String> ryaInstance = KafkaTopics.getRyaInstanceFromQueryChangeLog(invalidTopic); assertFalse( ryaInstance.isPresent() ); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java index 8882753..59c08b7 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java @@ -85,6 +85,7 @@ public class KafkaGetQueryResultStreamIT { @Test public void fromStart() throws Exception { // Create an ID for the query. + final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); // Create a list of test VisibilityBindingSets. @@ -106,7 +107,7 @@ public class KafkaGetQueryResultStreamIT { // Write some entries to the query result topic in Kafka. try(final Producer<?, VisibilityBindingSet> producer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) { - final String resultTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); for(final VisibilityBindingSet visBs : original) { producer.send(new ProducerRecord<>(resultTopic, visBs)); } @@ -115,7 +116,7 @@ public class KafkaGetQueryResultStreamIT { // Use the interactor that is being tested to read all of the visibility binding sets. final GetQueryResultStream<VisibilityBindingSet> interactor = new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class); - final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId)); + final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(ryaInstance, queryId)); // Show the fetched binding sets match the original, as well as their order. assertEquals(original, read); @@ -124,11 +125,12 @@ public class KafkaGetQueryResultStreamIT { @Test public void fromNow() throws Exception { // Create an ID for the query. + final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); try(final Producer<?, VisibilityBindingSet> producer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) { - final String resultTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // Write a single visibility binding set to the query's result topic. This will not appear in the expected results. final ValueFactory vf = new ValueFactoryImpl(); @@ -140,7 +142,7 @@ public class KafkaGetQueryResultStreamIT { // Use the interactor that is being tested to read all of the visibility binding sets that appear after this point. final GetQueryResultStream<VisibilityBindingSet> interactor = new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class); - try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(queryId)) { + try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(ryaInstance, queryId)) { // Read results from the stream. List<VisibilityBindingSet> read = new ArrayList<>(); for(final VisibilityBindingSet visBs : results.poll(500)) { @@ -178,12 +180,13 @@ public class KafkaGetQueryResultStreamIT { @Test(expected = IllegalStateException.class) public void pollClosedStream() throws Exception { // Create an ID for the query. + final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); // Use the interactor that is being tested to create a result stream and immediately close it. final GetQueryResultStream<VisibilityBindingSet> interactor = new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class); - final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(queryId); + final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(ryaInstance, queryId); results.close(); // Try to poll the closed stream. @@ -193,6 +196,7 @@ public class KafkaGetQueryResultStreamIT { @Test public void fromStart_visibilityStatements() throws Exception { // Create an ID for the query. + final String ryaInstance = UUID.randomUUID().toString(); final UUID queryId = UUID.randomUUID(); // Create some statements that will be written to the result topic. @@ -205,7 +209,7 @@ public class KafkaGetQueryResultStreamIT { // Write the entries to the query result topic in Kafka. try(final Producer<?, VisibilityStatement> producer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class)) { - final String resultTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); for(final VisibilityStatement visStmt : original) { producer.send(new ProducerRecord<>(resultTopic, visStmt)); } @@ -214,7 +218,7 @@ public class KafkaGetQueryResultStreamIT { // Use the interactor that is being tested to read all of the visibility binding sets. final GetQueryResultStream<VisibilityStatement> interactor = new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityStatementDeserializer.class); - final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(queryId)); + final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(ryaInstance, queryId)); // Show the fetched binding sets match the original, as well as their order. assertEquals(original, read); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java index 5dbd27f..c9abb41 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java @@ -90,7 +90,7 @@ public class KafkaRunQueryIT { // Add the query to the query repository. final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true); final UUID queryId = sQuery.getQueryId(); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId); // The thread that will run the tested interactor. final Thread testThread = new Thread() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java index e746baf..2aa7054 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java @@ -171,11 +171,11 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen final Set<String> changeLogTopics = new HashSet<>( listTopicsConsumer.listTopics().keySet() ); // Remove all topics that are not valid Rya Query Change Log topic names. - changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstance(topic).isPresent() ); + changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).isPresent() ); // Extract the Rya instance names from the change log topics. final Set<String> ryaInstances = changeLogTopics.stream() - .map(topic -> KafkaTopics.getRyaInstance(topic).get() ) + .map(topic -> KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).get() ) .collect(Collectors.toSet()); // Any Rya instances that are in the old set of topics, but not the new one, have been deleted. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java index 3a59636..4bd022a 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java @@ -128,7 +128,7 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec // Make sure the Statements topic exists for the query. final Set<String> topics = Sets.newHashSet( KafkaTopics.statementsTopic(ryaInstance), - KafkaTopics.queryResultsTopic(query.getQueryId())); + KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId())); // Make sure the Query Results topic exists for the query. createKafkaTopic.createTopics(topics, 1, 1); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e07390d3/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java index f9c8a03..6358104 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java @@ -138,7 +138,7 @@ public class LocalQueryExecutorIT { loadStatements.fromCollection(statements); // Read the output of the streams program. - final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId()); + final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId()); resultConsumer.subscribe( Lists.newArrayList(resultsTopic) ); final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer); assertEquals(expected, results);