http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java new file mode 100644 index 0000000..45855a0 --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.rya.mongodb.aggregation;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.mongodb.MongoDBRyaDAO;
import org.apache.rya.mongodb.MongoITBase;
import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.openrdf.model.Literal;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.model.vocabulary.FOAF;
import org.openrdf.model.vocabulary.OWL;
import org.openrdf.model.vocabulary.RDF;
import org.openrdf.model.vocabulary.RDFS;
import org.openrdf.model.vocabulary.XMLSchema;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.QueryRoot;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import org.openrdf.query.impl.ListBindingSet;
import org.openrdf.query.parser.sparql.SPARQLParser;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.mongodb.DBObject;
import com.mongodb.util.JSON;

import info.aduna.iteration.CloseableIteration;

/**
 * Integration tests that translate SPARQL queries into aggregation pipelines
 * with {@link SparqlToPipelineTransformVisitor}, run them against the test
 * collection, and compare the produced solutions with hand-written expectations.
 */
public class PipelineQueryIT extends MongoITBase {

    private static final ValueFactory VF = ValueFactoryImpl.getInstance();
    private static final SPARQLParser PARSER = new SPARQLParser();

    private MongoDBRyaDAO dao;

    @Before
    @Override
    public void setupTest() throws Exception {
        super.setupTest();
        dao = new MongoDBRyaDAO();
        dao.setConf(conf);
        dao.init();
    }

    /**
     * Inserts a statement through the DAO (derivation level zero).
     *
     * @param subject the statement's subject
     * @param predicate the statement's predicate
     * @param object the statement's object
     * @throws RyaDAOException if the DAO rejects the statement
     */
    private void insert(Resource subject, URI predicate, Value object) throws RyaDAOException {
        insert(subject, predicate, object, 0);
    }

    /**
     * Inserts a statement, optionally tagging it with a derivation level.
     *
     * @param subject the statement's subject
     * @param predicate the statement's predicate
     * @param object the statement's object
     * @param derivationLevel if positive, the value stored in the document's
     *        "derivation_level" field; otherwise the statement is added via the DAO
     * @throws RyaDAOException if the DAO rejects the statement
     */
    private void insert(Resource subject, URI predicate, Value object, int derivationLevel) throws RyaDAOException {
        final RyaStatementBuilder builder = new RyaStatementBuilder();
        builder.setSubject(RdfToRyaConversions.convertResource(subject));
        builder.setPredicate(RdfToRyaConversions.convertURI(predicate));
        builder.setObject(RdfToRyaConversions.convertValue(object));
        final RyaStatement rstmt = builder.build();
        if (derivationLevel > 0) {
            // Bypass the DAO: serialize the statement ourselves so the extra
            // "derivation_level" field can be attached before the raw insert.
            final DBObject obj = new SimpleMongoDBStorageStrategy().serialize(rstmt);
            obj.put("derivation_level", derivationLevel);
            getRyaDbCollection().insert(obj);
        } else {
            dao.add(rstmt);
        }
    }

