Repository: incubator-rya
Updated Branches:
  refs/heads/develop 1d92d1991 -> 30ca57ede


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
new file mode 100644
index 0000000..15023c5
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests the methods of {@link NaturalJoin}.
+ */
+public class NaturalJoinTest {
+
+    private final ValueFactory vf = new ValueFactoryImpl();
+
+    @Test
+    public void newLeftResult_noRightMatches() {
+        IterativeJoin naturalJoin = new NaturalJoin();
+
+        // There is a new left result.
+        MapBindingSet newLeftResult = new MapBindingSet();
+        newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+
+        // There are no right results that join with the left result.
+        Iterator<BindingSet> rightResults= new 
ArrayList<BindingSet>().iterator();
+
+        // Therefore, there are no new join results.
+        Iterator<BindingSet> newJoinResultsIt = 
naturalJoin.newLeftResult(newLeftResult, rightResults);
+        assertFalse( newJoinResultsIt.hasNext() );
+    }
+
+    @Test
+    public void newLeftResult_joinsWithRightResults() {
+        IterativeJoin naturalJoin = new NaturalJoin();
+
+        // There is a new left result.
+        MapBindingSet newLeftResult = new MapBindingSet();
+        newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+        newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
+
+        // There are a few right results that join with the left result.
+        MapBindingSet nameAge = new MapBindingSet();
+        nameAge.addBinding("name", vf.createLiteral("Bob"));
+        nameAge.addBinding("age", vf.createLiteral(56));
+
+        MapBindingSet nameHair = new MapBindingSet();
+        nameHair.addBinding("name", vf.createLiteral("Bob"));
+        nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+
+        Iterator<BindingSet> rightResults = 
Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+
+        // Therefore, there are a few new join results that mix the two together.
+        Iterator<BindingSet> newJoinResultsIt = 
naturalJoin.newLeftResult(newLeftResult, rightResults);
+
+        Set<BindingSet> newJoinResults = new HashSet<>();
+        while(newJoinResultsIt.hasNext()) {
+            newJoinResults.add( newJoinResultsIt.next() );
+        }
+
+        Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+        MapBindingSet nameHeightAge = new MapBindingSet();
+        nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
+        nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
+        nameHeightAge.addBinding("age", vf.createLiteral(56));
+        expected.add(nameHeightAge);
+
+        MapBindingSet nameHeightHair = new MapBindingSet();
+        nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
+        nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
+        nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
+        expected.add(nameHeightHair);
+
+        assertEquals(expected, newJoinResults);
+    }
+
+    @Test
+    public void newRightResult_noLeftMatches() {
+        IterativeJoin naturalJoin = new NaturalJoin();
+
+        // There are no left results.
+        Iterator<BindingSet> leftResults= new 
ArrayList<BindingSet>().iterator();
+
+        // There is a new right result.
+        MapBindingSet newRightResult = new MapBindingSet();
+        newRightResult.addBinding("name", vf.createLiteral("Bob"));
+
+        // Therefore, there are no new join results.
+        Iterator<BindingSet> newJoinResultsIt = 
naturalJoin.newRightResult(leftResults, newRightResult);
+        assertFalse( newJoinResultsIt.hasNext() );
+    }
+
+    @Test
+    public void newRightResult_joinsWithLeftResults() {
+        IterativeJoin naturalJoin = new NaturalJoin();
+
+        // There are a few left results that join with the new right result.
+        MapBindingSet nameAge = new MapBindingSet();
+        nameAge.addBinding("name", vf.createLiteral("Bob"));
+        nameAge.addBinding("age", vf.createLiteral(56));
+
+        MapBindingSet nameHair = new MapBindingSet();
+        nameHair.addBinding("name", vf.createLiteral("Bob"));
+        nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+
+        Iterator<BindingSet> leftResults = 
Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+
+        // There is a new right result.
+        MapBindingSet newRightResult = new MapBindingSet();
+        newRightResult.addBinding("name", vf.createLiteral("Bob"));
+        newRightResult.addBinding("height", vf.createLiteral("5'9\""));
+
+        // Therefore, there are a few new join results that mix the two together.
+        Iterator<BindingSet> newJoinResultsIt = 
naturalJoin.newRightResult(leftResults, newRightResult);
+
+        Set<BindingSet> newJoinResults = new HashSet<>();
+        while(newJoinResultsIt.hasNext()) {
+            newJoinResults.add( newJoinResultsIt.next() );
+        }
+
+        Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+        MapBindingSet nameHeightAge = new MapBindingSet();
+        nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
+        nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
+        nameHeightAge.addBinding("age", vf.createLiteral(56));
+        expected.add(nameHeightAge);
+
+        MapBindingSet nameHeightHair = new MapBindingSet();
+        nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
+        nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
+        nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
+        expected.add(nameHeightHair);
+
+        assertEquals(expected, newJoinResults);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
 
b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index a772b83..0cbfa9a 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -62,6 +62,7 @@ import mvm.rya.api.domain.RyaURI;
 import mvm.rya.api.resolver.RyaToRdfConversions;
 import mvm.rya.api.resolver.RyaTypeResolverException;
 import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
+import 
mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
 import mvm.rya.indexing.external.tupleSet.PcjTables;
 import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
 import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
@@ -75,6 +76,8 @@ import mvm.rya.rdftriplestore.RyaSailRepository;
 public class FluoAndHistoricPcjsDemo implements Demo {
     private static final Logger log = 
Logger.getLogger(FluoAndHistoricPcjsDemo.class);
 
+    private static final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+    
     // Employees
     private static final RyaURI alice = new RyaURI("http://Alice");
     private static final RyaURI bob = new RyaURI("http://Bob");
@@ -350,11 +353,11 @@ public class FluoAndHistoricPcjsDemo implements Demo {
 
                 for(final Entry<Key, Value> entry : scanner) {
                     final byte[] serializedResult = 
entry.getKey().getRow().getBytes();
-                    final BindingSet result = 
AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray());
+                    final BindingSet result = 
converter.convert(serializedResult, varOrder);
                     fetchedResults.put(varOrder.toString(), result);
                 }
             }
-        } catch(PcjException | TableNotFoundException | 
RyaTypeResolverException e) {
+        } catch(PcjException | TableNotFoundException | 
BindingSetConversionException e) {
             throw new DemoExecutionException("Couldn't fetch the binding sets 
that were exported to the PCJ table, so the demo can not continue. Exiting.", 
e);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index c9af9aa..cfb5248 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -38,7 +38,6 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
@@ -78,6 +77,8 @@ import mvm.rya.api.domain.RyaType;
 import mvm.rya.api.domain.RyaURI;
 import mvm.rya.api.resolver.RyaToRdfConversions;
 import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 import mvm.rya.rdftriplestore.RdfCloudTripleStore;
 import mvm.rya.rdftriplestore.RyaSailRepository;
 
@@ -260,17 +261,19 @@ public abstract class ITBase {
 
             // Fetch the query's variable order.
             final QueryMetadata queryMetadata = new 
FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId);
-            final String[] varOrder = 
queryMetadata.getVariableOrder().toArray();
+            final VariableOrder varOrder = queryMetadata.getVariableOrder();
 
             // Fetch the Binding Sets for the query.
             final ScannerConfiguration scanConfig = new ScannerConfiguration();
             
scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), 
FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
 
+            BindingSetStringConverter converter = new 
BindingSetStringConverter();
+
             final RowIterator rowIter = snapshot.get(scanConfig);
             while(rowIter.hasNext()) {
                 final Entry<Bytes, ColumnIterator> row = rowIter.next();
                 final String bindingSetString = 
row.getValue().next().getValue().toString();
-                final BindingSet bindingSet = 
FluoStringConverter.toBindingSet(bindingSetString, varOrder);
+                final BindingSet bindingSet = 
converter.convert(bindingSetString, varOrder);
                 bindingSets.add(bindingSet);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index 231c8f1..601dfd4 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -21,6 +21,7 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.junit.Test;
 import org.openrdf.query.MalformedQueryException;
@@ -99,6 +100,7 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         // Create the object that will be serialized.
         final JoinMetadata.Builder builder = JoinMetadata.builder("nodeId");
         builder.setVariableOrder(new VariableOrder("g;y;s"));
+        builder.setJoinType(JoinType.NATURAL_JOIN);
         builder.setParentNodeId("parentNodeId");
         builder.setLeftChildNodeId("leftChildNodeId");
         builder.setRightChildNodeId("rightChildNodeId");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 32b7d84..46db8cd 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -45,6 +45,50 @@ import 
mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
  */
 public class QueryIT extends ITBase {
 
+    @Test
+    public void optionalStatements() throws Exception {
+        // A query that has optional statement patterns. This query is looking for all
+        // people who have Law degrees and any BAR exams they have passed (though they
+        // do not have to have passed any).
+        final String sparql =
+                "SELECT ?person ?exam " +
+                "WHERE {" +
+                    "?person <http://hasDegreeIn> <http://Law> . " +
+                    "OPTIONAL {?person <http://passedExam> ?exam } . " +
+                "}";
+
+        // Triples that will be streamed into Fluo after the PCJ has been created.
+        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+                makeRyaStatement("http://Alice", "http://hasDegreeIn", 
"http://Computer Science"),
+                makeRyaStatement("http://Alice", "http://passedExam", 
"http://Certified Ethical Hacker"),
+                makeRyaStatement("http://Bob", "http://hasDegreeIn", 
"http://Law"),
+                makeRyaStatement("http://Bob", "http://passedExam", 
"http://MBE"),
+                makeRyaStatement("http://Bob", "http://passedExam", 
"http://BAR-Kansas"),
+                makeRyaStatement("http://Charlie", "http://hasDegreeIn", 
"http://Law"));
+
+        // The expected results of the SPARQL query once the PCJ has been computed.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add( makeBindingSet(
+                new BindingImpl("person", new URIImpl("http://Bob")),
+                new BindingImpl("exam", new URIImpl("http://MBE"))));
+        expected.add( makeBindingSet(
+                new BindingImpl("person", new URIImpl("http://Bob")),
+                new BindingImpl("exam", new URIImpl("http://BAR-Kansas"))));
+        expected.add( makeBindingSet(
+                new BindingImpl("person", new URIImpl("http://Charlie"))));
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Stream the data into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples);
+
+        // Verify the end results of the query match the expected results.
+        fluo.waitForObservers();
+        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
+        assertEquals(expected,  results);
+    }
+
     /**
      * Tests when there are a bunch of variables across a bunch of joins.
      */

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index ae728cd..ee3fffd 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -52,6 +52,7 @@ import io.fluo.api.data.Bytes;
 import mvm.rya.api.domain.RyaStatement;
 import mvm.rya.api.resolver.RyaTypeResolverException;
 import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
