RYA-466 Update the Rya Streams Client to stream results to file.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e4536943 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e4536943 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e4536943 Branch: refs/heads/master Commit: e453694334244df2dc3c8768be77e173a734135a Parents: 8b11d74 Author: kchilton2 <[email protected]> Authored: Wed Feb 14 13:49:52 2018 -0500 Committer: Valiyil <[email protected]> Committed: Fri Mar 9 13:00:00 2018 -0500 ---------------------------------------------------------------------- .../apache/rya/api/utils/QueryInvestigator.java | 103 +++++++++++++ .../rya/api/utils/QueryInvestigatorTest.java | 150 +++++++++++++++++++ extras/rya.streams/client/pom.xml | 9 ++ .../client/command/StreamResultsCommand.java | 58 +++++-- .../client/util/QueryResultsOutputUtil.java | 133 ++++++++++++++++ .../client/util/QueryResultsOutputUtilTest.java | 135 +++++++++++++++++ 6 files changed, 577 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e4536943/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java new file mode 100644 index 0000000..2fbd09b --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.utils; + +import static java.util.Objects.requireNonNull; + +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.parser.ParsedGraphQuery; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.QueryParserUtil; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A utility class that is used to glean insight into the structure of SPARQL queries. + */ +@DefaultAnnotation(NonNull.class) +public class QueryInvestigator { + + private static final SPARQLParser PARSER = new SPARQLParser(); + + private QueryInvestigator() { } + + /** + * Determines whether a SPARQL command is a CONSTRUCT or not. + * + * @param sparql - The SPARQL to evaluate. (not null) + * @return {@code true} if the provided SPARQL is a CONSTRUCT query; otherwise {@code false}. + * @throws MalformedQueryException The SPARQL is neither a well-formed query nor update. + */ + public static boolean isConstruct(final String sparql) throws MalformedQueryException { + requireNonNull(sparql); + + try { + // Constructs are queries, so try to create a ParsedQuery. + final ParsedQuery parsedQuery = PARSER.parseQuery(sparql, null); + + // Check to see if the SPARQL looks like a CONSTRUCT query. 
+ return parsedQuery instanceof ParsedGraphQuery; + + } catch(final MalformedQueryException queryE) { + try { + // Maybe it's an update. + PARSER.parseUpdate(sparql, null); + + // It was, so return false. + return false; + + } catch(final MalformedQueryException updateE) { + // It's not. Actually malformed. + throw queryE; + } + } + } + + /** + * Determines whether a SPARQL command is an INSERT with a WHERE clause or not. + * + * @param sparql - The SPARQL to evaluate. (not null) + * @return {@code true} if the provided SPARQL is an INSERT update; otherwise {@code false}. + * @throws MalformedQueryException The SPARQL is neither a well-formed query nor update. + */ + public static boolean isInsertWhere(final String sparql) throws MalformedQueryException { + requireNonNull(sparql); + + try { + // Inserts are updates, so try to create a ParsedUpdate. + PARSER.parseUpdate(sparql, null); + final String strippedOperation = QueryParserUtil.removeSPARQLQueryProlog(sparql.toLowerCase()); + return strippedOperation.startsWith("insert"); + } catch(final MalformedQueryException updateE) { + try { + // Maybe it's a query. + PARSER.parseQuery(sparql, null); + + // It was, so return false. + return false; + + } catch(final MalformedQueryException queryE) { + // It's not. Actually malformed. 
+ throw updateE; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e4536943/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java new file mode 100644 index 0000000..fbe2ebe --- /dev/null +++ b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.utils; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; + +/** + * Unit tests the methods of {@link QueryInvestigator}. + */ +public class QueryInvestigatorTest { + + @Test + public void isConstruct_true() throws Exception { + final String sparql = + "PREFIX vCard: <http://www.w3.org/2001/vcard-rdf/3.0#> " + + "PREFIX foaf: <http://xmlns.com/foaf/0.1/> " + + "CONSTRUCT { " + + "?X vCard:FN ?name . 
" + + "?X vCard:URL ?url . " + + "?X vCard:TITLE ?title . " + + "} " + + "FROM <http://www.w3.org/People/Berners-Lee/card> " + + "WHERE { " + + "OPTIONAL { ?X foaf:name ?name . FILTER isLiteral(?name) . } " + + "OPTIONAL { ?X foaf:homepage ?url . FILTER isURI(?url) . } " + + "OPTIONAL { ?X foaf:title ?title . FILTER isLiteral(?title) . } " + + "}"; + + assertTrue( QueryInvestigator.isConstruct(sparql) ); + } + + @Test + public void isConstruct_false_notAConstruct() throws Exception { + final String sparql = "SELECT * WHERE { ?a ?b ?c . }"; + assertFalse( QueryInvestigator.isConstruct(sparql) ); + } + + @Test + public void isConstruct_false_notAConstructWithKeywords() throws Exception { + final String sparql = + "SELECT ?construct " + + "WHERE {" + + " ?construct <urn:built> <urn:skyscraper> ." + + "}"; + assertFalse( QueryInvestigator.isConstruct(sparql) ); + } + + @Test + public void isConstruct_false_notAQuery() throws Exception { + final String sparql = + "PREFIX Sensor: <http://example.com/Equipment.owl#> " + + "INSERT { " + + "?subject Sensor:test2 ?newValue " + + "} WHERE {" + + "values (?oldValue ?newValue) {" + + "('testValue1' 'newValue1')" + + "('testValue2' 'newValue2')" + + "}" + + "?subject Sensor:test1 ?oldValue" + + "}"; + + assertFalse( QueryInvestigator.isConstruct(sparql) ); + } + + @Test(expected = MalformedQueryException.class) + public void isConstruct_false_malformed() throws MalformedQueryException { + assertFalse( QueryInvestigator.isConstruct("not sparql") ); + } + + @Test + public void isInsert_true() throws Exception { + final String sparql = + "PREFIX Sensor: <http://example.com/Equipment.owl#> " + + "INSERT { " + + "?subject Sensor:test2 ?newValue " + + "} WHERE {" + + "values (?oldValue ?newValue) {" + + "('testValue1' 'newValue1')" + + "('testValue2' 'newValue2')" + + "}" + + "?subject Sensor:test1 ?oldValue" + + "}"; + + assertTrue( QueryInvestigator.isInsertWhere(sparql) ); + } + + @Test + public void isInsert_false_notAnInsert() 
throws Exception { + final String sparql = + "PREFIX dc: <http://purl.org/dc/elements/1.1/> " + + "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#> " + + "DELETE { " + + "?book ?p ?v " + + "} WHERE { " + + "?book dc:date ?date . " + + "FILTER ( ?date < \"2000-01-01T00:00:00\"^^xsd:dateTime ) " + + "?book ?p ?v " + + "}"; + + assertFalse( QueryInvestigator.isInsertWhere(sparql) ); + } + + @Test + public void isInsert_false_notAnInsertWithKeywords() throws Exception { + final String sparql = + "DELETE" + + "{ " + + " ?bookInsert ?p ?o" + + "}" + + "WHERE" + + "{ " + + " ?bookInsert <urn:datePrinted> ?datePrinted ." + + " FILTER ( ?datePrinted < \"2018-01-01T00:00:00\"^^xsd:dateTime )" + + " ?bookInsert ?p ?o" + + "}"; + + assertFalse( QueryInvestigator.isInsertWhere(sparql) ); + } + + @Test + public void isInsert_false_notAnUpdate() throws Exception { + final String sparql = "SELECT * WHERE { ?a ?b ?c . }"; + assertFalse( QueryInvestigator.isInsertWhere(sparql) ); + } + + @Test(expected = MalformedQueryException.class) + public void isInsert_false_malformed() throws MalformedQueryException { + assertFalse( QueryInvestigator.isInsertWhere("not sparql") ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e4536943/extras/rya.streams/client/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/pom.xml b/extras/rya.streams/client/pom.xml index 5bd7933..38c3c86 100644 --- a/extras/rya.streams/client/pom.xml +++ b/extras/rya.streams/client/pom.xml @@ -69,6 +69,10 @@ under the License. <groupId>org.openrdf.sesame</groupId> <artifactId>sesame-rio-turtle</artifactId> </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryresultio-sparqljson</artifactId> + </dependency> <!-- Third Party dependencies --> <dependency> @@ -92,6 +96,11 @@ under the License. 
<scope>test</scope> </dependency> <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.test.kafka</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e4536943/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java index 3612dd0..8ae4e08 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -20,11 +20,18 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.api.utils.QueryInvestigator; import org.apache.rya.streams.api.entity.QueryResultStream; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.interactor.GetQueryResultStream; @@ -32,13 +39,13 @@ import org.apache.rya.streams.api.queries.InMemoryQueryRepository; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.client.util.QueryResultsOutputUtil; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.algebra.Reduced; import org.openrdf.query.algebra.TupleExpr; import org.openrdf.query.parser.sparql.SPARQLParser; @@ -65,14 +72,20 @@ public class StreamResultsCommand implements RyaStreamsCommand { @Parameter(names = {"--queryId", "-q"}, required = true, description = "The query whose results will be streamed to the console.") private String queryId; + @Parameter(names = {"--file", "-f"}, required = false, description = "If provided, the 
output file the results will stream into.") + private String outputPath; + @Override public String toString() { final StringBuilder parameters = new StringBuilder(); parameters.append(super.toString()); if (!Strings.isNullOrEmpty(queryId)) { - parameters.append("\tQuery ID: " + queryId); - parameters.append("\n"); + parameters.append("\tQuery ID: " + queryId + "\n"); + } + + if(!Strings.isNullOrEmpty(outputPath)) { + parameters.append("\tOutput Path: " + outputPath + "\n"); } return parameters.toString(); @@ -160,10 +173,12 @@ public class StreamResultsCommand implements RyaStreamsCommand { }); // Build the interactor based on the type of result the query produces. + boolean isStatementResults = false; + final GetQueryResultStream<?> getQueryResultStream; try { - final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); - if(tupleExpr instanceof Reduced) { + isStatementResults = QueryInvestigator.isConstruct(sparql) | QueryInvestigator.isInsertWhere(sparql); + if(isStatementResults) { getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class); } else { getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class); @@ -172,12 +187,22 @@ public class StreamResultsCommand implements RyaStreamsCommand { throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e); } - // Iterate through the results and print them to the console until the program or the stream ends. - try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(queryId)) { - while(!finished.get()) { - for(final Object result : stream.poll(1000)) { - System.out.println(result); + // Iterate through the results and print them to the configured output mechanism. 
+ try (final QueryResultStream<?> resultsStream = getQueryResultStream.fromStart(queryId)) { + final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); + if(params.outputPath != null) { + final Path file = Paths.get(params.outputPath); + try (final OutputStream out = Files.newOutputStream(file)) { + if(isStatementResults) { + final QueryResultStream<VisibilityStatement> stmtStream = (QueryResultStream<VisibilityStatement>) resultsStream; + QueryResultsOutputUtil.toNtriplesFile(out, stmtStream, finished); + } else { + final QueryResultStream<VisibilityBindingSet> bsStream = (QueryResultStream<VisibilityBindingSet>) resultsStream; + QueryResultsOutputUtil.toBindingSetJSONFile(out, tupleExpr, bsStream, finished); + } } + } else { + streamToSystemOut(resultsStream, finished); } } catch (final Exception e) { System.err.println("Error while reading the results from the stream."); @@ -185,4 +210,15 @@ public class StreamResultsCommand implements RyaStreamsCommand { System.exit(1); } } + + private static void streamToSystemOut(final QueryResultStream<?> stream, final AtomicBoolean shutdownSignal) throws Exception { + requireNonNull(stream); + requireNonNull(shutdownSignal); + + while(!shutdownSignal.get()) { + for(final Object result : stream.poll(1000)) { + System.out.println(result); + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e4536943/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java new file mode 100644 index 0000000..114a6fe --- /dev/null +++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.client.util; + +import static java.util.Objects.requireNonNull; + +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.api.entity.QueryResultStream; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.openrdf.query.TupleQueryResultHandlerException; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.resultio.sparqljson.SPARQLResultsJSONWriter; +import org.openrdf.rio.RDFFormat; +import org.openrdf.rio.RDFHandlerException; +import org.openrdf.rio.RDFWriter; +import org.openrdf.rio.Rio; +import org.openrdf.rio.WriterConfig; +import org.openrdf.rio.helpers.BasicWriterSettings; + +import com.google.common.collect.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A utility that writes {@link QueryResultStream} results to an 
{@link OutputStream}. + */ +@DefaultAnnotation(NonNull.class) +public class QueryResultsOutputUtil { + + /** + * Private constructor to prevent instantiation. + */ + private QueryResultsOutputUtil() { } + + /** + * Writes the results of a {@link QueryResultStream} to the output stream as NTriples until the + * shutdown signal is set. + * + * @param out - The stream the NTriples data will be written to. (not null) + * @param resultsStream - The results stream that will be polled for results to + * write to {@code out}. (not null) + * @param shutdownSignal - Setting this signal will cause the thread that + * is processing this function to finish and leave. (not null) + * @throws RDFHandlerException A problem was encountered while + * writing the NTriples to the output stream. + * @throws IllegalStateException The {@code resultsStream} is closed. + * @throws RyaStreamsException Could not fetch the next set of results. + */ + public static void toNtriplesFile( + final OutputStream out, + final QueryResultStream<VisibilityStatement> resultsStream, + final AtomicBoolean shutdownSignal) throws RDFHandlerException, IllegalStateException, RyaStreamsException { + requireNonNull(out); + requireNonNull(resultsStream); + requireNonNull(shutdownSignal); + + final RDFWriter writer = Rio.createWriter(RDFFormat.NTRIPLES, out); + writer.startRDF(); + + while(!shutdownSignal.get()) { + final Iterable<VisibilityStatement> it = resultsStream.poll(1000); + for(final VisibilityStatement result : it) { + writer.handleStatement(result); + } + } + + writer.endRDF(); + } + + /** + * Writes the results of a {@link QueryResultStream} to the output stream as JSON until the + * shutdown signal is set. + * + * @param out - The stream the JSON will be written to. (not null) + * @param query - The parsed SPARQL Query whose results are being output. This + * object is used to figure out which bindings may appear. 
(not null) + * @param resultsStream - The results stream that will be polled for results to + * write to {@code out}. (not null) + * @param shutdownSignal - Setting this signal will cause the thread that + * is processing this function to finish and leave. (not null) + * @throws TupleQueryResultHandlerException A problem was encountered while + * writing the JSON to the output stream. + * @throws IllegalStateException The {@code resultsStream} is closed. + * @throws RyaStreamsException Could not fetch the next set of results. + */ + public static void toBindingSetJSONFile( + final OutputStream out, + final TupleExpr query, + final QueryResultStream<VisibilityBindingSet> resultsStream, + final AtomicBoolean shutdownSignal) throws TupleQueryResultHandlerException, IllegalStateException, RyaStreamsException { + requireNonNull(out); + requireNonNull(query); + requireNonNull(resultsStream); + requireNonNull(shutdownSignal); + + // Create a writer that does not pretty print. + final SPARQLResultsJSONWriter writer = new SPARQLResultsJSONWriter(out); + final WriterConfig config = writer.getWriterConfig(); + config.set(BasicWriterSettings.PRETTY_PRINT, false); + + // Start the JSON and enumerate the possible binding names. 
+ writer.startQueryResult( Lists.newArrayList(query.getBindingNames()) ); + + while(!shutdownSignal.get()) { + for(final VisibilityBindingSet result : resultsStream.poll(1000)) { + writer.handleSolution(result); + } + } + + writer.endQueryResult(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e4536943/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java new file mode 100644 index 0000000..b82e671 --- /dev/null +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.client.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.api.entity.QueryResultStream; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Charsets; + +/** + * Unit tests the methods of {@link QueryResultsOutputUtil}. + */ +public class QueryResultsOutputUtilTest { + + private static final ValueFactory VF = new ValueFactoryImpl(); + + @Test + public void toNtriplesFile() throws Exception { + // Mock a result stream that signals shutdown when it returns a set of results. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + final QueryResultStream<VisibilityStatement> resultsStream = mock(QueryResultStream.class); + when(resultsStream.poll(anyLong())).thenAnswer(invocation -> { + shutdownSignal.set(true); + + final List<VisibilityStatement> results = new ArrayList<>(); + + Statement stmt = VF.createStatement(VF.createURI("urn:alice"), VF.createURI("urn:age"), VF.createLiteral(23)); + results.add( new VisibilityStatement(stmt) ); + + stmt = VF.createStatement(VF.createURI("urn:bob"), VF.createURI("urn:worksAt"), VF.createLiteral("Taco Shop")); + results.add( new VisibilityStatement(stmt) ); + return results; + }); + + // The stream the NTriples will be written to. 
+ final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + // Invoke the test method. This will write the NTriples. + QueryResultsOutputUtil.toNtriplesFile(out, resultsStream, shutdownSignal); + + // Show the produced NTriples matches the expected NTriples. + final String expected = + "<urn:alice> <urn:age> \"23\"^^<http://www.w3.org/2001/XMLSchema#int> .\n" + + "<urn:bob> <urn:worksAt> \"Taco Shop\" .\n"; + + final String nTriples = new String(out.toByteArray(), Charsets.UTF_8); + assertEquals(expected, nTriples); + } + + @Test + public void toBindingSetJSONFile() throws Exception { + // A SPARQL query that uses OPTIONAL values. + final String sparql = + "SELECT * WHERE { " + + "?name <urn:worksAt> ?company . " + + "OPTIONAL{ ?name <urn:ssn> ?ssn} " + + "}"; + final TupleExpr query = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); + + // Mock a results stream that signals shutdown when it returns a set of results. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + final QueryResultStream<VisibilityBindingSet> resultsStream = mock(QueryResultStream.class); + when(resultsStream.poll(anyLong())).thenAnswer(invocation -> { + shutdownSignal.set(true); + + final List<VisibilityBindingSet> results = new ArrayList<>(); + + // A result with the optional value. + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("name", VF.createLiteral("alice")); + bs.addBinding("company", VF.createLiteral("Taco Shop")); + bs.addBinding("ssn", VF.createURI("urn:111-11-1111")); + results.add(new VisibilityBindingSet(bs, "")); + + + // A result without the optional value. + bs = new MapBindingSet(); + bs.addBinding("name", VF.createLiteral("bob")); + bs.addBinding("company", VF.createLiteral("Cafe")); + results.add(new VisibilityBindingSet(bs, "")); + + return results; + }); + + // The stream the JSON will be written to. + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + // Invoke the test method. This will write the json. 
+ QueryResultsOutputUtil.toBindingSetJSONFile(out, query, resultsStream, shutdownSignal); + + // Show the produced JSON matches the expected JSON. + final String expected = "{\"head\":{\"vars\":[\"name\",\"company\",\"ssn\"]},\"results\":{" + + "\"bindings\":[{\"name\":{\"type\":\"literal\",\"value\":\"alice\"},\"company\":{" + + "\"type\":\"literal\",\"value\":\"Taco Shop\"},\"ssn\":{\"type\":\"uri\",\"value\":" + + "\"urn:111-11-1111\"}},{\"name\":{\"type\":\"literal\",\"value\":\"bob\"},\"company\"" + + ":{\"type\":\"literal\",\"value\":\"Cafe\"}}]}}"; + final String json = new String(out.toByteArray(), Charsets.UTF_8); + System.out.println(json); + assertEquals(expected, json); + } +} \ No newline at end of file
