Repository: incubator-rya Updated Branches: refs/heads/develop 1d92d1991 -> 30ca57ede
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java new file mode 100644 index 0000000..15023c5 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Tests the methods of {@link NaturalJoin}. + */ +public class NaturalJoinTest { + + private final ValueFactory vf = new ValueFactoryImpl(); + + @Test + public void newLeftResult_noRightMatches() { + IterativeJoin naturalJoin = new NaturalJoin(); + + // There is a new left result. + MapBindingSet newLeftResult = new MapBindingSet(); + newLeftResult.addBinding("name", vf.createLiteral("Bob")); + + // There are no right results that join with the left result. + Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator(); + + // Therefore, there are no new join results. + Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults); + assertFalse( newJoinResultsIt.hasNext() ); + } + + @Test + public void newLeftResult_joinsWithRightResults() { + IterativeJoin naturalJoin = new NaturalJoin(); + + // There is a new left result. + MapBindingSet newLeftResult = new MapBindingSet(); + newLeftResult.addBinding("name", vf.createLiteral("Bob")); + newLeftResult.addBinding("height", vf.createLiteral("5'9\"")); + + // There are a few right results that join with the left result. 
+ MapBindingSet nameAge = new MapBindingSet(); + nameAge.addBinding("name", vf.createLiteral("Bob")); + nameAge.addBinding("age", vf.createLiteral(56)); + + MapBindingSet nameHair = new MapBindingSet(); + nameHair.addBinding("name", vf.createLiteral("Bob")); + nameHair.addBinding("hairColor", vf.createLiteral("Brown")); + + Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator(); + + // Therefore, there are a few new join results that mix the two together. + Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults); + + Set<BindingSet> newJoinResults = new HashSet<>(); + while(newJoinResultsIt.hasNext()) { + newJoinResults.add( newJoinResultsIt.next() ); + } + + Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); + MapBindingSet nameHeightAge = new MapBindingSet(); + nameHeightAge.addBinding("name", vf.createLiteral("Bob")); + nameHeightAge.addBinding("height", vf.createLiteral("5'9\"")); + nameHeightAge.addBinding("age", vf.createLiteral(56)); + expected.add(nameHeightAge); + + MapBindingSet nameHeightHair = new MapBindingSet(); + nameHeightHair.addBinding("name", vf.createLiteral("Bob")); + nameHeightHair.addBinding("height", vf.createLiteral("5'9\"")); + nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown")); + expected.add(nameHeightHair); + + assertEquals(expected, newJoinResults); + } + + @Test + public void newRightResult_noLeftMatches() { + IterativeJoin naturalJoin = new NaturalJoin(); + + // There are no left results. + Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator(); + + // There is a new right result. + MapBindingSet newRightResult = new MapBindingSet(); + newRightResult.addBinding("name", vf.createLiteral("Bob")); + + // Therefore, there are no new join results. 
+ Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult); + assertFalse( newJoinResultsIt.hasNext() ); + } + + @Test + public void newRightResult_joinsWithLeftResults() { + IterativeJoin naturalJoin = new NaturalJoin(); + + // There are a few left results that join with the new right result. + MapBindingSet nameAge = new MapBindingSet(); + nameAge.addBinding("name", vf.createLiteral("Bob")); + nameAge.addBinding("age", vf.createLiteral(56)); + + MapBindingSet nameHair = new MapBindingSet(); + nameHair.addBinding("name", vf.createLiteral("Bob")); + nameHair.addBinding("hairColor", vf.createLiteral("Brown")); + + Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator(); + + // There is a new right result. + MapBindingSet newRightResult = new MapBindingSet(); + newRightResult.addBinding("name", vf.createLiteral("Bob")); + newRightResult.addBinding("height", vf.createLiteral("5'9\"")); + + // Therefore, there are a few new join results that mix the two together. 
+ Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult); + + Set<BindingSet> newJoinResults = new HashSet<>(); + while(newJoinResultsIt.hasNext()) { + newJoinResults.add( newJoinResultsIt.next() ); + } + + Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); + MapBindingSet nameHeightAge = new MapBindingSet(); + nameHeightAge.addBinding("name", vf.createLiteral("Bob")); + nameHeightAge.addBinding("height", vf.createLiteral("5'9\"")); + nameHeightAge.addBinding("age", vf.createLiteral(56)); + expected.add(nameHeightAge); + + MapBindingSet nameHeightHair = new MapBindingSet(); + nameHeightHair.addBinding("name", vf.createLiteral("Bob")); + nameHeightHair.addBinding("height", vf.createLiteral("5'9\"")); + nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown")); + expected.add(nameHeightHair); + + assertEquals(expected, newJoinResults); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java index a772b83..0cbfa9a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -62,6 +62,7 @@ import mvm.rya.api.domain.RyaURI; import mvm.rya.api.resolver.RyaToRdfConversions; import mvm.rya.api.resolver.RyaTypeResolverException; import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer; +import 
mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; import mvm.rya.indexing.external.tupleSet.PcjTables; import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; @@ -75,6 +76,8 @@ import mvm.rya.rdftriplestore.RyaSailRepository; public class FluoAndHistoricPcjsDemo implements Demo { private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class); + private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + // Employees private static final RyaURI alice = new RyaURI("http://Alice"); private static final RyaURI bob = new RyaURI("http://Bob"); @@ -350,11 +353,11 @@ public class FluoAndHistoricPcjsDemo implements Demo { for(final Entry<Key, Value> entry : scanner) { final byte[] serializedResult = entry.getKey().getRow().getBytes(); - final BindingSet result = AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray()); + final BindingSet result = converter.convert(serializedResult, varOrder); fetchedResults.put(varOrder.toString(), result); } } - } catch(PcjException | TableNotFoundException | RyaTypeResolverException e) { + } catch(PcjException | TableNotFoundException | BindingSetConversionException e) { throw new DemoExecutionException("Couldn't fetch the binding sets that were exported to the PCJ table, so the demo can not continue. 
Exiting.", e); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java index c9af9aa..cfb5248 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java @@ -38,7 +38,6 @@ import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; @@ -78,6 +77,8 @@ import mvm.rya.api.domain.RyaType; import mvm.rya.api.domain.RyaURI; import mvm.rya.api.resolver.RyaToRdfConversions; import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; import mvm.rya.rdftriplestore.RdfCloudTripleStore; import mvm.rya.rdftriplestore.RyaSailRepository; @@ -260,17 +261,19 @@ public abstract class ITBase { // Fetch the query's variable order. 
final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId); - final String[] varOrder = queryMetadata.getVariableOrder().toArray(); + final VariableOrder varOrder = queryMetadata.getVariableOrder(); // Fetch the Binding Sets for the query. final ScannerConfiguration scanConfig = new ScannerConfiguration(); scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), FluoQueryColumns.QUERY_BINDING_SET.getQualifier()); + BindingSetStringConverter converter = new BindingSetStringConverter(); + final RowIterator rowIter = snapshot.get(scanConfig); while(rowIter.hasNext()) { final Entry<Bytes, ColumnIterator> row = rowIter.next(); final String bindingSetString = row.getValue().next().getValue().toString(); - final BindingSet bindingSet = FluoStringConverter.toBindingSet(bindingSetString, varOrder); + final BindingSet bindingSet = converter.convert(bindingSetString, varOrder); bindingSets.add(bindingSet); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java index 231c8f1..601dfd4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java @@ -21,6 +21,7 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static org.junit.Assert.assertEquals; import org.apache.rya.indexing.pcj.fluo.ITBase; +import 
org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; import org.junit.Test; import org.openrdf.query.MalformedQueryException; @@ -99,6 +100,7 @@ public class FluoQueryMetadataDAOIT extends ITBase { // Create the object that will be serialized. final JoinMetadata.Builder builder = JoinMetadata.builder("nodeId"); builder.setVariableOrder(new VariableOrder("g;y;s")); + builder.setJoinType(JoinType.NATURAL_JOIN); builder.setParentNodeId("parentNodeId"); builder.setLeftChildNodeId("leftChildNodeId"); builder.setRightChildNodeId("rightChildNodeId"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 32b7d84..46db8cd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -45,6 +45,50 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; */ public class QueryIT extends ITBase { + @Test + public void optionalStatements() throws Exception { + // A query that has optional statement patterns. This query is looking for all + // people who have Law degrees and any BAR exams they have passed (though they + // do not have to have passed any). + final String sparql = + "SELECT ?person ?exam " + + "WHERE {" + + "?person <http://hasDegreeIn> <http://Law> . " + + "OPTIONAL {?person <http://passedExam> ?exam } . 
" + + "}"; + + // Triples that will be streamed into Fluo after the PCJ has been created. + final Set<RyaStatement> streamedTriples = Sets.newHashSet( + makeRyaStatement("http://Alice", "http://hasDegreeIn", "http://Computer Science"), + makeRyaStatement("http://Alice", "http://passedExam", "http://Certified Ethical Hacker"), + makeRyaStatement("http://Bob", "http://hasDegreeIn", "http://Law"), + makeRyaStatement("http://Bob", "http://passedExam", "http://MBE"), + makeRyaStatement("http://Bob", "http://passedExam", "http://BAR-Kansas"), + makeRyaStatement("http://Charlie", "http://hasDegreeIn", "http://Law")); + + // The expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expected = new HashSet<>(); + expected.add( makeBindingSet( + new BindingImpl("person", new URIImpl("http://Bob")), + new BindingImpl("exam", new URIImpl("http://MBE")))); + expected.add( makeBindingSet( + new BindingImpl("person", new URIImpl("http://Bob")), + new BindingImpl("exam", new URIImpl("http://BAR-Kansas")))); + expected.add( makeBindingSet( + new BindingImpl("person", new URIImpl("http://Charlie")))); + + // Create the PCJ in Fluo. + new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + + // Stream the data into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples); + + // Verify the end results of the query match the expected results. + fluo.waitForObservers(); + final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); + assertEquals(expected, results); + } + /** * Tests when there are a bunch of variables across a bunch of joins. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index ae728cd..ee3fffd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -52,6 +52,7 @@ import io.fluo.api.data.Bytes; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.resolver.RyaTypeResolverException; import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer; +import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; import mvm.rya.indexing.external.tupleSet.PcjTables; import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; @@ -64,6 +65,8 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; */ public class RyaExportIT extends ITBase { + private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + /** * Configure the export observer to use the Mini Accumulo instance as the * export destination for new PCJ results. @@ -163,7 +166,7 @@ public class RyaExportIT extends ITBase { * multimap stores a set of deserialized binding sets that were in the PCJ * table for every variable order that is found in the PCJ metadata. 
*/ - private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, RyaTypeResolverException { + private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException { final Multimap<String, BindingSet> fetchedResults = HashMultimap.create(); // Get the variable orders the data was written to. @@ -177,7 +180,7 @@ public class RyaExportIT extends ITBase { for(final Entry<Key, Value> entry : scanner) { final byte[] serializedResult = entry.getKey().getRow().getBytes(); - final BindingSet result = AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray()); + final BindingSet result = converter.convert(serializedResult, varOrder); fetchedResults.put(varOrder.toString(), result); } }