+import 
mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
 import mvm.rya.indexing.external.tupleSet.PcjTables;
 import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
 import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
@@ -64,6 +65,8 @@ import 
mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
  */
 public class RyaExportIT extends ITBase {
 
+       private static final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+       
     /**
      * Configure the export observer to use the Mini Accumulo instance as the
      * export destination for new PCJ results.
@@ -163,7 +166,7 @@ public class RyaExportIT extends ITBase {
      * multimap stores a set of deserialized binding sets that were in the PCJ
      * table for every variable order that is found in the PCJ metadata.
      */
-    private static Multimap<String, BindingSet> loadPcjResults(final Connector 
accumuloConn, final String pcjTableName) throws PcjException, 
TableNotFoundException, RyaTypeResolverException {
+    private static Multimap<String, BindingSet> loadPcjResults(final Connector 
accumuloConn, final String pcjTableName) throws PcjException, 
TableNotFoundException, BindingSetConversionException {
         final Multimap<String, BindingSet> fetchedResults = 
HashMultimap.create();
 
         // Get the variable orders the data was written to.
@@ -177,7 +180,7 @@ public class RyaExportIT extends ITBase {
 
             for(final Entry<Key, Value> entry : scanner) {
                 final byte[] serializedResult = 
entry.getKey().getRow().getBytes();
-                final BindingSet result = 
AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray());
+                final BindingSet result = converter.convert(serializedResult, 
varOrder);
                 fetchedResults.put(varOrder.toString(), result);
             }
         }

Reply via email to