http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java deleted file mode 100644 index 22a883b..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.filter; - -import static org.junit.Assert.assertEquals; - -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.function.projection.RandomUUIDFactory; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; -import org.apache.rya.streams.kafka.topology.TopologyFactory; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.Resource; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.StatementImpl; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.algebra.evaluation.function.Function; -import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; -import org.openrdf.query.impl.MapBindingSet; - -/** - * Integration tests the temporal methods of {@link FilterProcessor}. - */ -public class TemporalFilterIT { - private static final ValueFactory vf = new ValueFactoryImpl(); - private static final String TEMPORAL = "http://rya.apache.org/ns/temporal"; - private static final ZonedDateTime TIME = ZonedDateTime.parse("2015-12-30T12:00:00Z"); - private static final ZonedDateTime TIME_10 = ZonedDateTime.parse("2015-12-30T12:00:10Z"); - private static final ZonedDateTime TIME_20 = ZonedDateTime.parse("2015-12-30T12:00:20Z"); - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false); - - @Test - public void temporalFunctionsRegistered() { - int count = 0; - final Collection<Function> funcs = FunctionRegistry.getInstance().getAll(); - for (final Function fun : funcs) { - if (fun.getURI().startsWith(TEMPORAL)) { - count++; - } - } - - // There are 4 temporal functions registered, ensure that there are 4. - assertEquals(4, count); - } - - @Test - public void showEqualsWorks() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Get the RDF model objects that will be used to build the query. - final String sparql = - "PREFIX time: <http://www.w3.org/2006/time/> \n" - + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" - + "SELECT * \n" - + "WHERE { \n" - + " <urn:time> time:atTime ?date .\n" - + " FILTER(tempf:equals(?date, \"" + TIME.toString() + "\")) " - + "}"; - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create the statements that will be input into the query. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = getStatements(); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - final MapBindingSet bs = new MapBindingSet(); - bs.addBinding("date", vf.createLiteral(TIME.toString())); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void showBeforeWorks() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Get the RDF model objects that will be used to build the query. - final String sparql = - "PREFIX time: <http://www.w3.org/2006/time/> \n" - + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" - + "SELECT * \n" - + "WHERE { \n" - + " <urn:time> time:atTime ?date .\n" - + " FILTER(tempf:before(?date, \"" + TIME_10.toString() + "\")) " - + "}"; - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create the statements that will be input into the query. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = getStatements(); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - final MapBindingSet bs = new MapBindingSet(); - bs.addBinding("date", vf.createLiteral(TIME.toString())); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void showAfterWorks() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Get the RDF model objects that will be used to build the query. - final String sparql = - "PREFIX time: <http://www.w3.org/2006/time/> \n" - + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" - + "SELECT * \n" - + "WHERE { \n" - + " <urn:time> time:atTime ?date .\n" - + " FILTER(tempf:after(?date, \"" + TIME_10.toString() + "\")) " - + "}"; - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create the statements that will be input into the query. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = getStatements(); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - final MapBindingSet bs = new MapBindingSet(); - bs.addBinding("date", vf.createLiteral(TIME_20.toString())); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void showWithinWorks() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Get the RDF model objects that will be used to build the query. - final String sparql = - "PREFIX time: <http://www.w3.org/2006/time/> \n" - + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" - + "SELECT * \n" - + "WHERE { \n" - + " <urn:time> time:atTime ?date .\n" - + " FILTER(tempf:within(?date, \"" + TIME.toString() + "/" + TIME_20.toString() + "\")) " - + "}"; - // Setup a topology. - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create the statements that will be input into the query. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = getStatements(); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - final MapBindingSet bs = new MapBindingSet(); - bs.addBinding("date", vf.createLiteral(TIME_10.toString())); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - private List<VisibilityStatement> getStatements() throws Exception { - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add(new VisibilityStatement(statement(TIME), "a")); - statements.add(new VisibilityStatement(statement(TIME_10), "a")); - statements.add(new VisibilityStatement(statement(TIME_20), "a")); - return statements; - } - - private static Statement statement(final ZonedDateTime time) { - final Resource subject = vf.createURI("urn:time"); - final URI predicate = vf.createURI("http://www.w3.org/2006/time/atTime"); - final Value object = vf.createLiteral(time.toString()); - return new StatementImpl(subject, predicate, object); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java deleted file mode 100644 index bdb9be6..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.join; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.function.join.NaturalJoin; -import org.apache.rya.api.function.projection.RandomUUIDFactory; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.ProcessorResult; -import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; -import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; -import org.apache.rya.streams.kafka.topology.TopologyFactory; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.impl.MapBindingSet; - -import com.google.common.collect.Lists; - -/** - * Integration tests the methods of {@link JoinProcessor}. - */ -public class JoinProcessorIT { - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); - - @Test(expected = IllegalArgumentException.class) - public void badAllVars() throws IllegalArgumentException { - new JoinProcessorSupplier( - "NATURAL_JOIN", - new NaturalJoin(), - Lists.newArrayList("employee"), - Lists.newArrayList("person", "employee", "business"), - result -> ProcessorResult.make( new UnaryResult(result) )); - } - - @Test - public void newLeftResult() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = - "SELECT * WHERE { " + - "?person <urn:talksTo> ?employee ." + - "?employee <urn:worksAt> ?business" + - " }"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create some statements that generate a bunch of right SP results. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") ); - - // Add a statement that will generate a left result that joins with some of those right results. - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("business", vf.createURI("urn:TacoPlace")); - expected.add( new VisibilityBindingSet(bs, "a&b&c") ); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("business", vf.createURI("urn:BurgerJoint")); - expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void newRightResult() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = - "SELECT * WHERE { " + - "?person <urn:talksTo> ?employee ." + - "?employee <urn:worksAt> ?business" + - " }"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create some statements that generate a bunch of right SP results. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") ); - - // Add a statement that will generate a left result that joins with some of those right results. - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("business", vf.createURI("urn:TacoPlace")); - expected.add( new VisibilityBindingSet(bs, "a&b&c") ); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("business", vf.createURI("urn:BurgerJoint")); - expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void newResultsBothSides() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = - "SELECT * WHERE { " + - "?person <urn:talksTo> ?employee ." + - "?employee <urn:worksAt> ?business" + - " }"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create some statements that generate a bunch of right SP results. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") ); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("business", vf.createURI("urn:TacoPlace")); - expected.add( new VisibilityBindingSet(bs, "a&b&c") ); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("business", vf.createURI("urn:BurgerJoint")); - expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Bob")); - bs.addBinding("employee", vf.createURI("urn:Charlie")); - bs.addBinding("business", vf.createURI("urn:BurgerJoint")); - expected.add( new VisibilityBindingSet(bs, "a&c") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void manyJoins() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = - "SELECT * WHERE { " + - "?person <urn:talksTo> ?employee ." + - "?employee <urn:worksAt> ?business ." + - "?employee <urn:hourlyWage> ?wage ." + - " }"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create some statements that generate a bunch of right SP results. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hourlyWage"), vf.createLiteral(7.25)), "a") ); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - final MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("business", vf.createURI("urn:BurgerJoint")); - bs.addBinding("wage", vf.createLiteral(7.25)); - expected.add( new VisibilityBindingSet(bs, "a") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } - - @Test - public void leftJoin() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Setup a topology. - final String query = - "SELECT * WHERE { " + - "?person <urn:talksTo> ?employee ." + - "OPTIONAL{ ?employee <urn:worksAt> ?business } " + - " }"; - final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Create some statements that generate a result that includes the optional value as well as one that does not. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "b") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "d") ); - - // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - expected.add( new VisibilityBindingSet(bs, "a") ); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("employee", vf.createURI("urn:Bob")); - bs.addBinding("business", vf.createURI("urn:TacoPlace")); - expected.add( new VisibilityBindingSet(bs, "a&b") ); - - bs = new MapBindingSet(); - bs.addBinding("person", vf.createURI("urn:Bob")); - bs.addBinding("employee", vf.createURI("urn:Charlie")); - expected.add( new VisibilityBindingSet(bs, "c") ); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java deleted file mode 100644 index a8de401..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.projection; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; -import org.apache.rya.streams.kafka.topology.TopologyFactory; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.BNode; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.RDF; - -/** - * Integration tests the methods of {@link MultiProjectionProcessor}. - */ -public class MultiProjectionProcessorIT { - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); - - @Test - public void showProcessorWorks() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Create a topology for the Query that will be tested. - final String sparql = - "CONSTRUCT {" + - "_:b a <urn:movementObservation> ; " + - "<urn:location> ?location ; " + - "<urn:direction> ?direction ; " + - "}" + - "WHERE {" + - "?thing <urn:corner> ?location ." + - "?thing <urn:compass> ?direction." + - "}"; - - final String bNodeId = UUID.randomUUID().toString(); - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, () -> bNodeId); - - // Create the statements that will be input into the query. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:compass"), vf.createURI("urn:NW")), "a") ); - statements.add( new VisibilityStatement( - vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") ); - - // Make the expected results. - final Set<VisibilityStatement> expected = new HashSet<>(); - final BNode blankNode = vf.createBNode(bNodeId); - - expected.add(new VisibilityStatement(vf.createStatement(blankNode, RDF.TYPE, vf.createURI("urn:movementObservation")), "a")); - expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:direction"), vf.createURI("urn:NW")), "a")); - expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a")); - - // Run the test. - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityStatementDeserializer.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java deleted file mode 100644 index 2af3a49..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.projection; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.function.projection.RandomUUIDFactory; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RyaStreamsTestUtil; -import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; -import org.apache.rya.streams.kafka.topology.TopologyFactory; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.impl.MapBindingSet; - -import com.google.common.collect.Sets; - -/** - * Integration tests the methods of {@link StatementPatternProcessor}. - */ -public class ProjectionProcessorIT { - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); - - @Test - public void showProcessorWorks() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Create a topology for the Query that will be tested. - final String sparql = - "SELECT (?person AS ?p) ?otherPerson " + - "WHERE { " + - "?person <urn:talksTo> ?otherPerson . " + - "}"; - - final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); - - // Load some data into the input topic. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); - - // Show the correct binding set results from the job. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - - final MapBindingSet expectedBs = new MapBindingSet(); - expectedBs.addBinding("p", vf.createURI("urn:Alice")); - expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob")); - expected.add(new VisibilityBindingSet(expectedBs, "a")); - - RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/pom.xml b/extras/rya.streams/pom.xml index bd559d2..dd876a0 100644 --- a/extras/rya.streams/pom.xml +++ b/extras/rya.streams/pom.xml @@ -35,7 +35,10 @@ <modules> <module>kafka</module> + <module>kafka-test</module> <module>api</module> <module>client</module> + <module>geo</module> + <module>integration</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 58cc7c5..31b17f8 100644 --- a/pom.xml +++ b/pom.xml @@ -329,6 +329,11 @@ under the License. <artifactId>rya.streams.kafka</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.kafka-test</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.thrift</groupId> @@ -357,6 +362,11 @@ under the License. </dependency> <dependency> <groupId>org.apache.rya</groupId> + <artifactId>rya.test.rdf</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> <artifactId>rya.test.kafka</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index d458f8f..d13ea0b 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -35,5 +35,6 @@ <modules> <module>kafka</module> + <module>rdf</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/test/rdf/pom.xml ---------------------------------------------------------------------- diff --git a/test/rdf/pom.xml b/test/rdf/pom.xml new file mode 100644 index 0000000..f8dae1d --- /dev/null +++ b/test/rdf/pom.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.parent</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.test.rdf</artifactId> + + <name>Apache Rya Test RDF</name> + <description> + This module contains the Rya Test RDF components that help write RDF based tests. + </description> + + <dependencies> + <!-- Third Party Dependencies --> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryalgebra-model</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-sparql</artifactId> + </dependency> + + <dependency> + <groupId>com.github.stephenc.findbugs</groupId> + <artifactId>findbugs-annotations</artifactId> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/test/rdf/src/main/java/org/apache/rya/streams/kafka/RdfTestUtil.java ---------------------------------------------------------------------- diff --git a/test/rdf/src/main/java/org/apache/rya/streams/kafka/RdfTestUtil.java b/test/rdf/src/main/java/org/apache/rya/streams/kafka/RdfTestUtil.java new file mode 100644 index 0000000..b4388c3 --- /dev/null +++ b/test/rdf/src/main/java/org/apache/rya/streams/kafka/RdfTestUtil.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.atomic.AtomicReference; + +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A set of utility functions that are useful when writing tests RDF functions. + */ +@DefaultAnnotation(NonNull.class) +public final class RdfTestUtil { + + private RdfTestUtil() { } + + /** + * Fetch the {@link StatementPattern} from a SPARQL string. + * + * @param sparql - A SPARQL query that contains only a single Statement Patern. (not nul) + * @return The {@link StatementPattern} that was in the query, if it could be found. Otherwise {@code null} + * @throws Exception The statement pattern could not be found in the parsed SPARQL query. + */ + public static @Nullable StatementPattern getSp(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<StatementPattern> statementPattern = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visitChildren(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final StatementPattern node) throws Exception { + statementPattern.set(node); + } + }); + return statementPattern.get(); + } + + /** + * Get the first {@link Projection} node from a SPARQL query. + * + * @param sparql - The query that contains a single Projection node. + * @return The first {@link Projection} that is encountered. + * @throws Exception The query could not be parsed. + */ + public static @Nullable Projection getProjection(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<Projection> projection = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final Projection node) throws Exception { + projection.set(node); + } + }); + + return projection.get(); + } + + /** + * Get the first {@link MultiProjection} node from a SPARQL query. + * + * @param sparql - The query that contains a single Projection node. + * @return The first {@link MultiProjection} that is encountered. + * @throws Exception The query could not be parsed. + */ + public static @Nullable MultiProjection getMultiProjection(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<MultiProjection> multiProjection = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final MultiProjection node) throws Exception { + multiProjection.set(node); + } + }); + + return multiProjection.get(); + } + + /** + * Get the first {@link Filter} node from a SPARQL query. + * + * @param sparql - The query that contains a single Projection node. + * @return The first {@link Filter} that is encountered. + * @throws Exception The query could not be parsed. + */ + public static @Nullable Filter getFilter(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<Filter> filter = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final Filter node) throws Exception { + filter.set(node); + } + }); + + return filter.get(); + } +} \ No newline at end of file