http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
new file mode 100644
index 0000000..0a1852b
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+
+import com.google.common.collect.Sets;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Span;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+
+public class CreateDeleteIT extends ITBase {
+
+    /**
+     * Ensure historic matches are included in the result and that deleting the PCJ empties the Fluo table.
+     */
+    @Test
+    public void historicResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. "
+                + "?x <http://worksAt> <http://Chipotle>." + "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeStatement("http://Bob", "http://talksTo", "http://Eve"),
+                makeStatement("http://Charlie", "http://talksTo", "http://Eve"),
+
+                makeStatement("http://Eve", "http://helps", "http://Kevin"),
+
+                makeStatement("http://Bob", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://Eve", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://David", "http://worksAt", "http://Chipotle"));
+
+        // The expected results of the SPARQL query once the PCJ has been
+        // computed.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob"))));
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+        // Load the historic data into Rya.
+        for (final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo);
+
+        // Verify the end results of the query match the expected results.
+        fluo.waitForObservers();
+
+        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+
+        final List<Bytes> rows = getFluoTableEntries(fluoClient);
+        assertEquals(17, rows.size());
+
+        // Delete the PCJ from the Fluo application.
+        new DeletePcj(1).deletePcj(fluoClient, pcjId);
+
+        // Ensure all data related to the query has been removed.
+        final List<Bytes> emptyRows = getFluoTableEntries(fluoClient);
+        assertEquals(0, emptyRows.size());
+    }
+
+    private List<Bytes> getFluoTableEntries(final FluoClient fluoClient) {
+        try (Snapshot snapshot = fluoClient.newSnapshot()) {
+            final List<Bytes> rows = new ArrayList<>();
+            // Scan with an empty row prefix, which is intended to match every row in the table.
+            final ScannerConfiguration sc1 = new ScannerConfiguration();
+            sc1.setSpan(Span.prefix(""));
+            final RowIterator iterator = snapshot.get(sc1);
+            // Keep only each row's key; the per-row column iterators are not consumed.
+            while (iterator.hasNext()) {
+                final Entry<Bytes, ColumnIterator> row = iterator.next();
+                rows.add(row.getKey());
+            }
+            return rows;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index 2381346..ebd2759 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -27,7 +27,8 @@ import java.util.Set;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.impl.URIImpl;
@@ -84,8 +85,12 @@ public class InputIT extends ITBase {
             ryaConn.add(triple);
         }
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();
@@ -127,8 +132,12 @@ public class InputIT extends ITBase {
         expected.add(makeBindingSet(
                 new BindingImpl("x", new URIImpl("http://Charlie";))));
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Ensure the query has no results yet.
         fluo.waitForObservers();
@@ -175,8 +184,12 @@ public class InputIT extends ITBase {
             ryaConn.add(triple);
         }
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Ensure Alice is a match.
         fluo.waitForObservers();
@@ -236,8 +249,12 @@ public class InputIT extends ITBase {
             ryaConn.add(triple);
         }
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Ensure Alice is a match.
         fluo.waitForObservers();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index e83e977..4dba84b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -26,7 +26,8 @@ import java.util.Set;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
 import org.openrdf.model.impl.NumericLiteralImpl;
 import org.openrdf.model.impl.URIImpl;
@@ -78,8 +79,12 @@ public class QueryIT extends ITBase {
         expected.add( makeBindingSet(
                 new BindingImpl("person", new URIImpl("http://Charlie";))));
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
@@ -160,8 +165,12 @@ public class QueryIT extends ITBase {
                 new BindingImpl("candidate", new URIImpl("http://Ivan";)),
                 new BindingImpl("leader", new URIImpl("http://Bob";))));
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
@@ -221,8 +230,12 @@ public class QueryIT extends ITBase {
                 new BindingImpl("worker", new URIImpl("http://David";)),
                 new BindingImpl("city", new URIImpl("http://London";))));
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
@@ -265,8 +278,12 @@ public class QueryIT extends ITBase {
                 new BindingImpl("name", new URIImpl("http://Charlie";)),
                 new BindingImpl("age", new NumericLiteralImpl(14, 
XMLSchema.INTEGER))));
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index 873b78e..2ea2202 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -19,45 +19,28 @@
 package org.apache.rya.indexing.pcj.fluo.integration;
 
 import static org.junit.Assert.assertEquals;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.data.Bytes;
 
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
-import mvm.rya.api.domain.RyaStatement;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.io.Text;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
 import org.openrdf.model.impl.URIImpl;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.BindingImpl;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
+import mvm.rya.api.domain.RyaStatement;
+
 /**
  * Performs integration tests over the Fluo application geared towards Rya PCJ 
exporting.
  * <p>
@@ -65,8 +48,6 @@ import com.google.common.collect.Sets;
  */
 public class RyaExportIT extends ITBase {
 
-       private static final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
-
     /**
      * Configure the export observer to use the Mini Accumulo instance as the
      * export destination for new PCJ results.
@@ -81,7 +62,7 @@ public class RyaExportIT extends ITBase {
         ryaParams.setZookeeperServers(zookeepers);
         ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
         ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
-
+        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
         return params;
     }
 
@@ -120,22 +101,26 @@ public class RyaExportIT extends ITBase {
                 makeRyaStatement("http://Frank";, "http://worksAt";, 
"http://Chipotle";));
 
         // The expected results of the SPARQL query once the PCJ has been 
computed.
-        final Set<BindingSet> expectedResults = new HashSet<>();
-        expectedResults.add( makeBindingSet(
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
                 new BindingImpl("customer", new URIImpl("http://Alice";)),
                 new BindingImpl("worker", new URIImpl("http://Bob";)),
                 new BindingImpl("city", new URIImpl("http://London";))));
-        expectedResults.add( makeBindingSet(
+        expected.add(makeBindingSet(
                 new BindingImpl("customer", new URIImpl("http://Alice";)),
                 new BindingImpl("worker", new URIImpl("http://Charlie";)),
                 new BindingImpl("city", new URIImpl("http://London";))));
-        expectedResults.add( makeBindingSet(
+        expected.add(makeBindingSet(
                 new BindingImpl("customer", new URIImpl("http://Alice";)),
                 new BindingImpl("worker", new URIImpl("http://David";)),
                 new BindingImpl("city", new URIImpl("http://London";))));
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
@@ -143,48 +128,10 @@ public class RyaExportIT extends ITBase {
         // Fetch the exported results from Accumulo once the observers finish 
working.
         fluo.waitForObservers();
 
-        // Fetch expcted results from the PCJ table that is in Accumulo.
-        final String exportTableName;
-        try(Snapshot snapshot = fluoClient.newSnapshot()) {
-            final Bytes queryId = snapshot.get(Bytes.of(sparql), 
FluoQueryColumns.QUERY_ID);
-            exportTableName = snapshot.get(queryId, 
FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString();
-        }
-
-        final Multimap<String, BindingSet> results = 
loadPcjResults(accumuloConn, exportTableName);
+        // Fetch expected results from the PCJ table that is in Accumulo.
+        final Set<BindingSet> results = Sets.newHashSet( 
pcjStorage.listResults(pcjId) );
 
         // Verify the end results of the query match the expected results.
-        final Multimap<String, BindingSet> expected = HashMultimap.create();
-        expected.putAll("customer;worker;city", expectedResults);
-        expected.putAll("worker;city;customer", expectedResults);
-        expected.putAll("city;customer;worker", expectedResults);
-
-        assertEquals(expected,  results);
-    }
-
-    /**
-     * Scan accumulo for the results that are stored in a PCJ table. The
-     * multimap stores a set of deserialized binding sets that were in the PCJ
-     * table for every variable order that is found in the PCJ metadata.
-     */
-    private static Multimap<String, BindingSet> loadPcjResults(final Connector 
accumuloConn, final String pcjTableName) throws PcjException, 
TableNotFoundException, BindingSetConversionException {
-        final Multimap<String, BindingSet> fetchedResults = 
HashMultimap.create();
-
-        // Get the variable orders the data was written to.
-        final PcjTables pcjs = new PcjTables();
-        final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, 
pcjTableName);
-
-        // Scan Accumulo for the stored results.
-        for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) {
-            final Scanner scanner = accumuloConn.createScanner(pcjTableName, 
new Authorizations());
-            scanner.fetchColumnFamily( new Text(varOrder.toString()) );
-
-            for(final Entry<Key, Value> entry : scanner) {
-                final byte[] serializedResult = 
entry.getKey().getRow().getBytes();
-                final BindingSet result = converter.convert(serializedResult, 
varOrder);
-                fetchedResults.put(varOrder.toString(), result);
-            }
-        }
-
-        return fetchedResults;
+        assertEquals(expected, results);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index 68fd842..8cc7404 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -19,17 +19,14 @@
 package org.apache.rya.indexing.pcj.fluo.integration;
 
 import static org.junit.Assert.assertEquals;
-import io.fluo.api.client.FluoClient;
 
 import java.util.HashSet;
 import java.util.Set;
 
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.indexing.external.PrecomputedJoinIndexer;
-
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
 import org.junit.Test;
 import org.openrdf.model.Statement;
@@ -40,6 +37,10 @@ import org.openrdf.repository.RepositoryConnection;
 
 import com.google.common.collect.Sets;
 
+import io.fluo.api.client.FluoClient;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+
 
 /**
  * This test ensures that the correct updates are pushed by Fluo
@@ -84,9 +85,12 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
         expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Bob";))));
         expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Charlie";))));
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn,
-                new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();
@@ -132,9 +136,13 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
             ryaConn.add(triple);
         }
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn,
-                new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+
         fluo.waitForObservers();
 
         // Load the streaming data into Rya.
@@ -177,9 +185,13 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
             ryaConn.add(triple);
         }
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn,
-                new HashSet<VariableOrder>(), sparql);
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Tell the Fluo app to maintain the PCJ.
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+
         fluo.waitForObservers();
 
         // Load the streaming data into Rya.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index 95228b8..b23b4a4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -20,62 +20,38 @@ package org.apache.rya.indexing.pcj.fluo.visibility;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.data.Bytes;
 
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.SecurityOperations;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
-import org.apache.hadoop.io.Text;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.junit.Test;
-import org.openrdf.model.impl.NumericLiteralImpl;
 import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.BindingImpl;
-import org.openrdf.query.impl.MapBindingSet;
-import org.openrdf.repository.RepositoryException;
 
+import com.beust.jcommander.internal.Sets;
 import com.google.common.base.Optional;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+
+import mvm.rya.api.domain.RyaStatement;
 
 public class PcjVisibilityIT extends ITBase {
 
-    private static final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
     /**
      * Configure the export observer to use the Mini Accumulo instance as the
      * export destination for new PCJ results.
@@ -90,80 +66,11 @@ public class PcjVisibilityIT extends ITBase {
         ryaParams.setZookeeperServers(zookeepers);
         ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
         ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
-
+        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
         return params;
     }
 
     @Test
-    public void createWithVisibilityDirect() throws RepositoryException, 
PcjException, TableNotFoundException, RyaTypeResolverException, 
AccumuloException, AccumuloSecurityException, BindingSetConversionException {
-        // Create a PCJ table that will include those triples in its results.
-        final String sparql =
-                "SELECT ?name ?age " +
-                "{" +
-                  "FILTER(?age < 30) ." +
-                  "?name <http://hasAge> ?age." +
-                  "?name <http://playsSport> \"Soccer\" " +
-                "}";
-
-        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
-
-        // Create and populate the PCJ table.
-        final PcjTables pcjs = new PcjTables();
-        final PcjVarOrderFactory varOrderFactory = 
Optional.<PcjVarOrderFactory>absent().or(new ShiftVarOrderFactory());
-        final Set<VariableOrder> varOrders = varOrderFactory.makeVarOrders( 
new VariableOrder(new String[]{"name", "age"}) );
-
-        // Create the PCJ table in Accumulo.
-        pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
-        setupTestUsers(pcjTableName);
-
-        // Add a few results to the PCJ table.
-        final MapBindingSet alice = new MapBindingSet();
-        alice.addBinding("name", new URIImpl("http://Alice"));
-        alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
-
-        final MapBindingSet bob = new MapBindingSet();
-        bob.addBinding("name", new URIImpl("http://Bob"));
-        bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
-
-        final MapBindingSet charlie = new MapBindingSet();
-        charlie.addBinding("name", new URIImpl("http://Charlie"));
-        charlie.addBinding("age", new NumericLiteralImpl(12, 
XMLSchema.INTEGER));
-
-        final VisibilityBindingSet aliceVisibility = new 
VisibilityBindingSet(alice, "A&B&C");
-        final VisibilityBindingSet bobVisibility = new 
VisibilityBindingSet(bob, "B&C");
-        final VisibilityBindingSet charlieVisibility = new 
VisibilityBindingSet(charlie, "C");
-
-        final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, 
bob, charlie);
-        final Set<VisibilityBindingSet> visibilityResults = 
Sets.<VisibilityBindingSet>newHashSet(aliceVisibility, bobVisibility, 
charlieVisibility);
-        // Load historic matches from Rya into the PCJ table.
-        pcjs.addResults(accumuloConn, pcjTableName, visibilityResults);
-
-        // Make sure the cardinality was updated.
-        final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, 
pcjTableName);
-        assertEquals(3, metadata.getCardinality());
-
-        // Scan Accumulo for the stored results.
-        Multimap<String, BindingSet> fetchedResults = 
loadPcjResults(accumuloConn, pcjTableName, "A", "B", "C");
-        assertEquals(getExpectedResults(results), fetchedResults);
-        fetchedResults = loadPcjResults(accumuloConn, pcjTableName, "C", "B");
-        assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(bob, 
charlie)), fetchedResults);
-        fetchedResults = loadPcjResults(accumuloConn, pcjTableName, "C");
-        assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), 
fetchedResults);
-
-        final Connector cConn = 
accumuloConn.getInstance().getConnector("cUser", new PasswordToken("password"));
-        // Scan Accumulo for the stored results.
-        fetchedResults = loadPcjResults(cConn, pcjTableName, "C");
-        assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), 
fetchedResults);
-        fetchedResults = loadPcjResults(cConn, pcjTableName);
-        assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), 
fetchedResults);
-
-        final Connector noAuthConn = 
accumuloConn.getInstance().getConnector("noAuth", new 
PasswordToken("password"));
-        // Scan Accumulo for the stored results.
-        fetchedResults = loadPcjResults(noAuthConn, pcjTableName);
-        assertTrue(fetchedResults.isEmpty());
-    }
-
-    @Test
     public void createWithVisibilityFluo() throws Exception {
         final String sparql =
                 "SELECT ?customer ?worker ?city " +
@@ -195,8 +102,12 @@ public class PcjVisibilityIT extends ITBase {
         addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Frank", "http://livesIn", "http://London"), "");
         addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle"), "");
 
+        // Create the PCJ Table in Accumulo.
+        final PrecomputedJoinStorage rootStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = rootStorage.createPcj(sparql);
+
         // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, 
ryaRepo);
 
         // Stream the data into Fluo.
         for(final RyaStatement statement : streamedTriples.keySet()) {
@@ -206,157 +117,121 @@ public class PcjVisibilityIT extends ITBase {
         // Fetch the exported results from Accumulo once the observers finish 
working.
         fluo.waitForObservers();
 
-        // Fetch expected results from the PCJ table that is in Accumulo.
-        final String exportTableName;
-        try(Snapshot snapshot = fluoClient.newSnapshot()) {
-            final Bytes queryId = snapshot.get(Bytes.of(sparql), 
FluoQueryColumns.QUERY_ID);
-            exportTableName = snapshot.get(queryId, 
FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString();
-        }
-        setupTestUsers(exportTableName);
-        Multimap<String, BindingSet> results = loadPcjResults(accumuloConn, 
exportTableName);
+        setupTestUsers(accumuloConn, RYA_INSTANCE_NAME, pcjId);
+
+        // Verify ABCDE using root.
+        final Set<BindingSet> rootResults = toSet( 
rootStorage.listResults(pcjId));
 
-        // Verify the end results of the query match the expected results.
-        Multimap<String, BindingSet> expected = makeExpected(
-            makeBindingSet(
+        final Set<BindingSet> rootExpected = Sets.newHashSet();
+        rootExpected.add( makeBindingSet(
                 new BindingImpl("customer", new URIImpl("http://Alice")),
                 new BindingImpl("worker", new URIImpl("http://Bob")),
-                new BindingImpl("city", new URIImpl("http://London"))),
-            makeBindingSet(
+                new BindingImpl("city", new URIImpl("http://London"))));
+        rootExpected.add( makeBindingSet(
                 new BindingImpl("customer", new URIImpl("http://Alice")),
                 new BindingImpl("worker", new URIImpl("http://Charlie")),
-                new BindingImpl("city", new URIImpl("http://London"))),
-            makeBindingSet(
+                new BindingImpl("city", new URIImpl("http://London"))));
+        rootExpected.add( makeBindingSet(
                 new BindingImpl("customer", new URIImpl("http://Alice")),
                 new BindingImpl("worker", new URIImpl("http://Eve")),
-                new BindingImpl("city", new URIImpl("http://Leeds"))),
-            makeBindingSet(
+                new BindingImpl("city", new URIImpl("http://Leeds"))));
+        rootExpected.add( makeBindingSet(
                 new BindingImpl("customer", new URIImpl("http://Alice")),
                 new BindingImpl("worker", new URIImpl("http://David")),
                 new BindingImpl("city", new URIImpl("http://London"))));
 
-        assertEquals(expected,  results);
+        assertEquals(rootExpected, rootResults);
 
-        final PasswordToken pass = new PasswordToken("password");
-        Connector userConn = accumuloConn.getInstance().getConnector("abUser", 
pass);
-        results = loadPcjResults(userConn, exportTableName);
-        // Verify the end results of the query match the expected results.
-        expected = makeExpected(
-            makeBindingSet(
+        // Verify AB
+        final Connector abConn = cluster.getConnector("abUser", "password");
+        final PrecomputedJoinStorage abStorage = new 
AccumuloPcjStorage(abConn, RYA_INSTANCE_NAME);
+        final Set<BindingSet> abResults = toSet( abStorage.listResults(pcjId) 
);
+
+        final Set<BindingSet> abExpected = Sets.newHashSet();
+        abExpected.add( makeBindingSet(
                 new BindingImpl("customer", new URIImpl("http://Alice")),
                 new BindingImpl("worker", new URIImpl("http://Bob")),
                 new BindingImpl("city", new URIImpl("http://London"))));
-        assertEquals(expected,  results);
-
-        userConn = accumuloConn.getInstance().getConnector("abcUser", pass);
-        results = loadPcjResults(userConn, exportTableName);
-        expected = makeExpected(
-                makeBindingSet(
-                        new BindingImpl("customer", new 
URIImpl("http://Alice")),
-                        new BindingImpl("worker", new URIImpl("http://Bob")),
-                        new BindingImpl("city", new URIImpl("http://London"))),
-                makeBindingSet(
-                        new BindingImpl("customer", new 
URIImpl("http://Alice")),
-                        new BindingImpl("worker", new 
URIImpl("http://Charlie")),
-                        new BindingImpl("city", new 
URIImpl("http://London"))));
-        assertEquals(expected,  results);
-
-        userConn = accumuloConn.getInstance().getConnector("adeUser", pass);
-        results = loadPcjResults(userConn, exportTableName);
-        expected = makeExpected(
-                makeBindingSet(
-                    new BindingImpl("customer", new URIImpl("http://Alice")),
-                    new BindingImpl("worker", new URIImpl("http://Eve")),
-                    new BindingImpl("city", new URIImpl("http://Leeds"))));
-        assertEquals(expected,  results);
-
-        userConn = accumuloConn.getInstance().getConnector("noAuth", pass);
-        results = loadPcjResults(userConn, exportTableName);
-        assertTrue(results.isEmpty());
-    }
 
-    private Multimap<String, BindingSet> makeExpected(final BindingSet... 
bindingSets) {
-        final Set<BindingSet> expectedResults = new HashSet<>();
-        for(final BindingSet bs : bindingSets) {
-            expectedResults.add(bs);
-        }
+        assertEquals(abExpected, abResults);
+
+        // Verify ABC
+        final Connector abcConn = cluster.getConnector("abcUser", "password");
+        final PrecomputedJoinStorage abcStorage = new 
AccumuloPcjStorage(abcConn, RYA_INSTANCE_NAME);
+        final Set<BindingSet> abcResults = toSet( 
abcStorage.listResults(pcjId) );
+
+        final Set<BindingSet> abcExpected = Sets.newHashSet();
+        abcExpected.add(makeBindingSet(
+                new BindingImpl("customer", new URIImpl("http://Alice")),
+                new BindingImpl("worker", new URIImpl("http://Bob")),
+                new BindingImpl("city", new URIImpl("http://London"))));
+        abcExpected.add(makeBindingSet(
+                new BindingImpl("customer", new URIImpl("http://Alice")),
+                new BindingImpl("worker", new URIImpl("http://Charlie")),
+                new BindingImpl("city", new URIImpl("http://London"))));
+
+        assertEquals(abcExpected, abcResults);
+
+        // Verify ADE
+        final Connector adeConn = cluster.getConnector("adeUser", "password");
+        final PrecomputedJoinStorage adeStorage = new 
AccumuloPcjStorage(adeConn, RYA_INSTANCE_NAME);
+        final Set<BindingSet> adeResults = toSet( 
adeStorage.listResults(pcjId) );
 
-        final Multimap<String, BindingSet> expected = HashMultimap.create();
-        expected.putAll("customer;worker;city", expectedResults);
-        expected.putAll("worker;city;customer", expectedResults);
-        expected.putAll("city;customer;worker", expectedResults);
-        return expected;
+        final Set<BindingSet> adeExpected = Sets.newHashSet();
+        adeExpected.add(makeBindingSet(
+                new BindingImpl("customer", new URIImpl("http://Alice")),
+                new BindingImpl("worker", new URIImpl("http://Eve")),
+                new BindingImpl("city", new URIImpl("http://Leeds"))));
+
+        assertEquals(adeExpected, adeResults);
+
+        // Verify no auths.
+        final Connector noAuthConn = cluster.getConnector("noAuth", 
"password");
+        final PrecomputedJoinStorage noAuthStorage = new 
AccumuloPcjStorage(noAuthConn, RYA_INSTANCE_NAME);
+        final Set<BindingSet> noAuthResults = toSet( 
noAuthStorage.listResults(pcjId) );
+        assertTrue( noAuthResults.isEmpty() );
     }
 
-    private void setupTestUsers(final String exportTableName) throws 
AccumuloException, AccumuloSecurityException {
+    private void setupTestUsers(final Connector accumuloConn, final String 
ryaInstanceName, final String pcjId) throws AccumuloException, 
AccumuloSecurityException {
         final PasswordToken pass = new PasswordToken("password");
         final SecurityOperations secOps = accumuloConn.securityOperations();
+
+        // XXX We need the table name so that we can update security for the 
users.
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(ryaInstanceName, pcjId);
+
+        // Give the 'root' user authorizations to see everything.
         secOps.changeUserAuthorizations("root", new Authorizations("A", "B", 
"C", "D", "E"));
+
+        // Create a user that can see things with A and B.
         secOps.createLocalUser("abUser", pass);
         secOps.changeUserAuthorizations("abUser", new Authorizations("A", 
"B"));
-        secOps.grantTablePermission("abUser", exportTableName, 
TablePermission.READ);
+        secOps.grantTablePermission("abUser", pcjTableName, 
TablePermission.READ);
 
+        // Create a user that can see things with A, B, and C.
         secOps.createLocalUser("abcUser", pass);
         secOps.changeUserAuthorizations("abcUser", new Authorizations("A", 
"B", "C"));
-        secOps.grantTablePermission("abcUser", exportTableName, 
TablePermission.READ);
+        secOps.grantTablePermission("abcUser", pcjTableName, 
TablePermission.READ);
 
+        // Create a user that can see things with A, D, and E.
         secOps.createLocalUser("adeUser", pass);
         secOps.changeUserAuthorizations("adeUser", new Authorizations("A", 
"D", "E"));
-        secOps.grantTablePermission("adeUser", exportTableName, 
TablePermission.READ);
-
-        secOps.createLocalUser("cUser", pass);
-        secOps.changeUserAuthorizations("cUser", new Authorizations("C"));
-        secOps.grantTablePermission("cUser", exportTableName, 
TablePermission.READ);
+        secOps.grantTablePermission("adeUser", pcjTableName, 
TablePermission.READ);
 
+        // Create a user that can't see anything.
         secOps.createLocalUser("noAuth", pass);
         secOps.changeUserAuthorizations("noAuth", new Authorizations());
-        secOps.grantTablePermission("noAuth", exportTableName, 
TablePermission.READ);
-    }
-
-
-    private Multimap<String, BindingSet> getExpectedResults(final 
Set<BindingSet> results) {
-        final Multimap<String, BindingSet> expectedResults = 
HashMultimap.create();
-        expectedResults.putAll("name;age", results);
-        expectedResults.putAll("age;name", results);
-        return expectedResults;
+        secOps.grantTablePermission("noAuth", pcjTableName, 
TablePermission.READ);
     }
 
     protected static void addStatementVisibilityEntry(final Map<RyaStatement, 
String> triplesMap, final RyaStatement statement, final String visibility) {
         triplesMap.put(statement, visibility);
     }
 
-    /**
-     * Scan accumulo for the results that are stored in a PCJ table. The
-     * multimap stores a set of deserialized binding sets that were in the PCJ
-     * table for every variable order that is found in the PCJ metadata.
-     * @throws AccumuloSecurityException
-     * @throws AccumuloException
-     */
-    private static Multimap<String, BindingSet> loadPcjResults(final Connector 
accumuloConn, final String pcjTableName, final String... visibility) throws 
PcjException, TableNotFoundException, BindingSetConversionException, 
AccumuloException, AccumuloSecurityException {
-        final Multimap<String, BindingSet> fetchedResults = 
HashMultimap.create();
-
-        final Authorizations userAuths;
-        if(visibility.length == 0) {
-            userAuths = 
accumuloConn.securityOperations().getUserAuthorizations(accumuloConn.whoami());
-        } else {
-            userAuths = new Authorizations(visibility);
-        }
-
-        // Get the variable orders the data was written to.
-        final PcjTables pcjs = new PcjTables();
-        final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, 
pcjTableName);
-
-        // Scan Accumulo for the stored results.
-        for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) {
-            final Scanner scanner = accumuloConn.createScanner(pcjTableName, 
userAuths);
-            scanner.fetchColumnFamily( new Text(varOrder.toString()) );
-
-            for(final Entry<Key, Value> entry : scanner) {
-                final byte[] serializedResult = 
entry.getKey().getRow().getBytes();
-                final BindingSet result = converter.convert(serializedResult, 
varOrder);
-                fetchedResults.put(varOrder.toString(), result);
-            }
+    private Set<BindingSet> toSet(final Iterable<BindingSet> bindingSets) {
+        final Set<BindingSet> set = new HashSet<>();
+        for(final BindingSet bindingSet : bindingSets) {
+            set.add( bindingSet );
         }
-
-        return fetchedResults;
+        return set;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java
----------------------------------------------------------------------
diff --git a/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java 
b/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java
index 6b47c0c..0af9f32 100644
--- a/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java
+++ b/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java
@@ -1,5 +1,13 @@
 package mvm.rya.rdftriplestore;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+import org.openrdf.sail.helpers.SailBase;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -8,9 +16,9 @@ package mvm.rya.rdftriplestore;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,14 +38,6 @@ import mvm.rya.rdftriplestore.inference.InferenceEngine;
 import mvm.rya.rdftriplestore.namespace.NamespaceManager;
 import mvm.rya.rdftriplestore.provenance.ProvenanceCollector;
 
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.sail.SailConnection;
-import org.openrdf.sail.SailException;
-import org.openrdf.sail.helpers.SailBase;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class RdfCloudTripleStore extends SailBase {
 
     private RdfCloudTripleStoreConfiguration conf;
@@ -80,13 +80,6 @@ public class RdfCloudTripleStore extends SailBase {
             rdfEvalStatsDAO.init();
         }
 
-        //TODO: Support inferencing with ryadao
-//        if (inferenceEngine != null && !inferenceEngine.isInitialized()) {
-//            inferenceEngine.setConf(this.conf);
-//            inferenceEngine.setRyaDAO(ryaDAO);
-//            inferenceEngine.init();
-//        }
-
         if (namespaceManager == null) {
             this.namespaceManager = new NamespaceManager(ryaDAO, this.conf);
         }
@@ -94,6 +87,7 @@ public class RdfCloudTripleStore extends SailBase {
 
     @Override
     protected void shutDownInternal() throws SailException {
+
         try {
             if (namespaceManager != null) {
                 namespaceManager.shutdown();
@@ -135,11 +129,11 @@ public class RdfCloudTripleStore extends SailBase {
     public void setRdfEvalStatsDAO(RdfEvalStatsDAO rdfEvalStatsDAO) {
         this.rdfEvalStatsDAO = rdfEvalStatsDAO;
     }
-    
+
     public SelectivityEvalDAO getSelectEvalDAO() {
         return selectEvalDAO;
     }
-    
+
     public void setSelectEvalDAO(SelectivityEvalDAO selectEvalDAO) {
         this.selectEvalDAO = selectEvalDAO;
     }

Reply via email to