    /**
     * Parses a SPARQL query, applies the pipeline transform visitor, and
     * asserts that the whole query collapsed into a single pipeline node.
     *
     * @param query the SPARQL query text
     * @return the pipeline node found directly beneath the query root
     * @throws Exception if parsing or visiting fails
     */
    private AggregationPipelineQueryNode toPipelineNode(String query) throws Exception {
        final QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr());
        final SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection());
        queryTree.visit(visitor);
        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
        return (AggregationPipelineQueryNode) queryTree.getArg();
    }

    /**
     * Evaluates a pipeline node with an empty binding set and gathers every
     * solution, closing the underlying iteration even if iteration fails.
     *
     * @param pipelineNode the node to evaluate
     * @return all solutions, with multiplicity preserved
     * @throws QueryEvaluationException if evaluation or closing fails
     */
    private static Multiset<BindingSet> collectSolutions(AggregationPipelineQueryNode pipelineNode)
            throws QueryEvaluationException {
        final Multiset<BindingSet> solutions = HashMultiset.create();
        final CloseableIteration<BindingSet, QueryEvaluationException> iter =
                pipelineNode.evaluate(new QueryBindingSet());
        try {
            while (iter.hasNext()) {
                solutions.add(iter.next());
            }
        } finally {
            iter.close();
        }
        return solutions;
    }

    /**
     * Converts the query to a pipeline, executes it, and verifies the results.
     *
     * @param query the SPARQL query text
     * @param expectedSolutions the exact multiset of solutions expected
     * @throws Exception if parsing, conversion, or evaluation fails
     */
    private void testPipelineQuery(String query, Multiset<BindingSet> expectedSolutions) throws Exception {
        final AggregationPipelineQueryNode pipelineNode = toPipelineNode(query);
        Assert.assertEquals(expectedSolutions, collectSolutions(pipelineNode));
    }

    @Test
    public void testSingleStatementPattern() throws Exception {
        // Insert data
        insert(OWL.THING, RDF.TYPE, OWL.CLASS);
        insert(FOAF.PERSON, RDF.TYPE, OWL.CLASS, 1);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, OWL.THING);
        insert(VF.createURI("urn:Alice"), RDF.TYPE, FOAF.PERSON);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT * WHERE {\n"
                + "  ?individual a ?type .\n"
                + "}";
        List<String> varNames = Arrays.asList("individual", "type");
        Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, OWL.CLASS));
        expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, OWL.CLASS));
        expectedSolutions.add(new ListBindingSet(varNames, VF.createURI("urn:Alice"), FOAF.PERSON));
        // Execute pipeline and verify results
        testPipelineQuery(query, expectedSolutions);
    }

    @Test
    public void testJoinTwoSharedVariables() throws Exception {
        // Insert data
        URI person = VF.createURI("urn:Person");
        URI livingThing = VF.createURI("urn:LivingThing");
        URI human = VF.createURI("urn:Human");
        URI programmer = VF.createURI("urn:Programmer");
        URI thing = VF.createURI("urn:Thing");
        insert(programmer, RDFS.SUBCLASSOF, person);
        insert(person, RDFS.SUBCLASSOF, FOAF.PERSON);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, person);
        insert(person, OWL.EQUIVALENTCLASS, human);
        insert(person, RDFS.SUBCLASSOF, livingThing);
        insert(livingThing, RDFS.SUBCLASSOF, thing);
        insert(thing, RDFS.SUBCLASSOF, OWL.THING);
        insert(OWL.THING, RDFS.SUBCLASSOF, thing);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT ?A ?B WHERE {\n"
                + "  ?A rdfs:subClassOf ?B .\n"
                + "  ?B rdfs:subClassOf ?A .\n"
                + "}";
        List<String> varNames = Arrays.asList("A", "B");
        Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON));
        expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person));
        expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
        // Execute query and verify results
        testPipelineQuery(query, expectedSolutions);
    }

    @Test
    public void testVariableRename() throws Exception {
        // Insert data
        URI alice = VF.createURI("urn:Alice");
        URI bob = VF.createURI("urn:Bob");
        URI carol = VF.createURI("urn:Carol");
        URI dan = VF.createURI("urn:Dan");
        URI eve = VF.createURI("urn:Eve");
        URI friend = VF.createURI("urn:friend");
        insert(alice, friend, bob);
        insert(alice, friend, carol);
        insert(bob, friend, eve);
        insert(carol, friend, eve);
        insert(dan, friend, carol);
        insert(eve, friend, alice);
        dao.flush();
        // Define non-distinct query and expected results
        final String query1 = "SELECT ?x (?z as ?friendOfFriend) WHERE {\n"
                + "  ?x <urn:friend> ?y .\n"
                + "  ?y <urn:friend> ?z .\n"
                + "}";
        Multiset<BindingSet> expectedSolutions1 = HashMultiset.create();
        List<String> varNames = Arrays.asList("x", "friendOfFriend");
        // alice reaches eve through both bob and carol, hence the duplicate
        expectedSolutions1.add(new ListBindingSet(varNames, alice, eve));
        expectedSolutions1.add(new ListBindingSet(varNames, alice, eve));
        expectedSolutions1.add(new ListBindingSet(varNames, bob, alice));
        expectedSolutions1.add(new ListBindingSet(varNames, carol, alice));
        expectedSolutions1.add(new ListBindingSet(varNames, dan, eve));
        expectedSolutions1.add(new ListBindingSet(varNames, eve, bob));
        expectedSolutions1.add(new ListBindingSet(varNames, eve, carol));
        // Define distinct query and expected results
        final String query2 = "SELECT DISTINCT ?x (?z as ?friendOfFriend) WHERE {\n"
                + "  ?x <urn:friend> ?y .\n"
                + "  ?y <urn:friend> ?z .\n"
                + "}";
        Multiset<BindingSet> expectedSolutions2 = HashMultiset.create();
        expectedSolutions2.add(new ListBindingSet(varNames, alice, eve));
        expectedSolutions2.add(new ListBindingSet(varNames, bob, alice));
        expectedSolutions2.add(new ListBindingSet(varNames, carol, alice));
        expectedSolutions2.add(new ListBindingSet(varNames, dan, eve));
        expectedSolutions2.add(new ListBindingSet(varNames, eve, bob));
        expectedSolutions2.add(new ListBindingSet(varNames, eve, carol));
        // Execute and verify results
        testPipelineQuery(query1, expectedSolutions1);
        testPipelineQuery(query2, expectedSolutions2);
    }

    @Test
    public void testFilterQuery() throws Exception {
        // Insert data
        URI alice = VF.createURI("urn:Alice");
        URI bob = VF.createURI("urn:Bob");
        URI eve = VF.createURI("urn:Eve");
        URI relatedTo = VF.createURI("urn:relatedTo");
        insert(alice, FOAF.KNOWS, bob);
        insert(alice, FOAF.KNOWS, alice);
        insert(alice, FOAF.KNOWS, eve);
        insert(alice, relatedTo, bob);
        insert(bob, FOAF.KNOWS, eve);
        insert(bob, relatedTo, bob);
        dao.flush();
        // Define query 1 and expected results
        final String query1 = "SELECT * WHERE {\n"
                + "  ?x <" + FOAF.KNOWS.stringValue() + "> ?y1 .\n"
                + "  ?x <" + relatedTo.stringValue() + "> ?y2 .\n"
                + "  FILTER (?y1 != ?y2) .\n"
                + "}";
        final List<String> varNames = Arrays.asList("x", "y1", "y2");
        final Multiset<BindingSet> expected1 = HashMultiset.create();
        expected1.add(new ListBindingSet(varNames, alice, alice, bob));
        expected1.add(new ListBindingSet(varNames, alice, eve, bob));
        expected1.add(new ListBindingSet(varNames, bob, eve, bob));
        // Define query 2 and expected results
        final String query2 = "SELECT * WHERE {\n"
                + "  ?x <" + FOAF.KNOWS.stringValue() + "> ?y1 .\n"
                + "  ?x <" + relatedTo.stringValue() + "> ?y2 .\n"
                + "  FILTER (?y1 = ?y2) .\n"
                + "}";
        final Multiset<BindingSet> expected2 = HashMultiset.create();
        expected2.add(new ListBindingSet(varNames, alice, bob, bob));
        // Execute and verify results
        testPipelineQuery(query1, expected1);
        testPipelineQuery(query2, expected2);
    }

    @Test
    public void testMultiConstruct() throws Exception {
        // Insert data
        URI alice = VF.createURI("urn:Alice");
        URI bob = VF.createURI("urn:Bob");
        URI eve = VF.createURI("urn:Eve");
        URI friend = VF.createURI("urn:friend");
        URI knows = VF.createURI("urn:knows");
        URI person = VF.createURI("urn:Person");
        insert(alice, friend, bob);
        insert(bob, knows, eve);
        insert(eve, knows, alice);
        dao.flush();
        // Define query and expected results
        final String query = "CONSTRUCT {\n"
                + "    ?x rdf:type owl:Thing .\n"
                + "    ?x rdf:type <urn:Person> .\n"
                + "} WHERE { ?x <urn:knows> ?y }";
        final Multiset<BindingSet> expected = HashMultiset.create();
        List<String> varNames = Arrays.asList("subject", "predicate", "object");
        expected.add(new ListBindingSet(varNames, bob, RDF.TYPE, OWL.THING));
        expected.add(new ListBindingSet(varNames, bob, RDF.TYPE, person));
        expected.add(new ListBindingSet(varNames, eve, RDF.TYPE, OWL.THING));
        expected.add(new ListBindingSet(varNames, eve, RDF.TYPE, person));
        // Test query
        testPipelineQuery(query, expected);
    }

    @Test
    public void testTriplePipeline() throws Exception {
        URI alice = VF.createURI("urn:Alice");
        URI bob = VF.createURI("urn:Bob");
        URI eve = VF.createURI("urn:Eve");
        URI friend = VF.createURI("urn:friend");
        URI knows = VF.createURI("urn:knows");
        URI year = VF.createURI("urn:year");
        Literal yearLiteral = VF.createLiteral("2017", XMLSchema.GYEAR);
        final String query = "CONSTRUCT {\n"
                + "    ?x <urn:knows> ?y .\n"
                + "    ?x <urn:year> \"2017\"^^<" + XMLSchema.GYEAR + "> .\n"
                + "} WHERE { ?x <urn:friend> ?y }";
        insert(alice, friend, bob);
        insert(bob, knows, eve);
        insert(eve, knows, alice);
        dao.flush();
        // Prepare query and convert to pipeline
        final AggregationPipelineQueryNode pipelineNode = toPipelineNode(query);
        // Get pipeline, add triple conversion, and verify that the result is a
        // properly serialized statement
        List<Bson> triplePipeline = pipelineNode.getTriplePipeline(System.currentTimeMillis(), false);
        SimpleMongoDBStorageStrategy strategy = new SimpleMongoDBStorageStrategy();
        List<Statement> results = new LinkedList<>();
        for (Document doc : getRyaCollection().aggregate(triplePipeline)) {
            // Round-trip through JSON to obtain the DBObject form the strategy deserializes
            final DBObject dbo = (DBObject) JSON.parse(doc.toJson());
            RyaStatement rstmt = strategy.deserializeDBObject(dbo);
            results.add(RyaToRdfConversions.convertStatement(rstmt));
        }
        Assert.assertEquals(2, results.size());
        Assert.assertTrue(results.contains(VF.createStatement(alice, knows, bob)));
        Assert.assertTrue(results.contains(VF.createStatement(alice, year, yearLiteral)));
    }

    @Test
    public void testRequiredDerivationLevel() throws Exception {
        // Insert data
        URI person = VF.createURI("urn:Person");
        URI livingThing = VF.createURI("urn:LivingThing");
        URI human = VF.createURI("urn:Human");
        URI programmer = VF.createURI("urn:Programmer");
        URI thing = VF.createURI("urn:Thing");
        insert(programmer, RDFS.SUBCLASSOF, person);
        insert(person, RDFS.SUBCLASSOF, FOAF.PERSON);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, person);
        insert(person, OWL.EQUIVALENTCLASS, human);
        insert(person, RDFS.SUBCLASSOF, livingThing);
        insert(livingThing, RDFS.SUBCLASSOF, thing);
        insert(thing, RDFS.SUBCLASSOF, OWL.THING, 1);
        insert(OWL.THING, RDFS.SUBCLASSOF, thing);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT ?A ?B WHERE {\n"
                + "  ?A rdfs:subClassOf ?B .\n"
                + "  ?B rdfs:subClassOf ?A .\n"
                + "}";
        List<String> varNames = Arrays.asList("A", "B");
        Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON));
        expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person));
        expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
        // Prepare query and convert to pipeline
        final AggregationPipelineQueryNode pipelineNode = toPipelineNode(query);
        // Extend the pipeline by requiring a derivation level of zero (should have no effect)
        pipelineNode.requireSourceDerivationDepth(0);
        Assert.assertEquals(expectedSolutions, collectSolutions(pipelineNode));
        // Extend the pipeline by requiring a derivation level of one (should produce the thing/thing pair)
        expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
        pipelineNode.requireSourceDerivationDepth(1);
        Assert.assertEquals(expectedSolutions, collectSolutions(pipelineNode));
    }

    @Test
    public void testRequiredTimestamp() throws Exception {
        // Insert data
        URI person = VF.createURI("urn:Person");
        URI livingThing = VF.createURI("urn:LivingThing");
        URI human = VF.createURI("urn:Human");
        URI programmer = VF.createURI("urn:Programmer");
        URI thing = VF.createURI("urn:Thing");
        insert(programmer, RDFS.SUBCLASSOF, person);
        insert(person, RDFS.SUBCLASSOF, FOAF.PERSON, 2);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, person);
        insert(person, OWL.EQUIVALENTCLASS, human);
        insert(person, RDFS.SUBCLASSOF, livingThing);
        insert(livingThing, RDFS.SUBCLASSOF, thing);
        insert(thing, RDFS.SUBCLASSOF, OWL.THING);
        insert(OWL.THING, RDFS.SUBCLASSOF, thing);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT ?A ?B WHERE {\n"
                + "  ?A rdfs:subClassOf ?B .\n"
                + "  ?B rdfs:subClassOf ?A .\n"
                + "}";
        List<String> varNames = Arrays.asList("A", "B");
        Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON));
        expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person));
        expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
        // Prepare query and convert to pipeline
        final AggregationPipelineQueryNode pipelineNode = toPipelineNode(query);
        // Extend the pipeline by requiring a timestamp of zero (should have no effect)
        pipelineNode.requireSourceTimestamp(0);
        Assert.assertEquals(expectedSolutions, collectSolutions(pipelineNode));
        // Extend the pipeline by requiring a future timestamp (should produce no results)
        final long oneDayMillis = 24L * 60 * 60 * 1000;
        pipelineNode.requireSourceTimestamp(System.currentTimeMillis() + oneDayMillis);
        Assert.assertTrue(collectSolutions(pipelineNode).isEmpty());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineResultIterationTest.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineResultIterationTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineResultIterationTest.java new file mode 100644 index 0000000..6775235 --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineResultIterationTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.rya.mongodb.aggregation;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;

