http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java new file mode 100644 index 0000000..ba11e57 --- /dev/null +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.sp; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; +import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +/** + * Integration tests the methods of {@link StatementPatternProcessor}. + */ +public class StatementPatternProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void singlePattern_singleStatement() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create a statement that generate an SP result. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + + // Show the correct binding set results from the job. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("otherPerson", vf.createURI("urn:Bob")); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void singlePattern_manyStatements() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create some statements where some generates SP results and others do not. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") ); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") ); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") ); + + // Show the correct binding set results from the job. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("otherPerson", vf.createURI("urn:Bob")); + expected.add( new VisibilityBindingSet(bs, "a") ); + + bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("otherPerson", vf.createURI("urn:Alice")); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void multiplePatterns_singleStatement() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = "SELECT * WHERE { " + + "?person <urn:talksTo> ?otherPerson . " + + "?person ?action <urn:Bob>" + + "}"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create some statements where some generates SP results and others do not. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + + // Show the correct binding set results from the job. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("action", vf.createURI("urn:talksTo")); + bs.addBinding("otherPerson", vf.createURI("urn:Bob")); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void multiplePatterns_multipleStatements() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = "SELECT * WHERE { " + + "?person <urn:talksTo> ?otherPerson ." + + "?person ?action <urn:Bob>" + + "}"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create some statements where some generates SP results and others do not. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") ); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") ); + + // Show the correct binding set results from the job. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + + QueryBindingSet bs = new QueryBindingSet(); + bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("action", vf.createURI("urn:talksTo")); + bs.addBinding("otherPerson", vf.createURI("urn:Charlie")); + expected.add(new VisibilityBindingSet(bs, "a&(a|b)")); + + bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("action", vf.createURI("urn:talksTo")); + bs.addBinding("otherPerson", vf.createURI("urn:Bob")); + expected.add(new VisibilityBindingSet(bs, "a")); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka-test/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka-test/pom.xml b/extras/rya.streams/kafka-test/pom.xml new file mode 100644 index 0000000..4a423e2 --- /dev/null +++ b/extras/rya.streams/kafka-test/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.parent</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.streams.kafka-test</artifactId> + + <name>Apache Rya Streams Kafka Test</name> + <description> + A common test jar containing utilities used to run Kafka based Rya + Streams integration tests. + </description> + + <dependencies> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.kafka</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java new file mode 100644 index 0000000..ee25f8c --- /dev/null +++ b/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; + +import com.google.common.collect.Sets; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Utility functions that make it easier to test Rya Streams applications. + */ +@DefaultAnnotation(NonNull.class) +public class RyaStreamsTestUtil { + + /** + * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of + * the results topic, and ensures the expected results match the read results. + * + * @param <T> The type of value that will be consumed from the results topic. + * @param kafka - The embedded Kafka instance that is being tested with. (not null) + * @param statementsTopic - The topic statements will be written to. (not null) + * @param resultsTopic - The topic results will be read from. (not null) + * @param builder - The streams topology that will be executed. (not null) + * @param statements - The statements that will be loaded into the topic. (not null) + * @param expected - The expected results. (not null) + * @param expectedDeserializerClass - The class of the deserializer that will be used when reading + * values from the results topic. (not null) + * @throws Exception If any exception was thrown while running the test. + */ + public static <T> void runStreamProcessingTest( + final KafkaTestInstanceRule kafka, + final String statementsTopic, + final String resultsTopic, + final TopologyBuilder builder, + final List<VisibilityStatement> statements, + final Set<T> expected, + final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception { + requireNonNull(kafka); + requireNonNull(statementsTopic); + requireNonNull(resultsTopic); + requireNonNull(builder); + requireNonNull(statements); + requireNonNull(expected); + requireNonNull(expectedDeserializerClass); + + // Explicitly create the topics that are being used. + kafka.createTopic(statementsTopic); + kafka.createTopic(resultsTopic); + + // Start the streams program. + final Properties props = kafka.createBootstrapServerConfig(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT"); + + final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props)); + streams.cleanUp(); + try { + streams.start(); + + // Wait for the streams application to start. Streams only see data after their consumers are connected. + Thread.sleep(6000); + + // Load the statements into the input topic. + try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer( + kafka, StringSerializer.class, VisibilityStatementSerializer.class)) { + new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements); + } + + // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found. + try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) { + // Register the topic. + consumer.subscribe(Arrays.asList(resultsTopic)); + + // Poll for the result. + final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) ); + + // Show the correct binding sets results from the job. + assertEquals(expected, results); + } + } finally { + streams.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml index 8926870..778630d 100644 --- a/extras/rya.streams/kafka/pom.xml +++ b/extras/rya.streams/kafka/pom.xml @@ -36,23 +36,22 @@ under the License. <profiles> <profile> <id>geoindexing</id> - <dependencies> - <!-- Rya dependencies --> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.functions.geo</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.geo.common</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> + <dependencies> + <!-- Rya dependencies --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.functions.geo</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.geo.common</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> </profile> </profiles> - <dependencies> <!-- Rya dependencies --> <dependency> @@ -106,6 +105,11 @@ under the License. </dependency> <dependency> <groupId>org.apache.rya</groupId> + <artifactId>rya.test.rdf</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> <artifactId>rya.test.kafka</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java deleted file mode 100644 index b4388c3..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka; - -import static java.util.Objects.requireNonNull; - -import java.util.concurrent.atomic.AtomicReference; - -import org.openrdf.query.algebra.Filter; -import org.openrdf.query.algebra.MultiProjection; -import org.openrdf.query.algebra.Projection; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import edu.umd.cs.findbugs.annotations.Nullable; - -/** - * A set of utility functions that are useful when writing tests RDF functions. - */ -@DefaultAnnotation(NonNull.class) -public final class RdfTestUtil { - - private RdfTestUtil() { } - - /** - * Fetch the {@link StatementPattern} from a SPARQL string. - * - * @param sparql - A SPARQL query that contains only a single Statement Patern. (not nul) - * @return The {@link StatementPattern} that was in the query, if it could be found. Otherwise {@code null} - * @throws Exception The statement pattern could not be found in the parsed SPARQL query. - */ - public static @Nullable StatementPattern getSp(final String sparql) throws Exception { - requireNonNull(sparql); - - final AtomicReference<StatementPattern> statementPattern = new AtomicReference<>(); - final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); - parsed.getTupleExpr().visitChildren(new QueryModelVisitorBase<Exception>() { - @Override - public void meet(final StatementPattern node) throws Exception { - statementPattern.set(node); - } - }); - return statementPattern.get(); - } - - /** - * Get the first {@link Projection} node from a SPARQL query. - * - * @param sparql - The query that contains a single Projection node. - * @return The first {@link Projection} that is encountered. - * @throws Exception The query could not be parsed. - */ - public static @Nullable Projection getProjection(final String sparql) throws Exception { - requireNonNull(sparql); - - final AtomicReference<Projection> projection = new AtomicReference<>(); - final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); - parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { - @Override - public void meet(final Projection node) throws Exception { - projection.set(node); - } - }); - - return projection.get(); - } - - /** - * Get the first {@link MultiProjection} node from a SPARQL query. - * - * @param sparql - The query that contains a single Projection node. - * @return The first {@link MultiProjection} that is encountered. - * @throws Exception The query could not be parsed. - */ - public static @Nullable MultiProjection getMultiProjection(final String sparql) throws Exception { - requireNonNull(sparql); - - final AtomicReference<MultiProjection> multiProjection = new AtomicReference<>(); - final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); - parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { - @Override - public void meet(final MultiProjection node) throws Exception { - multiProjection.set(node); - } - }); - - return multiProjection.get(); - } - - /** - * Get the first {@link Filter} node from a SPARQL query. - * - * @param sparql - The query that contains a single Projection node. - * @return The first {@link Filter} that is encountered. - * @throws Exception The query could not be parsed. - */ - public static @Nullable Filter getFilter(final String sparql) throws Exception { - requireNonNull(sparql); - - final AtomicReference<Filter> filter = new AtomicReference<>(); - final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); - parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { - @Override - public void meet(final Filter node) throws Exception { - filter.set(node); - } - }); - - return filter.get(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java deleted file mode 100644 index ee25f8c..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka; - -import static java.util.Objects.requireNonNull; -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; -import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.apache.rya.test.kafka.KafkaTestUtil; - -import com.google.common.collect.Sets; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -/** - * Utility functions that make it easier to test Rya Streams applications. - */ -@DefaultAnnotation(NonNull.class) -public class RyaStreamsTestUtil { - - /** - * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of - * the results topic, and ensures the expected results match the read results. - * - * @param <T> The type of value that will be consumed from the results topic. - * @param kafka - The embedded Kafka instance that is being tested with. (not null) - * @param statementsTopic - The topic statements will be written to. (not null) - * @param resultsTopic - The topic results will be read from. (not null) - * @param builder - The streams topology that will be executed. (not null) - * @param statements - The statements that will be loaded into the topic. (not null) - * @param expected - The expected results. (not null) - * @param expectedDeserializerClass - The class of the deserializer that will be used when reading - * values from the results topic. (not null) - * @throws Exception If any exception was thrown while running the test. - */ - public static <T> void runStreamProcessingTest( - final KafkaTestInstanceRule kafka, - final String statementsTopic, - final String resultsTopic, - final TopologyBuilder builder, - final List<VisibilityStatement> statements, - final Set<T> expected, - final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception { - requireNonNull(kafka); - requireNonNull(statementsTopic); - requireNonNull(resultsTopic); - requireNonNull(builder); - requireNonNull(statements); - requireNonNull(expected); - requireNonNull(expectedDeserializerClass); - - // Explicitly create the topics that are being used. - kafka.createTopic(statementsTopic); - kafka.createTopic(resultsTopic); - - // Start the streams program. - final Properties props = kafka.createBootstrapServerConfig(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT"); - - final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props)); - streams.cleanUp(); - try { - streams.start(); - - // Wait for the streams application to start. Streams only see data after their consumers are connected. - Thread.sleep(6000); - - // Load the statements into the input topic. - try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer( - kafka, StringSerializer.class, VisibilityStatementSerializer.class)) { - new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements); - } - - // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found. - try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) { - // Register the topic. - consumer.subscribe(Arrays.asList(resultsTopic)); - - // Poll for the result. - final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) ); - - // Show the correct binding sets results from the job. - assertEquals(expected, results); - } - } finally { - streams.close(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java deleted file mode 100644 index 33dc945..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.function.projection.RandomUUIDFactory; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; -import org.apache.rya.streams.kafka.topology.TopologyFactory; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -/** - * Integration tests the methods of {@link StatementPatternProcessor}. - */ -public class StatementPatternProcessorIT { - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); - - @Test - public void singlePattern_singleStatement() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create a statement that generate an SP result. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); - - // Show the correct binding set results from the job. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("otherPerson", vf.createURI("urn:Bob")); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void singlePattern_manyStatements() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create some statements where some generates SP results and others do not. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") ); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") ); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") ); - - // Show the correct binding set results from the job. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - - QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("otherPerson", vf.createURI("urn:Bob")); - expected.add( new VisibilityBindingSet(bs, "a") ); - - bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Bob")); - bs.addBinding("otherPerson", vf.createURI("urn:Alice")); - expected.add( new VisibilityBindingSet(bs, "a|b") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void multiplePatterns_singleStatement() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = "SELECT * WHERE { " - + "?person <urn:talksTo> ?otherPerson . " - + "?person ?action <urn:Bob>" - + "}"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create some statements where some generates SP results and others do not. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); - - // Show the correct binding set results from the job. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("action", vf.createURI("urn:talksTo")); - bs.addBinding("otherPerson", vf.createURI("urn:Bob")); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void multiplePatterns_multipleStatements() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = "SELECT * WHERE { " - + "?person <urn:talksTo> ?otherPerson ." - + "?person ?action <urn:Bob>" - + "}"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create some statements where some generates SP results and others do not. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") ); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") ); - - // Show the correct binding set results from the job. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - - QueryBindingSet bs = new QueryBindingSet(); - bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("action", vf.createURI("urn:talksTo")); - bs.addBinding("otherPerson", vf.createURI("urn:Charlie")); - expected.add(new VisibilityBindingSet(bs, "a&(a|b)")); - - bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("action", vf.createURI("urn:talksTo")); - bs.addBinding("otherPerson", vf.createURI("urn:Bob")); - expected.add(new VisibilityBindingSet(bs, "a")); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java deleted file mode 100644 index 072469a..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java +++ /dev/null @@ -1,457 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.aggregation; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.function.projection.RandomUUIDFactory; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; -import org.apache.rya.streams.kafka.topology.TopologyFactory; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.impl.MapBindingSet; - -/** - * Integration tests {@link AggregationProcessor}. - */ -public class AggregationProcessorIT { - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false); - - @Test - public void count() throws Exception { - // A query that figures out how many books each person has. - final String sparql = - "SELECT ?person (count(?book) as ?bookCount) " + - "WHERE { " + - "?person <urn:hasBook> ?book " + - "} GROUP BY ?person"; - - // Create the statements that will be input into the query.. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b")); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "a")); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Bob")); - bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "a&b")); - - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void sum() throws Exception { - // A query that figures out how much food each person has. - final String sparql = - "SELECT ?person (sum(?foodCount) as ?totalFood) " + - "WHERE { " + - "?person <urn:hasFoodType> ?food . " + - "?food <urn:count> ?foodCount . " + - "} GROUP BY ?person"; - - // Create the statements that will be input into the query.. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), "")); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "")); - - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void average() throws Exception { - // A query that figures out the average age across all people. - final String sparql = - "SELECT (avg(?age) as ?avgAge) " + - "WHERE { " + - "?person <urn:age> ?age " + - "}"; - - // Create the statements that will be input into the query.. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), "")); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL)); - expected.add(new VisibilityBindingSet(bs, "")); - - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void min() throws Exception { - // A query that figures out what the youngest age is across all people. - final String sparql = - "SELECT (min(?age) as ?youngest) " + - "WHERE { " + - "?person <urn:age> ?age " + - "}"; - - // Create the statements that will be input into the query.. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("youngest", vf.createLiteral(13)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("youngest", vf.createLiteral(7)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("youngest", vf.createLiteral(5)); - expected.add(new VisibilityBindingSet(bs, "")); - - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void max() throws Exception { - // A query that figures out what the oldest age is across all people. - final String sparql = - "SELECT (max(?age) as ?oldest) " + - "WHERE { " + - "?person <urn:age> ?age " + - "}"; - - // Create the statements that will be input into the query.. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("oldest", vf.createLiteral(13)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("oldest", vf.createLiteral(14)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("oldest", vf.createLiteral(25)); - expected.add(new VisibilityBindingSet(bs, "")); - - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void multipleGroupByVars() throws Exception { - // A query that contains more than one group by variable. - final String sparql = - "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " + - "WHERE {" + - "?employee <urn:worksAt> ?business . " + - "?business <urn:hasTimecardId> ?timecardId . " + - "?employee <urn:hasTimecardId> ?timecardId . " + - "?timecardId <urn:hours> ?hours . " + - "} GROUP BY ?business ?employee"; - - // Create the statements that will be input into the query. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), "")); - - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), "")); - - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), "")); - - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), "")); - - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), "")); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("business", vf.createURI("urn:TacoJoint")); - bs.addBinding("employee", vf.createURI("urn:Alice")); - bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("business", vf.createURI("urn:TacoJoint")); - bs.addBinding("employee", vf.createURI("urn:Alice")); - bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("business", vf.createURI("urn:TacoJoint")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("business", vf.createURI("urn:TacoJoint")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("business", vf.createURI("urn:CoffeeShop")); - bs.addBinding("employee", vf.createURI("urn:Alice")); - bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER)); - expected.add(new VisibilityBindingSet(bs, "")); - - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void multipleAggregations() throws Exception { - // A query that figures out what the youngest and oldest ages are across all people. - final String sparql = - "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " + - "WHERE { " + - "?person <urn:age> ?age " + - "}"; - - // Create the statements that will be input into the query.. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); - statements.add(new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("youngest", vf.createLiteral(13)); - bs.addBinding("oldest", vf.createLiteral(13)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("youngest", vf.createLiteral(13)); - bs.addBinding("oldest", vf.createLiteral(14)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("youngest", vf.createLiteral(7)); - bs.addBinding("oldest", vf.createLiteral(14)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("youngest", vf.createLiteral(5)); - bs.addBinding("oldest", vf.createLiteral(14)); - expected.add(new VisibilityBindingSet(bs, "")); - - bs = new MapBindingSet(); - bs.addBinding("youngest", vf.createLiteral(5)); - bs.addBinding("oldest", vf.createLiteral(25)); - expected.add(new VisibilityBindingSet(bs, "")); - - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java deleted file mode 100644 index aaa67ea..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.filter; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.function.projection.RandomUUIDFactory; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; -import org.apache.rya.streams.kafka.topology.TopologyFactory; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.impl.MapBindingSet; - -/** - * Integration tests the methods of {@link FilterProcessor}. - */ -public class FilterProcessorIT { - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); - - @Test - public void showProcessorWorks() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Get the RDF model objects that will be used to build the query. - final String sparql = - "SELECT * " + - "WHERE { " + - "FILTER(?age < 10)" + - "?person <urn:age> ?age " + - "}"; - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create the statements that will be input into the query. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(11)), "a")); - statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(9)), "a")); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - final MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("age", vf.createLiteral(9)); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java deleted file mode 100644 index 3ff8e8d..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.filter; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.rya.api.function.filter.FilterEvaluator; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.streams.kafka.RdfTestUtil; -import org.apache.rya.streams.kafka.processors.ProcessorResult; -import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; -import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.algebra.Filter; -import org.openrdf.query.impl.MapBindingSet; - -/** - * Unit tests the methods of {@link FilterProcessor}. - */ -public class FilterProcessorTest { - - @Test - public void showFilterFunctionIsCalled() throws Exception { - // Read the filter object from a SPARQL query. - final Filter filter = RdfTestUtil.getFilter( - "SELECT * " + - "WHERE { " + - "FILTER(?age < 10)" + - "?person <urn:age> ?age " + - "}"); - - // Create a Binding Set that will be passed into the Filter function based on the where clause. - final ValueFactory vf = new ValueFactoryImpl(); - final MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("age", vf.createLiteral(9)); - final VisibilityBindingSet inputVisBs = new VisibilityBindingSet(bs, "a"); - - // Mock the processor context that will be invoked. - final ProcessorContext context = mock(ProcessorContext.class); - - // Run the test. - final FilterProcessor processor = new FilterProcessor( - FilterEvaluator.make(filter), - result -> ProcessorResult.make(new UnaryResult(result))); - processor.init(context); - processor.process("key", ProcessorResult.make(new UnaryResult(inputVisBs))); - - // Verify the binding set was passed through. - verify(context, times(1)).forward(eq("key"), eq(ProcessorResult.make(new UnaryResult(inputVisBs)))); - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java deleted file mode 100644 index c090afa..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.filter; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.function.projection.RandomUUIDFactory; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.indexing.GeoConstants; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; -import org.apache.rya.streams.kafka.topology.TopologyFactory; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.Resource; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.StatementImpl; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.algebra.evaluation.function.Function; -import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; -import org.openrdf.query.impl.MapBindingSet; - -import com.vividsolutions.jts.geom.Coordinate; -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.geom.GeometryFactory; -import com.vividsolutions.jts.io.WKTWriter; - -/** - * Integration tests the geo methods of {@link FilterProcessor}. - */ -public class GeoFilterIT { - private static final String GEO = "http://www.opengis.net/def/function/geosparql/"; - private static final GeometryFactory GF = new GeometryFactory(); - private static final Geometry ZERO = GF.createPoint(new Coordinate(0, 0)); - private static final Geometry ONE = GF.createPoint(new Coordinate(1, 1)); - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); - - @Test - public void showGeoFunctionsRegistered() { - int count = 0; - final Collection<Function> funcs = FunctionRegistry.getInstance().getAll(); - for (final Function fun : funcs) { - if (fun.getURI().startsWith(GEO)) { - count++; - } - } - - // There are 30 geo functions registered, ensure that there are 30. - assertEquals(30, count); - } - - @Test - public void showProcessorWorks() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Get the RDF model objects that will be used to build the query. - final String sparql = - "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n" - + "PREFIX geof: <" + GEO + ">\n" - + "SELECT * \n" - + "WHERE { \n" - + " <urn:event1> geo:asWKT ?point .\n" - + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) " - + "}"; - - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create the statements that will be input into the query. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = getStatements(); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - final MapBindingSet bs = new MapBindingSet(); - final WKTWriter w = new WKTWriter(); - bs.addBinding("point", vf.createLiteral(w.write(ZERO), GeoConstants.XMLSCHEMA_OGC_WKT)); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - private List<VisibilityStatement> getStatements() throws Exception { - final List<VisibilityStatement> statements = new ArrayList<>(); - // geo 2x2 points - statements.add(new VisibilityStatement(statement(ZERO), "a")); - statements.add(new VisibilityStatement(statement(ONE), "a")); - return statements; - } - - private static Statement statement(final Geometry geo) { - final ValueFactory vf = new ValueFactoryImpl(); - final Resource subject = vf.createURI("urn:event1"); - final URI predicate = GeoConstants.GEO_AS_WKT; - final WKTWriter w = new WKTWriter(); - final Value object = vf.createLiteral(w.write(geo), GeoConstants.XMLSCHEMA_OGC_WKT); - return new StatementImpl(subject, predicate, object); - } -} \ No newline at end of file
