RYA-377 Updated the Rya Streams client to be able to stream VisibilityStatement results to the console.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/bd36443d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/bd36443d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/bd36443d Branch: refs/heads/master Commit: bd36443de7e824c9fe8f0a97d3ef7a75c223271c Parents: 95df37a Author: kchilton2 <[email protected]> Authored: Thu Nov 30 17:25:31 2017 -0500 Committer: caleb <[email protected]> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- extras/rya.streams/api/pom.xml | 5 ++ .../streams/api/entity/QueryResultStream.java | 13 ++-- .../api/interactor/GetQueryResultStream.java | 8 ++- .../interactor/defaults/DefaultAddQuery.java | 16 ++++- .../defaults/DefaultAddQueryTest.java | 64 ++++++++++++++++++++ .../client/command/StreamResultsCommand.java | 54 +++++++++++++++-- extras/rya.streams/kafka/pom.xml | 2 +- .../kafka/entity/KafkaQueryResultStream.java | 11 ++-- .../interactor/KafkaGetQueryResultStream.java | 27 ++++++--- .../interactor/KafkaGetQueryResultStreamIT.java | 59 ++++++++++++++---- 10 files changed, 216 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/pom.xml b/extras/rya.streams/api/pom.xml index 2a1f51c..250028f 100644 --- a/extras/rya.streams/api/pom.xml +++ b/extras/rya.streams/api/pom.xml @@ -41,6 +41,11 @@ under the License. 
</dependency> <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-sparql</artifactId> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java index aa5dcfd..8f1e589 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java @@ -22,17 +22,18 @@ import static java.util.Objects.requireNonNull; import java.util.UUID; -import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.streams.api.exception.RyaStreamsException; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; /** - * An infinite stream of {@link VisibilityBindingSet}s that are the results of a query within Rya Streams. + * An infinite stream of values that are the results of a query within Rya Streams. + * + * @param <V> - The query results' value type. */ @DefaultAnnotation(NonNull.class) -public abstract class QueryResultStream implements AutoCloseable { +public abstract class QueryResultStream<V> implements AutoCloseable { private final UUID queryId; @@ -57,10 +58,10 @@ public abstract class QueryResultStream implements AutoCloseable { * Wait at most {@code timeoutMs} milliseconds for the next collection of results. * * @param timeoutMs - The number of milliseconds to at most wait for the next collection of results. 
(not null) - * @return The next collection of {@link VisibilityBindingSet}s that are the result of the query. Empty if - * there where no new results within the timout period. + * @return The next collection of values that are the result of the query. Empty if there were no new results + * within the timeout period. * @throws IllegalStateException If the stream has been closed. * @throws RyaStreamsException Could not fetch the next set of results. */ - public abstract Iterable<VisibilityBindingSet> poll(long timeoutMs) throws IllegalStateException, RyaStreamsException; + public abstract Iterable<V> poll(long timeoutMs) throws IllegalStateException, RyaStreamsException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java index 9ca577c..951d060 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java @@ -28,9 +28,11 @@ import edu.umd.cs.findbugs.annotations.NonNull; /** * Get a {@link QueryResultStream} over the results of a query that is being managed by Rya Streams. + * + * @param <T> - The type of results that are in the result stream. */ @DefaultAnnotation(NonNull.class) -public interface GetQueryResultStream { +public interface GetQueryResultStream<T> { /** * Stream all of the results that have been produced by a query. @@ -39,7 +41,7 @@ public interface GetQueryResultStream { * @return A {@link QueryResultStream} that starts with the first result that was ever produced. 
* @throws RyaStreamsException Could not create the result stream. */ - public QueryResultStream fromStart(UUID queryId) throws RyaStreamsException; + public QueryResultStream<T> fromStart(UUID queryId) throws RyaStreamsException; /** * Stream results that have been produced by a query after this method was invoked. @@ -48,5 +50,5 @@ public interface GetQueryResultStream { * @return A {@link QueryResultStream} that only returns results that were produced after this method is invoked. * @throws RyaStreamsException Could not create the result stream. */ - public QueryResultStream fromNow(UUID queryId) throws RyaStreamsException; + public QueryResultStream<T> fromNow(UUID queryId) throws RyaStreamsException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java index 9704322..f94835c 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java @@ -24,6 +24,8 @@ import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.exception.RyaStreamsException; import org.apache.rya.streams.api.interactor.AddQuery; import org.apache.rya.streams.api.queries.QueryRepository; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.parser.sparql.SPARQLParser; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -35,6 +37,8 @@ import edu.umd.cs.findbugs.annotations.NonNull; public 
class DefaultAddQuery implements AddQuery { private final QueryRepository repository; + private final SPARQLParser parser = new SPARQLParser(); + /** * Creates a new {@link DefaultAddQuery}. * @@ -46,6 +50,16 @@ public class DefaultAddQuery implements AddQuery { @Override public StreamsQuery addQuery(final String query) throws RyaStreamsException { + requireNonNull(query); + + // Make sure the SPARQL is valid. + try { + parser.parseQuery(query, null); + } catch (final MalformedQueryException e) { + throw new RyaStreamsException("Could not add the query because the SPARQL is invalid.", e); + } + + // If it is, then store it in the repository. return repository.add(query); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java new file mode 100644 index 0000000..88be6e7 --- /dev/null +++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor.defaults; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.AddQuery; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.junit.Test; + +/** + * Unit tests the methods of {@link DefaultAddQuery}. + */ +public class DefaultAddQueryTest { + + @Test + public void addQuery_validSparql() throws Exception { + // Valid SPARQL. + final String sparql = "SELECT * WHERE { ?person <urn:worksAt> ?business }"; + + // Setup the interactor. + final QueryRepository repo = mock(QueryRepository.class); + final AddQuery addQuery = new DefaultAddQuery(repo); + + // Add the query. + addQuery.addQuery(sparql); + + // Verify the call was forwarded to the repository. + verify(repo, times(1)).add(eq(sparql)); + } + + @Test(expected = RyaStreamsException.class) + public void addQuery_invalidSparql() throws Exception { + // Invalid SPARQL. + final String sparql = "This is not sparql."; + + // Setup the interactor. + final QueryRepository repo = mock(QueryRepository.class); + final AddQuery addQuery = new DefaultAddQuery(repo); + + // Add the query. 
+ addQuery.addQuery(sparql); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java index 64f78a3..7c548f1 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java @@ -20,14 +20,26 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.streams.api.entity.QueryResultStream; +import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.interactor.GetQueryResultStream; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.Reduced; +import org.openrdf.query.algebra.TupleExpr; +import 
org.openrdf.query.parser.sparql.SPARQLParser; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -107,6 +119,12 @@ public class StreamResultsCommand implements RyaStreamsCommand { throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e); } + // Create the Kafka backed QueryChangeLog. + final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort; + final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance); + final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); + + // Parse the Query ID from the command line parameters. final UUID queryId; try { queryId = UUID.fromString( params.queryId ); @@ -114,6 +132,19 @@ public class StreamResultsCommand implements RyaStreamsCommand { throw new ArgumentsException("Invalid Query ID " + params.queryId); } + // Fetch the SPARQL of the query whose results will be streamed. + final String sparql; + try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + final Optional<StreamsQuery> sQuery = queryRepo.get(queryId); + if(!sQuery.isPresent()) { + throw new ExecutionException("Could not read the results for query with ID " + queryId + + " because no such query exists."); + } + sparql = sQuery.get().getSparql(); + } catch (final Exception e) { + throw new ExecutionException("Problem encountered while closing the QueryRepository.", e); + } + // This command executes until the application is killed, so create a kill boolean. final AtomicBoolean finished = new AtomicBoolean(false); Runtime.getRuntime().addShutdownHook(new Thread() { @@ -123,13 +154,24 @@ public class StreamResultsCommand implements RyaStreamsCommand { } }); - // Execute the command. - final GetQueryResultStream getQueryResultStream = new KafkaGetQueryResultStream(params.kafkaIP, params.kafkaPort); + // Build the interactor based on the type of result the query produces. 
+ final GetQueryResultStream<?> getQueryResultStream; + try { + final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); + if(tupleExpr instanceof Reduced) { + getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class); + } else { + getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class); + } + } catch (final MalformedQueryException e) { + throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e); + } - try (final QueryResultStream stream = getQueryResultStream.fromStart(queryId)) { + // Iterate through the results and print them to the console until the program or the stream ends. + try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(queryId)) { while(!finished.get()) { - for(final VisibilityBindingSet visBs : stream.poll(1000)) { - System.out.println(visBs); + for(final Object result : stream.poll(1000)) { + System.out.println(result); } } } catch (final Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml index 2d33f32..8926870 100644 --- a/extras/rya.streams/kafka/pom.xml +++ b/extras/rya.streams/kafka/pom.xml @@ -41,7 +41,7 @@ under the License. 
<dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.pcj.functions.geo</artifactId> - <version>3.2.12-incubating-SNAPSHOT</version> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.rya</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java index 360aaa2..02a3812 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java @@ -26,7 +26,6 @@ import java.util.UUID; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.streams.api.entity.QueryResultStream; import org.apache.rya.streams.api.exception.RyaStreamsException; @@ -37,11 +36,13 @@ import edu.umd.cs.findbugs.annotations.NonNull; * A Kafka implementation of {@link QueryResultStream}. It delegates the {@link #poll(long)} method to * a {@link Consumer}. As a result, the starting point of this stream is whatever position the consumer * starts at within the Kafka topic. + * + * @param <V> - The type of the consumed records' value. 
*/ @DefaultAnnotation(NonNull.class) -public class KafkaQueryResultStream extends QueryResultStream { +public class KafkaQueryResultStream<V> extends QueryResultStream<V> { - private final Consumer<?, VisibilityBindingSet> consumer; + private final Consumer<?, V> consumer; /** * Constructs an instance of {@link KafkaQueryResultStream}. @@ -49,13 +50,13 @@ public class KafkaQueryResultStream extends QueryResultStream { * @param queryId - The query the results are for. (not null) * @param consumer - The consumer that will be polled by this class. (not null) */ - public KafkaQueryResultStream(final UUID queryId, final Consumer<?, VisibilityBindingSet> consumer) { + public KafkaQueryResultStream(final UUID queryId, final Consumer<?, V> consumer) { super(queryId); this.consumer = requireNonNull(consumer); } @Override - public Iterable<VisibilityBindingSet> poll(final long timeoutMs) throws RyaStreamsException { + public Iterable<V> poll(final long timeoutMs) throws RyaStreamsException { return new RecordEntryIterable<>( consumer.poll(timeoutMs) ); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java index b3c3fea..529b493 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java @@ -26,40 +26,47 @@ import java.util.UUID; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import 
org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.streams.api.entity.QueryResultStream; import org.apache.rya.streams.api.exception.RyaStreamsException; import org.apache.rya.streams.api.interactor.GetQueryResultStream; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.entity.KafkaQueryResultStream; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; /** * A Kafka topic implementation of {@link GetQueryResultStream}. + * + * @param <T> - The type of results that are in the result stream. */ @DefaultAnnotation(NonNull.class) -public class KafkaGetQueryResultStream implements GetQueryResultStream { +public class KafkaGetQueryResultStream<T> implements GetQueryResultStream<T> { private final String bootstrapServers; + private final Class<? extends Deserializer<T>> deserializerClass; /** * Constructs an instance of {@link KafkaGetQueryResultStream}. * * @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null) * @param kafkaPort - The port of the Kafka Broker to connect to. (not null) + * @param deserializerClass - The value deserializer to use when reading from the Kafka topic. (not null) */ - public KafkaGetQueryResultStream(final String kafkaHostname, final String kafkaPort) { + public KafkaGetQueryResultStream( + final String kafkaHostname, + final String kafkaPort, + final Class<? 
extends Deserializer<T>> deserializerClass) { requireNonNull(kafkaHostname); requireNonNull(kafkaPort); bootstrapServers = kafkaHostname + ":" + kafkaPort; + this.deserializerClass = requireNonNull(deserializerClass); } @Override - public QueryResultStream fromStart(final UUID queryId) throws RyaStreamsException { + public QueryResultStream<T> fromStart(final UUID queryId) throws RyaStreamsException { requireNonNull(queryId); // Always start at the earliest point within the topic. @@ -67,21 +74,21 @@ public class KafkaGetQueryResultStream implements GetQueryResultStream { } @Override - public QueryResultStream fromNow(final UUID queryId) throws RyaStreamsException { + public QueryResultStream<T> fromNow(final UUID queryId) throws RyaStreamsException { requireNonNull(queryId); // Always start at the latest point within the topic. return makeStream(queryId, "latest"); } - private QueryResultStream makeStream(final UUID queryId, final String autoOffsetResetConfig) { + private QueryResultStream<T> makeStream(final UUID queryId, final String autoOffsetResetConfig) { // Configure which instance of Kafka to connect to. final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // Nothing meaningful is in the key and the value is a VisibilityBindingSet. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityBindingSetDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerClass); // Use a UUID for the Group Id so that we never register as part of the same group as another consumer. 
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); @@ -96,13 +103,13 @@ public class KafkaGetQueryResultStream implements GetQueryResultStream { // We are not closing the consumer here because the returned QueryResultStream is responsible for closing the // underlying resources required to process it. - final KafkaConsumer<Object, VisibilityBindingSet> consumer = new KafkaConsumer<>(props); + final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(props); // Register the consumer for the query's results. final String resultTopic = KafkaTopics.queryResultsTopic(queryId); consumer.subscribe(Arrays.asList(resultTopic)); // Return the result stream. - return new KafkaQueryResultStream(queryId, consumer); + return new KafkaQueryResultStream<>(queryId, consumer); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java index c740ba2..8882753 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java @@ -29,10 +29,14 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.api.entity.QueryResultStream; import 
org.apache.rya.streams.api.interactor.GetQueryResultStream; import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.Rule; @@ -54,23 +58,23 @@ public class KafkaGetQueryResultStreamIT { * the target number of results, or it hits the target number of results. * * @param pollMs - How long each poll could take. - * @param pollIterations - The maximum nubmer of polls that will be attempted. + * @param pollIterations - The maximum number of polls that will be attempted. * @param targetSize - The number of results to read before stopping. * @param stream - The stream that will be polled. * @return The results that were read from the stream. * @throws Exception If the poll failed. */ - private List<VisibilityBindingSet> pollForResults( + private <T> List<T> pollForResults( final int pollMs, final int pollIterations, final int targetSize, - final QueryResultStream stream) throws Exception{ - final List<VisibilityBindingSet> read = new ArrayList<>(); + final QueryResultStream<T> stream) throws Exception{ + final List<T> read = new ArrayList<>(); int i = 0; while(read.size() < targetSize && i < pollIterations) { - for(final VisibilityBindingSet visBs : stream.poll(pollMs)) { - read.add( visBs ); + for(final T result : stream.poll(pollMs)) { + read.add( result ); } i++; } @@ -109,7 +113,8 @@ public class KafkaGetQueryResultStreamIT { } // Use the interactor that is being tested to read all of the visibility binding sets. 
- final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort()); + final GetQueryResultStream<VisibilityBindingSet> interactor = + new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class); final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId)); // Show the fetched binding sets match the original, as well as their order. @@ -133,8 +138,9 @@ public class KafkaGetQueryResultStreamIT { producer.flush(); // Use the interactor that is being tested to read all of the visibility binding sets that appear after this point. - final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort()); - try(QueryResultStream results = interactor.fromNow(queryId)) { + final GetQueryResultStream<VisibilityBindingSet> interactor = + new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class); + try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(queryId)) { // Read results from the stream. List<VisibilityBindingSet> read = new ArrayList<>(); for(final VisibilityBindingSet visBs : results.poll(500)) { @@ -175,11 +181,42 @@ public class KafkaGetQueryResultStreamIT { final UUID queryId = UUID.randomUUID(); // Use the interactor that is being tested to create a result stream and immediately close it. 
- final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort()); - final QueryResultStream results = interactor.fromStart(queryId); + final GetQueryResultStream<VisibilityBindingSet> interactor = + new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class); + final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(queryId); results.close(); // Try to poll the closed stream. results.poll(1); } + + @Test + public void fromStart_visibilityStatements() throws Exception { + // Create an ID for the query. + final UUID queryId = UUID.randomUUID(); + + // Create some statements that will be written to the result topic. + final List<VisibilityStatement> original = new ArrayList<>(); + final ValueFactory vf = new ValueFactoryImpl(); + original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(63)), "b") ); + original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral("urn:34")), "") ); + + // Write the entries to the query result topic in Kafka. + try(final Producer<?, VisibilityStatement> producer = + KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class)) { + final String resultTopic = KafkaTopics.queryResultsTopic(queryId); + for(final VisibilityStatement visStmt : original) { + producer.send(new ProducerRecord<>(resultTopic, visStmt)); + } + } + + // Use the interactor that is being tested to read all of the visibility binding sets. 
+ final GetQueryResultStream<VisibilityStatement> interactor = + new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityStatementDeserializer.class); + final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(queryId)); + + // Show the fetched statements match the original, as well as their order. + assertEquals(original, read); + } } \ No newline at end of file