import org.bson.Document;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import org.openrdf.query.impl.ListBindingSet;

import com.google.common.collect.Sets;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCursor;

/**
 * Unit tests for {@link PipelineResultIteration}, driven by a mocked
 * {@link AggregateIterable} so that no database is required.
 */
public class PipelineResultIterationTest {
    private static final ValueFactory VF = ValueFactoryImpl.getInstance();

    /**
     * Builds a mocked aggregation result whose cursor yields the given
     * documents in order.
     *
     * @param documents the documents the mocked cursor should return
     * @return a mock whose {@code iterator()} returns the mocked cursor
     */
    @SuppressWarnings("unchecked")
    private static AggregateIterable<Document> documentIterator(Document ... documents) {
        final Iterator<Document> docIter = Arrays.asList(documents).iterator();
        final MongoCursor<Document> cursor = Mockito.mock(MongoCursor.class);
        // Delegate to a real iterator so hasNext()/next() stay in step with each other.
        Mockito.when(cursor.hasNext()).thenAnswer(new Answer<Boolean>() {
            @Override
            public Boolean answer(InvocationOnMock invocation) throws Throwable {
                return docIter.hasNext();
            }
        });
        Mockito.when(cursor.next()).thenAnswer(new Answer<Document>() {
            @Override
            public Document answer(InvocationOnMock invocation) throws Throwable {
                return docIter.next();
            }
        });
        final AggregateIterable<Document> aggIter = Mockito.mock(AggregateIterable.class);
        Mockito.when(aggIter.iterator()).thenReturn(cursor);
        return aggIter;
    }

    /**
     * Asserts that a binding set binds {@code name} to a value whose string
     * form equals {@code expected}.
     */
    private static void assertBinding(BindingSet bs, String name, String expected) {
        Assert.assertEquals(expected, bs.getBinding(name).getValue().stringValue());
    }

    @Test
    public void testIteration() throws QueryEvaluationException {
        HashMap<String, String> nameMap = new HashMap<>();
        nameMap.put("bName", "b");
        nameMap.put("eName", "e");
        PipelineResultIteration iter = new PipelineResultIteration(
                documentIterator(
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("b", "urn:Bob")),
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("b", "urn:Beth")),
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("bName", "urn:Bob")),
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("c", "urn:Carol")),
                        new Document("<VALUES>", new Document("cName", "urn:Carol").append("d", "urn:Dan"))),
                nameMap,
                new QueryBindingSet());
        Assert.assertTrue(iter.hasNext());
        BindingSet bs = iter.next();
        Assert.assertEquals(Sets.newHashSet("a", "b"), bs.getBindingNames());
        assertBinding(bs, "a", "urn:Alice");
        assertBinding(bs, "b", "urn:Bob");
        Assert.assertTrue(iter.hasNext());
        bs = iter.next();
        Assert.assertEquals(Sets.newHashSet("a", "b"), bs.getBindingNames());
        assertBinding(bs, "a", "urn:Alice");
        assertBinding(bs, "b", "urn:Beth");
        Assert.assertTrue(iter.hasNext());
        bs = iter.next();
        // 'bName' is expected to come back under the mapped name 'b'
        Assert.assertEquals(Sets.newHashSet("a", "b"), bs.getBindingNames());
        assertBinding(bs, "a", "urn:Alice");
        assertBinding(bs, "b", "urn:Bob");
        Assert.assertTrue(iter.hasNext());
        bs = iter.next();
        Assert.assertEquals(Sets.newHashSet("a", "c"), bs.getBindingNames());
        assertBinding(bs, "a", "urn:Alice");
        assertBinding(bs, "c", "urn:Carol");
        Assert.assertTrue(iter.hasNext());
        bs = iter.next();
        // 'cName' has no entry in the name map, so it is expected unchanged
        Assert.assertEquals(Sets.newHashSet("cName", "d"), bs.getBindingNames());
        assertBinding(bs, "cName", "urn:Carol");
        assertBinding(bs, "d", "urn:Dan");
        Assert.assertFalse(iter.hasNext());
    }

    @Test
    public void testIterationGivenBindingSet() throws QueryEvaluationException {
        BindingSet solution = new ListBindingSet(Arrays.asList("b", "c"),
                VF.createURI("urn:Bob"), VF.createURI("urn:Charlie"));
        HashMap<String, String> nameMap = new HashMap<>();
        nameMap.put("bName", "b");
        nameMap.put("cName", "c");
        nameMap.put("c", "cName");
        PipelineResultIteration iter = new PipelineResultIteration(
                documentIterator(
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("b", "urn:Bob")),
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("b", "urn:Beth")),
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("bName", "urn:Bob")),
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("bName", "urn:Beth")),
                        new Document("<VALUES>", new Document("a", "urn:Alice").append("cName", "urn:Carol")),
                        new Document("<VALUES>", new Document("c", "urn:Carol").append("d", "urn:Dan"))),
                nameMap,
                solution);
        Assert.assertTrue(iter.hasNext());
        BindingSet bs = iter.next();
        // Add 'c=Charlie' to first result ('b=Bob' matches)
        Assert.assertEquals(Sets.newHashSet("a", "b", "c"), bs.getBindingNames());
        assertBinding(bs, "a", "urn:Alice");
        assertBinding(bs, "b", "urn:Bob");
        assertBinding(bs, "c", "urn:Charlie");
        Assert.assertTrue(iter.hasNext());
        bs = iter.next();
        // Skip second result ('b=Beth' incompatible with 'b=Bob')
        // Add 'c=Charlie' to third result ('bName=Bob' resolves to 'b=Bob', matches)
        Assert.assertEquals(Sets.newHashSet("a", "b", "c"), bs.getBindingNames());
        assertBinding(bs, "a", "urn:Alice");
        assertBinding(bs, "b", "urn:Bob");
        assertBinding(bs, "c", "urn:Charlie");
        Assert.assertTrue(iter.hasNext());
        bs = iter.next();
        // Skip fourth result ('bName=Beth' resolves to 'b=Beth', incompatible)
        // Skip fifth result ('cName=Carol' resolves to 'c=Carol', incompatible with 'c=Charlie')
        // Add 'b=Bob' and 'c=Charlie' to sixth result ('c=Carol' resolves to 'cName=Carol', compatible)
        Assert.assertEquals(Sets.newHashSet("b", "c", "cName", "d"), bs.getBindingNames());
        assertBinding(bs, "b", "urn:Bob");
        assertBinding(bs, "c", "urn:Charlie");
        assertBinding(bs, "cName", "urn:Carol");
        assertBinding(bs, "d", "urn:Dan");
        Assert.assertFalse(iter.hasNext());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java new file mode 100644 index 0000000..cc9349b --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; +import java.util.List; + +import org.bson.Document; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Not; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.QueryRoot; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.Var; + +import com.google.common.collect.Sets; +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoCollection; + +public class SparqlToPipelineTransformVisitorTest { + + private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + + private static final String LUBM = "urn:lubm"; + private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent"); + private static final URI PROFESSOR = VF.createURI(LUBM, "Professor"); + private static final URI COURSE = VF.createURI(LUBM, "Course"); + private static final URI TAKES = VF.createURI(LUBM, "takesCourse"); + private static final URI TEACHES = VF.createURI(LUBM, "teachesCourse"); + + private static Var constant(URI value) { + return new Var(value.stringValue(), value); + } + + MongoCollection<Document> collection; + + @Before + @SuppressWarnings("unchecked") + public void setUp() { + collection = 
Mockito.mock(MongoCollection.class);
+        Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection"));
+    }
+    /**
+     * Visits a query root that holds a single statement pattern and checks
+     * that the root's argument becomes an AggregationPipelineQueryNode whose
+     * assured binding names are exactly {"x"}.
+     */
+    @Test
+    public void testStatementPattern() throws Exception {
+        QueryRoot query = new QueryRoot(new StatementPattern(
+                new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        query.visit(visitor);
+        Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) query.getArg();
+        Assert.assertEquals(Sets.newHashSet("x"), pipelineNode.getAssuredBindingNames());
+    }
+    /**
+     * Visits a join of two statement patterns that share the variable "x" and
+     * checks that the whole join is replaced by one pipeline node whose
+     * assured binding names are "x" and "course".
+     */
+    @Test
+    public void testJoin() throws Exception {
+        QueryRoot query = new QueryRoot(new Join(
+                new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)),
+                new StatementPattern(new Var("x"), constant(TAKES), new Var("course"))));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        query.visit(visitor);
+        Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) query.getArg();
+        Assert.assertEquals(Sets.newHashSet("x", "course"), pipelineNode.getAssuredBindingNames());
+    }
+    /**
+     * Joins four statement patterns in a nested chain in which every join has
+     * at least one bare statement pattern as an argument, and checks that the
+     * entire tree collapses into a single pipeline node whose assured binding
+     * names are "x", "y" and "c".
+     */
+    @Test
+    public void testNestedJoins() throws Exception {
+        StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD));
+        StatementPattern isProfessor = new StatementPattern(new Var("y"), constant(RDF.TYPE), constant(PROFESSOR));
+        StatementPattern takesCourse = new StatementPattern(new Var("x"), constant(TAKES), new Var("c"));
+        StatementPattern teachesCourse = new StatementPattern(new Var("y"), constant(TEACHES), new Var("c"));
+        QueryRoot queryTree = new QueryRoot(new Join(
+                isProfessor,
+                new Join(
+                        new Join(isUndergrad, takesCourse),
+                        teachesCourse)));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
+        Assert.assertEquals(Sets.newHashSet("x", "y", "c"), pipelineNode.getAssuredBindingNames());
+    }
+    /**
+     * Joins two sub-joins, so neither argument of the top-level join is a bare
+     * statement pattern. Checks that the top-level Join is left in place while
+     * each sub-join becomes its own pipeline node, binding {"x", "c"} on the
+     * left and {"y", "c"} on the right.
+     *
+     * NOTE(review): presumably the visitor only folds a join into a pipeline
+     * when one side is a single statement pattern -- confirm against
+     * SparqlToPipelineTransformVisitor.
+     */
+    @Test
+    public void testComplexJoin() throws Exception {
+        StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD));
+        StatementPattern isProfessor = new StatementPattern(new Var("y"), constant(RDF.TYPE), constant(PROFESSOR));
+        StatementPattern takesCourse = new StatementPattern(new Var("x"), constant(TAKES), new Var("c"));
+        StatementPattern teachesCourse = new StatementPattern(new Var("y"), constant(TEACHES), new Var("c"));
+        QueryRoot queryTree = new QueryRoot(new Join(
+                new Join(isUndergrad, takesCourse),
+                new Join(isProfessor, teachesCourse)));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof Join); // the top-level join must survive the visit
+        Join topJoin = (Join) queryTree.getArg();
+        Assert.assertTrue(topJoin.getLeftArg() instanceof AggregationPipelineQueryNode);
+        Assert.assertTrue(topJoin.getRightArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode leftPipeline = (AggregationPipelineQueryNode) topJoin.getLeftArg();
+        AggregationPipelineQueryNode rightPipeline = (AggregationPipelineQueryNode) topJoin.getRightArg();
+        Assert.assertEquals(Sets.newHashSet("x", "c"), leftPipeline.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("y", "c"), rightPipeline.getAssuredBindingNames());
+    }
+    /**
+     * Wraps a three-pattern join in a Projection built from the elements
+     * ("p", "relation") and ("course"), and checks that the projection is
+     * absorbed into a single pipeline node whose assured binding names are
+     * "relation" and "course" (so neither "x" nor "p" is exposed).
+     */
+    @Test
+    public void testProjection() throws Exception {
+        StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD));
+        StatementPattern isCourse = new StatementPattern(new Var("course"), constant(RDF.TYPE), constant(COURSE));
+        StatementPattern hasEdge = new StatementPattern(new Var("x"), new Var("p"), new Var("course"));
+        ProjectionElemList projectionElements = new ProjectionElemList(
+                new ProjectionElem("p", "relation"),
+                new ProjectionElem("course"));
+        QueryRoot queryTree = new QueryRoot(new Projection(
+                new Join(new Join(isCourse, hasEdge), isUndergrad),
+                projectionElements));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
+        Assert.assertEquals(Sets.newHashSet("relation", "course"), pipelineNode.getAssuredBindingNames());
+    }
+    /**
+     * Wraps the same three-pattern join in a MultiProjection with two element
+     * lists: ("p" as "relation", "course") and ("p" as "relation", "x" as
+     * "student"). Checks that the result is a single pipeline node whose
+     * assured binding names contain only "relation" -- the one name present in
+     * both lists -- while its binding names cover "relation", "course" and
+     * "student".
+     */
+    @Test
+    public void testMultiProjection() throws Exception {
+        StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD));
+        StatementPattern isCourse = new StatementPattern(new Var("course"), constant(RDF.TYPE), constant(COURSE));
+        StatementPattern hasEdge = new StatementPattern(new Var("x"), new Var("p"), new Var("course"));
+        ProjectionElemList courseHasRelation = new ProjectionElemList(
+                new ProjectionElem("p", "relation"),
+                new ProjectionElem("course"));
+        ProjectionElemList studentHasRelation = new ProjectionElemList(
+                new ProjectionElem("p", "relation"),
+                new ProjectionElem("x", "student"));
+        QueryRoot queryTree = new QueryRoot(new MultiProjection(
+                new Join(new Join(isCourse, hasEdge), isUndergrad),
+                Arrays.asList(courseHasRelation, studentHasRelation)));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
+        Assert.assertEquals(Sets.newHashSet("relation"), pipelineNode.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("relation", "course", "student"), pipelineNode.getBindingNames());
+    }
+    /**
+     * Wraps a statement pattern in an Extension whose elements are a plain
+     * variable ("x" as "renamed") and a value constant (TAKES as "constant").
+     * Checks that the whole extension is replaced by one pipeline node whose
+     * assured binding names are "x", "c", "renamed" and "constant".
+     */
+    @Test
+    public void testExtension() throws Exception {
+        QueryRoot queryTree = new QueryRoot(new Extension(
+                new StatementPattern(new Var("x"), constant(TAKES), new Var("c")),
+                new ExtensionElem(new Var("x"), "renamed"),
+                new ExtensionElem(new ValueConstant(TAKES), "constant")));
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
+        Assert.assertEquals(Sets.newHashSet("x", "c", "renamed", "constant"), pipelineNode.getAssuredBindingNames());
+    }
+    /**
+     * Builds an Extension like the one in testExtension but with an additional
+     * element whose expression is a Not over a boolean literal. Checks that
+     * the Extension node stays at the root with its element list unchanged,
+     * and that only its argument is replaced by a pipeline node, which assures
+     * just "x" and "c".
+     */
+    @Test
+    public void testUnsupportedExtension() throws Exception {
+        StatementPattern sp = new StatementPattern(new Var("x"), constant(TAKES), new Var("c"));
+        List<ExtensionElem> elements = Arrays.asList(new ExtensionElem(new Var("x"), "renamed"),
+                new ExtensionElem(new Not(new ValueConstant(VF.createLiteral(true))), "notTrue"),
+                new ExtensionElem(new ValueConstant(TAKES), "constant"));
+        Extension extensionNode = new Extension(sp, elements);
+        QueryRoot queryTree = new QueryRoot(extensionNode);
+        SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+        queryTree.visit(visitor);
+        Assert.assertTrue(queryTree.getArg() instanceof Extension); // the extension itself is not converted
+        Assert.assertEquals(elements, ((Extension) queryTree.getArg()).getElements());
+        TupleExpr innerQuery = ((Extension) queryTree.getArg()).getArg();
+        Assert.assertTrue(innerQuery instanceof AggregationPipelineQueryNode);
+        AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) innerQuery;
+        Assert.assertEquals(Sets.newHashSet("x", "c"), pipelineNode.getAssuredBindingNames());
+    }
+}
