http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
new file mode 100644
index 0000000..74368ef
--- /dev/null
+++ 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
@@ -0,0 +1,202 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.storage.accumulo;

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import javax.annotation.ParametersAreNonnullByDefault;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;

import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
import mvm.rya.api.instance.RyaDetails;
import mvm.rya.api.instance.RyaDetails.PCJIndexDetails;
import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import mvm.rya.api.instance.RyaDetailsRepository;
import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import mvm.rya.api.instance.RyaDetailsUpdater;
import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;

/**
 * An Accumulo backed implementation of {@link PrecomputedJoinStorage}.
 * <p>
 * PCJ results live in Accumulo tables managed by {@link PcjTables}, while the
 * list of PCJs that exist for a Rya instance is tracked in that instance's
 * {@link RyaDetails} metadata. Mutating operations ({@link #createPcj(String)},
 * {@link #dropPcj(String)}) therefore touch both the metadata repository and
 * the PCJ table itself.
 */
@ParametersAreNonnullByDefault
public class AccumuloPcjStorage implements PrecomputedJoinStorage {

    // Factories that are used to create new PCJs.
    private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
    private final PcjTableNameFactory pcjTableNameFactory = new PcjTableNameFactory();
    private final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory();

    // Objects used to interact with the PCJ tables associated with an instance of Rya.
    private final Connector accumuloConn;
    private final String ryaInstanceName;
    private final PcjTables pcjTables = new PcjTables();

    // Used to update the instance's metadata.
    private final RyaDetailsRepository ryaDetailsRepo;

    /**
     * Constructs an instance of {@link AccumuloPcjStorage}.
     *
     * @param accumuloConn - The connector that will be used to connect to Accumulo. (not null)
     * @param ryaInstanceName - The name of the Rya instance that will be accessed. (not null)
     */
    public AccumuloPcjStorage(final Connector accumuloConn, final String ryaInstanceName) {
        this.accumuloConn = requireNonNull(accumuloConn);
        this.ryaInstanceName = requireNonNull(ryaInstanceName);
        ryaDetailsRepo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, ryaInstanceName);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The IDs are read from the Rya instance's {@link RyaDetails} metadata,
     * not by scanning for tables, so a PCJ only appears here once its details
     * have been registered.
     */
    @Override
    public List<String> listPcjs() throws PCJStorageException {
        try {
            final RyaDetails details = ryaDetailsRepo.getRyaInstanceDetails();
            final PCJIndexDetails pcjIndexDetails = details.getPCJIndexDetails();
            final List<String> pcjIds = new ArrayList<>( pcjIndexDetails.getPCJDetails().keySet() );
            return pcjIds;
        } catch (final RyaDetailsRepositoryException e) {
            throw new PCJStorageException("Could not check to see if RyaDetails exist for the instance.", e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * This performs two steps: the new PCJ is registered within the instance's
     * {@link RyaDetails}, then the Accumulo table that will hold its results
     * is created.
     */
    @Override
    public String createPcj(final String sparql) throws PCJStorageException {
        requireNonNull(sparql);

        // Create the variable orders that will be used within Accumulo to store the PCJ.
        final Set<VariableOrder> varOrders;
        try {
            varOrders = pcjVarOrderFactory.makeVarOrders(sparql);
        } catch (final MalformedQueryException e) {
            throw new PCJStorageException("Can not create the PCJ. The SPARQL is malformed.", e);
        }

        // Update the Rya Details for this instance to include the new PCJ table.
        final String pcjId = pcjIdFactory.nextId();
        try {
            new RyaDetailsUpdater(ryaDetailsRepo).update(
                    new RyaDetailsMutator() {
                        @Override
                        public RyaDetails mutate(final RyaDetails originalDetails) {
                            // Create the new PCJ's details.
                            final PCJDetails.Builder newPcjDetails = PCJDetails.builder().setId( pcjId );

                            // Add them to the instance's details.
                            final RyaDetails.Builder mutated = RyaDetails.builder(originalDetails);
                            mutated.getPCJIndexDetails().addPCJDetails( newPcjDetails );
                            return mutated.build();
                        }
                    });
        } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
            throw new PCJStorageException(String.format("Could not create a new PCJ for Rya instance '%s' " +
                    "because of a problem while updating the instance's details.", ryaInstanceName), e);
        }

        // Create the table that will hold the PCJ's results.
        // NOTE(review): the details update above is not rolled back if table
        // creation fails, so the metadata could reference a missing table —
        // confirm whether callers tolerate that partial state.
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        pcjTables.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
        return pcjId;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException {
        requireNonNull(pcjId);
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        return pcjTables.getPcjMetadata(accumuloConn, pcjTableName);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) throws PCJStorageException {
        requireNonNull(pcjId);
        requireNonNull(results);
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        pcjTables.addResults(accumuloConn, pcjTableName, results);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The PCJ table is scanned using the Authorizations of the user this
     * storage's {@link Connector} was created for, so the caller only sees
     * results they are allowed to see.
     */
    @Override
    public Iterable<BindingSet> listResults(final String pcjId) throws PCJStorageException {
        requireNonNull(pcjId);

        try {
            // Fetch my authorizations.
            final String myUsername = accumuloConn.whoami();
            final Authorizations myAuths = accumuloConn.securityOperations().getUserAuthorizations( myUsername );

            // Scan the PCJ table.
            final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
            return pcjTables.listResults(accumuloConn, pcjTableName, myAuths);

        } catch (AccumuloException | AccumuloSecurityException e) {
            throw new PCJStorageException("Could not list the results because I can not look up my Authorizations.", e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Only the results are removed; the PCJ's metadata and its registration
     * within the instance's details are left intact.
     */
    @Override
    public void purge(final String pcjId) throws PCJStorageException {
        requireNonNull(pcjId);
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        pcjTables.purgePcjTable(accumuloConn, pcjTableName);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The PCJ is first removed from the instance's {@link RyaDetails}, then
     * its Accumulo table is dropped.
     */
    @Override
    public void dropPcj(final String pcjId) throws PCJStorageException {
        requireNonNull(pcjId);

        // Update the Rya Details for this instance to no longer include the PCJ.
        try {
            new RyaDetailsUpdater(ryaDetailsRepo).update(
                    new RyaDetailsMutator() {
                        @Override
                        public RyaDetails mutate(final RyaDetails originalDetails) {
                            // Drop the PCJ's metadata from the instance's metadata.
                            final RyaDetails.Builder mutated = RyaDetails.builder(originalDetails);
                            mutated.getPCJIndexDetails().removePCJDetails(pcjId);
                            return mutated.build();
                        }
                    });
        } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
            throw new PCJStorageException(String.format("Could not drop an existing PCJ for Rya instance '%s' " +
                    "because of a problem while updating the instance's details.", ryaInstanceName), e);
        }

        // Delete the table that holds the PCJ's results.
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        pcjTables.dropPcjTable(accumuloConn, pcjTableName);
    }

    @Override
    public void close() throws PCJStorageException {
        // Accumulo Connectors don't require closing.
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
index 248f724..588792b 100644
--- 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
+++ 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
@@ -20,12 +20,13 @@ package org.apache.rya.indexing.pcj.storage.accumulo;
 
 import static java.util.Objects.requireNonNull;
 
-import java.util.UUID;
+import javax.annotation.ParametersAreNonnullByDefault;
 
 /**
  * Creates Accumulo table names that may be recognized by Rya as a table that
  * holds the results of a Precomputed Join.
  */
+@ParametersAreNonnullByDefault
 public class PcjTableNameFactory {
 
     /**
@@ -46,30 +47,20 @@ public class PcjTableNameFactory {
      * query that is being precomputed. Here's an example of what a table name
      * may look like:
      * <pre>
-     *     demo_INDEX_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9
+     *     demo_INDEX_c8f5367c16604210a7cb681ed004d2d9
      * </pre>
      * The "demo_INDEX" portion indicates this table is a PCJ table for the 
"demo_"
-     * instance of Rya. The "_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9" 
portion
-     * could be anything at all that uniquely identifies the query that is 
being updated.
+     * instance of Rya. The "c8f5367c16604210a7cb681ed004d2d9" portion could be
+     * anything at all that uniquely identifies the query that is being 
updated.
      *
-     * @param tablePrefix - The Rya instance's table prefix. (not null)
-     * @param uniqueId - The unique portion of the Rya PCJ table name. (not 
null)
+     * @param ryaInstance - The Rya instance's table prefix. (not null)
+     * @param pcjId - The ID of the PCJ the table is for. (not null)
      * @return A Rya PCJ table name built using the provided values.
      */
-    public String makeTableName(final String tablePrefix, final String 
uniqueId) {
-        return tablePrefix + "INDEX_" + uniqueId;
-    }
-
-    /**
-     * Invokes {@link #makeTableName(String, String)} with a randomly generated
-     * UUID as the {@code uniqueId}.
-     *
-     * @param tablePrefix - The Rya instance's table prefix. (not null)
-     * @return A Rya PCJ table name built using the provided values.
-     */
-    public String makeTableName(final String tablePrefix) {
-        final String uniqueId = UUID.randomUUID().toString().replaceAll("-", 
"");
-        return makeTableName(tablePrefix, uniqueId);
+    public String makeTableName(final String ryaInstance, final String pcjId) {
+        requireNonNull(ryaInstance);
+        requireNonNull(pcjId);
+        return ryaInstance + "INDEX_" + pcjId.toString().replaceAll("-", "");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
index 81078fa..c29cd2e 100644
--- 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
+++ 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.storage.accumulo;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -299,6 +300,42 @@ public class PcjTables {
     }
 
     /**
+     * Get an {@link Iterator} over the {@link BindingSet}s that are stored in 
the PCJ table.
+     *
+     * @param accumuloConn - A connection to the Accumulo that hsots the PCJ 
table. (not null)
+     * @param pcjTableName - The name of the PCJ table that will be scanned. 
(not null)
+     * @param auths - the user's authorizations that will be used to scan the 
table. (not null)
+     * @return An iterator over all of the {@link BindingSet}s that are stored 
as
+     *   results for the PCJ.
+     * @throws PCJStorageException The binding sets could not be fetched.
+     */
+    public Iterable<BindingSet> listResults(final Connector accumuloConn, 
final String pcjTableName, final Authorizations auths) throws 
PCJStorageException {
+        requireNonNull(pcjTableName);
+
+        // Fetch the Variable Orders for the binding sets and choose one of 
them. It
+        // doesn't matter which one we choose because they all result in the 
same output.
+        final PcjMetadata metadata = getPcjMetadata(accumuloConn, 
pcjTableName);
+        final VariableOrder varOrder = 
metadata.getVarOrders().iterator().next();
+
+        try {
+            // Fetch only the Binding Sets whose Variable Order matches the 
selected one.
+            final Scanner scanner = accumuloConn.createScanner(pcjTableName, 
auths);
+            scanner.fetchColumnFamily( new Text(varOrder.toString()) );
+
+            // Return an Iterator that uses that scanner.
+            return new Iterable<BindingSet>() {
+                @Override
+                public Iterator<BindingSet> iterator() {
+                    return new ScannerBindingSetIterator(scanner, varOrder);
+                }
+            };
+
+        } catch (final TableNotFoundException e) {
+            throw new PCJStorageException(String.format("PCJ Table does not 
exist for name '%s'.", pcjTableName), e);
+        }
+    }
+
+    /**
      * Add a collection of results to a specific PCJ table.
      *
      * @param accumuloConn - A connection to the Accumulo that hosts the PCJ 
table. (not null)
@@ -383,119 +420,91 @@ public class PcjTables {
      * @param delta - How much the cardinality will change.
      * @throws PCJStorageException The cardinality could not be updated.
      */
-       private void updateCardinality(final Connector accumuloConn,
-                       final String pcjTableName, final long delta)
-                       throws PCJStorageException {
-               checkNotNull(accumuloConn);
-               checkNotNull(pcjTableName);
-
-               ConditionalWriter conditionalWriter = null;
-               try {
-                       conditionalWriter = 
accumuloConn.createConditionalWriter(
-                                       pcjTableName, new 
ConditionalWriterConfig());
-
-                       boolean updated = false;
-                       while (!updated) {
-                               // Write the conditional update request to 
Accumulo.
-                               final long cardinality = 
getPcjMetadata(accumuloConn,
-                                               pcjTableName).getCardinality();
-                               final ConditionalMutation mutation = 
makeUpdateCardinalityMutation(
-                                               cardinality, delta);
-                               final ConditionalWriter.Result result = 
conditionalWriter
-                                               .write(mutation);
-
-                               // Interpret the result.
-                               switch (result.getStatus()) {
-                               case ACCEPTED:
-                                       updated = true;
-                                       break;
-                               case REJECTED:
-                                       break;
-                               case UNKNOWN:
-                                       // We do not know if the mutation 
succeeded. At best, we
-                                       // can hope the metadata hasn't been 
updated
-                                       // since we originally fetched it and 
try again.
-                                       // Otherwise, continue forwards as if 
it worked. It's
-                                       // okay if this number is slightly off.
-                                       final long newCardinality = 
getPcjMetadata(accumuloConn,
-                                                       
pcjTableName).getCardinality();
-                                       if (newCardinality != cardinality) {
-                                               updated = true;
-                                       }
-                                       break;
-                               case VIOLATED:
-                                       throw new PCJStorageException(
-                                                       "The cardinality could 
not be updated because the commit violated a table constraint.");
-                               case INVISIBLE_VISIBILITY:
-                                       throw new PCJStorageException(
-                                                       "The condition contains 
a visibility the updater can not satisfy.");
-                               }
-                       }
-               } catch (AccumuloException | AccumuloSecurityException
-                               | TableNotFoundException e) {
-                       throw new PCJStorageException(
-                                       "Could not update the cardinality value 
of the PCJ Table named: "
-                                                       + pcjTableName, e);
-               } finally {
-                       if (conditionalWriter != null) {
-                               conditionalWriter.close();
-                       }
-               }
-
-       }
-
    // Compare-and-swap loop: re-reads the stored cardinality and retries the
    // conditional write until it is accepted (or a fatal status is returned).
    private void updateCardinality(final Connector accumuloConn, final String pcjTableName, final long delta) throws PCJStorageException {
        checkNotNull(accumuloConn);
        checkNotNull(pcjTableName);

        ConditionalWriter conditionalWriter = null;
        try {
            conditionalWriter = accumuloConn.createConditionalWriter(pcjTableName, new ConditionalWriterConfig());

            boolean updated = false;
            while (!updated) {
                // Write the conditional update request to Accumulo.
                final long cardinality = getPcjMetadata(accumuloConn,pcjTableName).getCardinality();
                final ConditionalMutation mutation = makeUpdateCardinalityMutation(cardinality, delta);
                final ConditionalWriter.Result result = conditionalWriter.write(mutation);

                // Interpret the result.
                switch (result.getStatus()) {
                case ACCEPTED:
                    updated = true;
                    break;
                case REJECTED:
                    // Another writer changed the value since we read it; loop
                    // around and retry with a freshly fetched cardinality.
                    break;
                case UNKNOWN:
                    // We do not know if the mutation succeeded. At best, we
                    // can hope the metadata hasn't been updated
                    // since we originally fetched it and try again.
                    // Otherwise, continue forwards as if it worked. It's
                    // okay if this number is slightly off.
                    final long newCardinality = getPcjMetadata(accumuloConn,pcjTableName).getCardinality();
                    if (newCardinality != cardinality) {
                        updated = true;
                    }
                    break;
                case VIOLATED:
                    throw new PCJStorageException("The cardinality could not be updated because the commit violated a table constraint.");
                case INVISIBLE_VISIBILITY:
                    throw new PCJStorageException("The condition contains a visibility the updater can not satisfy.");
                }
            }
        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
            throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e);
        } finally {
            if (conditionalWriter != null) {
                conditionalWriter.close();
            }
        }
    }
 
     /**
      * Update the cardinality of a PCJ by a {@code delta}.
      *
-     *This method updates the PCJ table cardinality using a BatchWriter in the 
event that
-     *the Accumulo Connector is for a MockInstance.  In the event that the 
cardinality is
-     *being updated asynchronously, there are no guarantees that the resulting 
cardinality
-     *will be correct.
+     * This method updates the PCJ table cardinality using a BatchWriter in 
the event that
+     * the Accumulo Connector is for a MockInstance.  In the event that the 
cardinality is
+     * being updated asynchronously, there are no guarantees that the 
resulting cardinality
+     * will be correct.
      *
      * @param accumuloConn - A connection to a Mock Accumulo Instance that 
hosts the PCJ table. (not null)
      * @param pcjTableName - The name of the PCJ table that will have its 
cardinality updated. (not null)
      * @param delta - How much the cardinality will change.
      * @throws PCJStorageException The cardinality could not be updated.
      */
-       private void updateMockCardinality(final Connector accumuloConn,
-                       final String pcjTableName, final long delta)
-                       throws PCJStorageException {
-               checkNotNull(accumuloConn);
-               checkNotNull(pcjTableName);
-
-               BatchWriter batchWriter = null;
-               try {
-                       batchWriter = 
accumuloConn.createBatchWriter(pcjTableName,
-                                       new BatchWriterConfig());
-                       final long cardinality = getPcjMetadata(accumuloConn, 
pcjTableName)
-                                       .getCardinality();
-                       final Mutation mutation = new 
Mutation(PCJ_METADATA_ROW_ID);
-                       final Value newCardinality = new Value(
-                                       longLexicoder.encode(cardinality + 
delta));
-                       mutation.put(PCJ_METADATA_FAMILY, 
PCJ_METADATA_CARDINALITY,
-                                       newCardinality);
-                       batchWriter.addMutation(mutation);
-               } catch (TableNotFoundException | MutationsRejectedException e) 
{
-                       throw new PCJStorageException(
-                                       "Could not update the cardinality value 
of the PCJ Table named: "
-                                                       + pcjTableName, e);
-               } finally {
-                       if (batchWriter != null) {
-                               try {
-                                       batchWriter.close();
-                               } catch (MutationsRejectedException e) {
-                                       throw new PCJStorageException(
-                                                       "Could not update the 
cardinality value of the PCJ Table named: "
-                                                                       + 
pcjTableName, e);
-                               }
-                       }
-               }
-       }
-
-
    // Unconditional (best-effort) variant of updateCardinality for MockInstance
    // connectors, which do not support ConditionalWriters. Concurrent updates
    // may be lost; see the javadoc above.
    private void updateMockCardinality(final Connector accumuloConn, final String pcjTableName, final long delta) throws PCJStorageException {
        checkNotNull(accumuloConn);
        checkNotNull(pcjTableName);

        BatchWriter batchWriter = null;
        try {
            batchWriter = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
            // Read the current value and blindly overwrite it with value + delta.
            final long cardinality = getPcjMetadata(accumuloConn, pcjTableName).getCardinality();
            final Mutation mutation = new Mutation(PCJ_METADATA_ROW_ID);
            final Value newCardinality = new Value(longLexicoder.encode(cardinality + delta));
            mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, newCardinality);
            batchWriter.addMutation(mutation);
        } catch (TableNotFoundException | MutationsRejectedException e) {
            throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e);
        } finally {
            if (batchWriter != null) {
                try {
                    // close() also flushes the pending mutation.
                    batchWriter.close();
                } catch (final MutationsRejectedException e) {
                    // NOTE(review): throwing from finally masks any exception
                    // already propagating from the try block — confirm intended.
                    throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e);
                }
            }
        }
    }
 
     /**
      * Creates a {@link ConditionalMutation} that only updates the cardinality

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
index f806d6e..00b4c99 100644
--- 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
+++ 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
@@ -20,13 +20,27 @@ package org.apache.rya.indexing.pcj.storage.accumulo;
 
 import java.util.Set;
 
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.MalformedQueryException;
+
 /**
  * Create alternative variable orders for a SPARQL query based on
  * the original ordering of its results.
  */
+@ParametersAreNonnullByDefault
 public interface PcjVarOrderFactory {
 
    /**
     * Create a set of variable orders for a SPARQL query.
     *
     * @param sparql - The SPARQL query the variable orders will be derived from. (not null)
     * @return A set of variable orders for the SPARQL query.
     * @throws MalformedQueryException The SPARQL query was malformed and could not be parsed.
     */
    public Set<VariableOrder> makeVarOrders(String sparql) throws MalformedQueryException;
+
+    /**
      * Create alternative variable orders for a SPARQL query based on
      * the original ordering of its results.
      *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
new file mode 100644
index 0000000..b641070
--- /dev/null
+++ 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
@@ -0,0 +1,73 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.storage.accumulo;

import static java.util.Objects.requireNonNull;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;

import javax.annotation.ParametersAreNonnullByDefault;

import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.openrdf.query.BindingSet;

/**
 * Iterates over the results of a {@link Scanner} assuming the results are
 * binding sets that can be converted using a {@link AccumuloPcjSerializer}.
 */
@ParametersAreNonnullByDefault
public class ScannerBindingSetIterator implements Iterator<BindingSet> {

    private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();

    private final Iterator<Entry<Key, Value>> accEntries;
    private final VariableOrder varOrder;

    /**
     * Constructs an instance of {@link ScannerBindingSetIterator}.
     *
     * @param scanner - The scanner whose results will be iterated over. (not null)
     * @param varOrder - The variable order of the binding sets the scanner returns. (not null)
     */
    public ScannerBindingSetIterator(final Scanner scanner, final VariableOrder varOrder) {
        requireNonNull(scanner);
        this.accEntries = scanner.iterator();
        this.varOrder = requireNonNull(varOrder);
    }

    @Override
    public boolean hasNext() {
        return accEntries.hasNext();
    }

    @Override
    public BindingSet next() {
        final Entry<Key, Value> entry = accEntries.next();

        // Text.getBytes() exposes the Text's backing array, which may be longer
        // than the valid data and contain stale bytes past getLength(). Copy
        // exactly the valid prefix before handing it to the deserializer.
        final Text row = entry.getKey().getRow();
        final byte[] bindingSetBytes = Arrays.copyOf(row.getBytes(), row.getLength());

        try {
            return converter.convert(bindingSetBytes, varOrder);
        } catch (final BindingSetConversionException e) {
            throw new RuntimeException("Could not deserialize a BindingSet from Accumulo.", e);
        }
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
index 1ae21e5..b4ba348 100644
--- 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
+++ 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
@@ -18,12 +18,17 @@
  */
 package org.apache.rya.indexing.pcj.storage.accumulo;
 
+import static java.util.Objects.requireNonNull;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -32,6 +37,18 @@ import com.google.common.collect.Lists;
  */
 @ParametersAreNonnullByDefault
 public class ShiftVarOrderFactory implements PcjVarOrderFactory {
+
+    @Override
+    public Set<VariableOrder> makeVarOrders(final String sparql) throws 
MalformedQueryException {
+        requireNonNull(sparql);
+
+        final Set<String> bindingNames = new SPARQLParser().parseQuery(sparql, 
null)
+                .getTupleExpr()
+                .getBindingNames();
+
+        return makeVarOrders( new VariableOrder(bindingNames) );
+    }
+
     @Override
     public Set<VariableOrder> makeVarOrders(final VariableOrder varOrder) {
         final Set<VariableOrder> varOrders = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactoryTest.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactoryTest.java
new file mode 100644
index 0000000..2ad5ce5
--- /dev/null
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactoryTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Tests the methods of {@link PcjTableNameFactory}.
+ */
+public class PcjTableNameFactoryTest {
+
+    @Test
+    public void makeTableName() {
+        final String ryaInstance = "testInstance_";
+        final String pcjId = "2dda1b099d264f16b1da8f9409c104d3";
+
+        // Create the Accumulo PCJ table name.
+        final PcjTableNameFactory factory = new PcjTableNameFactory();
+        final String tableName = factory.makeTableName(ryaInstance, pcjId);
+
+        // Ensure the table name matches the expected name.
+        assertEquals("testInstance_INDEX_2dda1b099d264f16b1da8f9409c104d3", 
tableName);
+    }
+
+    @Test
+    public void getPcjId() {
+        final String pcjTableName = 
"testInstance_INDEX_2dda1b099d264f16b1da8f9409c104d3";
+
+        // Get the PCJ ID from the table name.
+        final PcjTableNameFactory factory = new PcjTableNameFactory();
+        final String pcjId = factory.getPcjId(pcjTableName);
+
+        // Ensure the pcjId matches the expected id.
+        assertEquals("2dda1b099d264f16b1da8f9409c104d3", pcjId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
index 68dd3df..bc93a05 100644
--- 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
@@ -1,6 +1,4 @@
-package org.apache.rya.indexing.pcj.storage.accumulo;
-
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -9,7 +7,7 @@ package org.apache.rya.indexing.pcj.storage.accumulo;
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -18,43 +16,36 @@ package org.apache.rya.indexing.pcj.storage.accumulo;
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.rya.indexing.pcj.storage.accumulo;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.rdftriplestore.RdfCloudTripleStore;
-import mvm.rya.rdftriplestore.RyaSailRepository;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.zookeeper.ClientCnxn;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.impl.LiteralImpl;
@@ -71,14 +62,19 @@ import com.google.common.base.Optional;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import com.google.common.io.Files;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.MiniAccumuloClusterInstance;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.rdftriplestore.RdfCloudTripleStore;
+import mvm.rya.rdftriplestore.RyaSailRepository;
 
 /**
  * Performs integration test using {@link MiniAccumuloCluster} to ensure the
  * functions of {@link PcjTables} work within a cluster setting.
  */
 public class PcjTablesIntegrationTest {
-    private static final Logger log = 
Logger.getLogger(PcjTablesIntegrationTest.class);
 
     private static final String USE_MOCK_INSTANCE = ".useMockInstance";
     private static final String CLOUDBASE_INSTANCE = 
"sc.cloudbase.instancename";
@@ -89,22 +85,69 @@ public class PcjTablesIntegrationTest {
 
     protected static final String RYA_TABLE_PREFIX = "demo_";
 
+    // A fresh MiniAccumuloCluster is started before each test and stopped after it.
+    private MiniAccumuloClusterInstance cluster;
+
     // Rya data store and connections.
-    protected MiniAccumuloCluster accumulo = null;
-    protected static Connector accumuloConn = null;
     protected RyaSailRepository ryaRepo = null;
     protected RepositoryConnection ryaConn = null;
 
+    @BeforeClass
+    public static void killLoudLogs() {
+        Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
+    }
+
     @Before
-    public void setupMiniResources() throws IOException, InterruptedException, 
AccumuloException, AccumuloSecurityException, RepositoryException {
-        // Initialize the Mini Accumulo that will be used to store Triples and 
get a connection to it.
-        accumulo = startMiniAccumulo();
+    public void resetTestEnvironmanet() throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException, RepositoryException, 
IOException, InterruptedException {
+        // Start the cluster.
+        cluster = new MiniAccumuloClusterInstance();
+        cluster.startMiniAccumulo();
 
         // Setup the Rya library to use the Mini Accumulo.
-        ryaRepo = setupRya(accumulo);
+        ryaRepo = setupRya();
         ryaConn = ryaRepo.getConnection();
     }
 
+    @After
+    public void shutdownMiniCluster() throws IOException, 
InterruptedException, RepositoryException {
+        // Stop Rya.
+        ryaRepo.shutDown();
+
+        // Stop the cluster.
+        cluster.stopMiniAccumulo();
+    }
+
+    /**
+     * Format a Mini Accumulo to be a Rya repository.
+     *
+     * @return The Rya repository sitting on top of the Mini Accumulo.
+     */
+    private RyaSailRepository setupRya() throws AccumuloException, 
AccumuloSecurityException, RepositoryException {
+        // Setup the Rya Repository that will be used to create Repository 
Connections.
+        final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
+        final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
+        crdfdao.setConnector( cluster.getConnector() );
+
+        // Setup Rya configuration values.
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(RYA_TABLE_PREFIX);
+        conf.setDisplayQueryPlan(true);
+
+        conf.setBoolean(USE_MOCK_INSTANCE, false);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, 
RYA_TABLE_PREFIX);
+        conf.set(CLOUDBASE_USER, cluster.getUsername());
+        conf.set(CLOUDBASE_PASSWORD, cluster.getPassword());
+        conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName());
+
+        crdfdao.setConf(conf);
+        ryaStore.setRyaDAO(crdfdao);
+
+        final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
+        ryaRepo.initialize();
+
+        return ryaRepo;
+    }
+
     /**
      * Ensure that when a new PCJ table is created, it is initialized with the
      * correct metadata values.
@@ -112,7 +155,7 @@ public class PcjTablesIntegrationTest {
      * The method being tested is {@link PcjTables#createPcjTable(Connector, 
String, Set, String)}
      */
     @Test
-    public void createPcjTable() throws PcjException {
+    public void createPcjTable() throws PcjException, AccumuloException, 
AccumuloSecurityException {
         final String sparql =
                 "SELECT ?name ?age " +
                 "{" +
@@ -121,6 +164,8 @@ public class PcjTablesIntegrationTest {
                   "?name <http://playsSport> \"Soccer\" " +
                 "}";
 
+        final Connector accumuloConn = cluster.getConnector();
+
         // Create a PCJ table in the Mini Accumulo.
         final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
@@ -141,7 +186,7 @@ public class PcjTablesIntegrationTest {
      * The method being tested is {@link PcjTables#addResults(Connector, 
String, java.util.Collection)}
      */
     @Test
-    public void addResults() throws PcjException, TableNotFoundException, 
BindingSetConversionException {
+    public void addResults() throws PcjException, TableNotFoundException, 
BindingSetConversionException, AccumuloException, AccumuloSecurityException {
         final String sparql =
                 "SELECT ?name ?age " +
                 "{" +
@@ -150,6 +195,8 @@ public class PcjTablesIntegrationTest {
                   "?name <http://playsSport> \"Soccer\" " +
                 "}";
 
+        final Connector accumuloConn = cluster.getConnector();
+
         // Create a PCJ table in the Mini Accumulo.
         final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
@@ -189,6 +236,53 @@ public class PcjTablesIntegrationTest {
         assertEquals(expectedResults, fetchedResults);
     }
 
+    @Test
+    public void listResults() throws PCJStorageException, AccumuloException, 
AccumuloSecurityException {
+        final String sparql =
+                "SELECT ?name ?age " +
+                "{" +
+                  "FILTER(?age < 30) ." +
+                  "?name <http://hasAge> ?age." +
+                  "?name <http://playsSport> \"Soccer\" " +
+                "}";
+
+        final Connector accumuloConn = cluster.getConnector();
+
+        // Create a PCJ table in the Mini Accumulo.
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
+        final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
+        final PcjTables pcjs = new PcjTables();
+        pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+
+        // Add a few results to the PCJ table.
+        final MapBindingSet alice = new MapBindingSet();
+        alice.addBinding("name", new URIImpl("http://Alice";));
+        alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+        final MapBindingSet bob = new MapBindingSet();
+        bob.addBinding("name", new URIImpl("http://Bob";));
+        bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+        final MapBindingSet charlie = new MapBindingSet();
+        charlie.addBinding("name", new URIImpl("http://Charlie";));
+        charlie.addBinding("age", new NumericLiteralImpl(12, 
XMLSchema.INTEGER));
+
+        pcjs.addResults(accumuloConn, pcjTableName, 
Sets.<VisibilityBindingSet>newHashSet(
+                new VisibilityBindingSet(alice),
+                new VisibilityBindingSet(bob),
+                new VisibilityBindingSet(charlie)));
+
+        // Fetch the Binding Sets that have been stored in the PCJ table.
+        final Set<BindingSet> results = new HashSet<>();
+        for(final BindingSet result : pcjs.listResults(accumuloConn, 
pcjTableName, new Authorizations())) {
+            results.add( result );
+        }
+
+        // Verify the fetched results match the expected ones.
+        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, 
bob, charlie);
+        assertEquals(expected, results);
+    }
+
     /**
      * Ensure when results are already stored in Rya, that we are able to 
populate
      * the PCJ table for a new SPARQL query using those results.
@@ -196,7 +290,7 @@ public class PcjTablesIntegrationTest {
      * The method being tested is: {@link PcjTables#populatePcj(Connector, 
String, RepositoryConnection, String)}
      */
     @Test
-    public void populatePcj() throws RepositoryException, PcjException, 
TableNotFoundException, BindingSetConversionException {
+    public void populatePcj() throws RepositoryException, PcjException, 
TableNotFoundException, BindingSetConversionException, AccumuloException, 
AccumuloSecurityException {
         // Load some Triples into Rya.
         final Set<Statement> triples = new HashSet<>();
         triples.add( new StatementImpl(new URIImpl("http://Alice";), new 
URIImpl("http://hasAge";), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
@@ -221,6 +315,8 @@ public class PcjTablesIntegrationTest {
                   "?name <http://playsSport> \"Soccer\" " +
                 "}";
 
+        final Connector accumuloConn = cluster.getConnector();
+
         final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
         final PcjTables pcjs = new PcjTables();
@@ -264,7 +360,7 @@ public class PcjTablesIntegrationTest {
      * The method being tested is: {@link 
PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, 
String[], Optional)}
      */
     @Test
-    public void createAndPopulatePcj() throws RepositoryException, 
PcjException, TableNotFoundException, BindingSetConversionException {
+    public void createAndPopulatePcj() throws RepositoryException, 
PcjException, TableNotFoundException, BindingSetConversionException, 
AccumuloException, AccumuloSecurityException {
         // Load some Triples into Rya.
         final Set<Statement> triples = new HashSet<>();
         triples.add( new StatementImpl(new URIImpl("http://Alice";), new 
URIImpl("http://hasAge";), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
@@ -289,6 +385,8 @@ public class PcjTablesIntegrationTest {
                   "?name <http://playsSport> \"Soccer\" " +
                 "}";
 
+        final Connector accumuloConn = cluster.getConnector();
+
         final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
 
         // Create and populate the PCJ table.
@@ -325,7 +423,9 @@ public class PcjTablesIntegrationTest {
     }
 
     @Test
-    public void listPcjs() throws PCJStorageException {
+    public void listPcjs() throws PCJStorageException, AccumuloException, 
AccumuloSecurityException {
+        final Connector accumuloConn = cluster.getConnector();
+
         // Set up the table names that will be used.
         final String instance1 = "instance1_";
         final String instance2 = "instance2_";
@@ -358,7 +458,7 @@ public class PcjTablesIntegrationTest {
     }
 
     @Test
-    public void purge() throws PCJStorageException {
+    public void purge() throws PCJStorageException, AccumuloException, 
AccumuloSecurityException {
         final String sparql =
                 "SELECT ?name ?age " +
                 "{" +
@@ -367,6 +467,8 @@ public class PcjTablesIntegrationTest {
                   "?name <http://playsSport> \"Soccer\" " +
                 "}";
 
+        final Connector accumuloConn = cluster.getConnector();
+
         // Create a PCJ table in the Mini Accumulo.
         final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
@@ -404,7 +506,9 @@ public class PcjTablesIntegrationTest {
     }
 
     @Test
-    public void dropPcj() throws PCJStorageException {
+    public void dropPcj() throws PCJStorageException, AccumuloException, 
AccumuloSecurityException {
+        final Connector accumuloConn = cluster.getConnector();
+
         // Create a PCJ index.
         final String tableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "thePcj");
         final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( 
new VariableOrder("x") );
@@ -457,90 +561,4 @@ public class PcjTablesIntegrationTest {
 
         return fetchedResults;
     }
-
-    @After
-    public void shutdownMiniResources() {
-        if(ryaConn != null) {
-            try {
-                log.info("Shutting down Rya Connection.");
-                ryaConn.close();
-                log.info("Rya Connection shut down.");
-            } catch(final Exception e) {
-                log.error("Could not shut down the Rya Connection.", e);
-            }
-        }
-
-        if(ryaRepo != null) {
-            try {
-                log.info("Shutting down Rya Repo.");
-                ryaRepo.shutDown();
-                log.info("Rya Repo shut down.");
-            } catch(final Exception e) {
-                log.error("Could not shut down the Rya Repo.", e);
-            }
-        }
-
-        if(accumulo != null) {
-            try {
-                log.info("Shutting down the Mini Accumulo being used as a Rya 
store.");
-                accumulo.stop();
-                log.info("Mini Accumulo being used as a Rya store shut down.");
-            } catch(final Exception e) {
-                log.error("Could not shut down the Mini Accumulo.", e);
-            }
-        }
-    }
-
-    /**
-     * Setup a Mini Accumulo cluster that uses a temporary directory to store 
its data.
-     *
-     * @return A Mini Accumulo cluster.
-     */
-    private static MiniAccumuloCluster startMiniAccumulo() throws IOException, 
InterruptedException, AccumuloException, AccumuloSecurityException {
-        final File miniDataDir = Files.createTempDir();
-
-        // Setup and start the Mini Accumulo.
-        final MiniAccumuloCluster accumulo = new 
MiniAccumuloCluster(miniDataDir, "password");
-        accumulo.start();
-
-        // Store a connector to the Mini Accumulo.
-        final Instance instance = new 
ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
-        accumuloConn = instance.getConnector("root", new 
PasswordToken("password"));
-
-        return accumulo;
-    }
-
-    /**
-     * Format a Mini Accumulo to be a Rya repository.
-     *
-     * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. 
(not null)
-     * @return The Rya repository sitting on top of the Mini Accumulo.
-     */
-    private static RyaSailRepository setupRya(final MiniAccumuloCluster 
accumulo) throws AccumuloException, AccumuloSecurityException, 
RepositoryException {
-        checkNotNull(accumulo);
-
-        // Setup the Rya Repository that will be used to create Repository 
Connections.
-        final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
-        final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
-        crdfdao.setConnector(accumuloConn);
-
-        // Setup Rya configuration values.
-        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix("demo_");
-        conf.setDisplayQueryPlan(true);
-
-        conf.setBoolean(USE_MOCK_INSTANCE, true);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, 
RYA_TABLE_PREFIX);
-        conf.set(CLOUDBASE_USER, "root");
-        conf.set(CLOUDBASE_PASSWORD, "password");
-        conf.set(CLOUDBASE_INSTANCE, accumulo.getInstanceName());
-
-        crdfdao.setConf(conf);
-        ryaStore.setRyaDAO(crdfdao);
-
-        final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
-        ryaRepo.initialize();
-
-        return ryaRepo;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactoryTest.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactoryTest.java
new file mode 100644
index 0000000..f82b735
--- /dev/null
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactoryTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+
+import com.beust.jcommander.internal.Sets;
+
+/**
+ * Tests the methods of {@link ShiftVarOrderFactory}.
+ */
+public class ShiftVarOrderFactoryTest {
+
+    @Test
+    public void makeVarOrders_fromSPARQL() throws MalformedQueryException {
+        // The SPARQL whose PCJ var orders will be generated from.
+        final String sparql =
+                "SELECT ?a ?b ?c " +
+                "WHERE { " +
+                    "?a <http://talksTo> ?b. " +
+                    "?b <http://worksAt> ?c. " +
+                "}";
+
+        // Run the test.
+        final PcjVarOrderFactory factory = new ShiftVarOrderFactory();
+        final Set<VariableOrder> varOrders = factory.makeVarOrders(sparql);
+
+        // Ensure the returned set matches the expected results.
+        final Set<VariableOrder> expected = Sets.newHashSet();
+        expected.add( new VariableOrder("a", "b", "c") );
+        expected.add( new VariableOrder("c", "a", "b") );
+        expected.add( new VariableOrder("b", "c", "a") );
+        assertEquals(expected, varOrders);
+    }
+
+    @Test
+    public void makeVarOrders_fromVarOrder() {
+        // The VariableOrder whose PCJ var orders will be generated from.
+        final VariableOrder varOrder = new VariableOrder("a", "b", "c");
+
+        // Run the test.
+        final PcjVarOrderFactory factory = new ShiftVarOrderFactory();
+        final Set<VariableOrder> varOrders = factory.makeVarOrders(varOrder);
+
+        // Ensure the returned set matches the expected results.
+        final Set<VariableOrder> expected = Sets.newHashSet();
+        expected.add( new VariableOrder("a", "b", "c") );
+        expected.add( new VariableOrder("c", "a", "b") );
+        expected.add( new VariableOrder("b", "c", "a") );
+        assertEquals(expected, varOrders);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
new file mode 100644
index 0000000..7474207
--- /dev/null
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo.accumulo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.ImmutableMap;
+
+import mvm.rya.accumulo.AccumuloRyaITBase;
+import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import mvm.rya.api.instance.RyaDetailsRepository;
+import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+
+/**
+ * Integration tests the methods of {@link AccumuloPcjStorage}.
+ * <p>
+ * These tests ensure that the PCJ tables are maintained and that these 
operations
+ * also update the Rya instance's details.
+ */
+public class AccumuloPcjStorageIT extends AccumuloRyaITBase {
+
+    @Test
+    public void createPCJ() throws AccumuloException, 
AccumuloSecurityException, PCJStorageException, NotInitializedException, 
RyaDetailsRepositoryException {
+        // Setup the PCJ storage that will be tested against.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
+
+        // Create a PCJ.
+        final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a 
<http://isA> ?b } ");
+
+        // Ensure the Rya details have been updated to include the PCJ's ID.
+        final RyaDetailsRepository detailsRepo = new 
AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
+
+        final ImmutableMap<String, PCJDetails> detailsMap = 
detailsRepo.getRyaInstanceDetails()
+                .getPCJIndexDetails()
+                .getPCJDetails();
+
+        final PCJDetails expectedDetails = PCJDetails.builder()
+                .setId( pcjId )
+                .build();
+
+        assertEquals(expectedDetails, detailsMap.get(pcjId));
+    }
+
+    @Test
+    public void dropPCJ() throws AccumuloException, AccumuloSecurityException, 
PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
+        // Setup the PCJ storage that will be tested against.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
+
+        // Create a PCJ.
+        final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a 
<http://isA> ?b } ");
+
+        // Delete the PCJ that was just created.
+        pcjStorage.dropPcj(pcjId);
+
+        // Ensure the Rya details have been updated to no longer include the 
PCJ's ID.
+        final RyaDetailsRepository detailsRepo = new 
AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
+
+        final ImmutableMap<String, PCJDetails> detailsMap = 
detailsRepo.getRyaInstanceDetails()
+                .getPCJIndexDetails()
+                .getPCJDetails();
+
+        assertFalse( detailsMap.containsKey(pcjId) );
+    }
+
+    @Test
+    public void listPcjs() throws AccumuloException, 
AccumuloSecurityException, PCJStorageException {
+        // Setup the PCJ storage that will be tested against.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
+
+        // Create a few PCJs and hold onto their IDs.
+        final List<String> expectedIds = new ArrayList<>();
+
+        String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> 
?b } ");
+        expectedIds.add( pcjId );
+
+        pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+        expectedIds.add( pcjId );
+
+        pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+        expectedIds.add( pcjId );
+
+        // Fetch the PCJ names
+        final List<String> pcjIds = pcjStorage.listPcjs();
+
+        // Ensure the expected IDs match the fetched IDs.
+        Collections.sort(expectedIds);
+        Collections.sort(pcjIds);
+        assertEquals(expectedIds, pcjIds);
+    }
+
+    @Test
+    public void getPcjMetadata() throws AccumuloException, 
AccumuloSecurityException, PCJStorageException, MalformedQueryException {
+        // Setup the PCJ storage that will be tested against.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
+
+        // Create a PCJ.
+        final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Fetch the PCJ's metadata.
+        final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+        // Ensure it has the expected values.
+        final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
+        final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, 
varOrders);
+        assertEquals(expectedMetadata, metadata);
+    }
+
+    @Test
+    public void addResults() throws AccumuloException, 
AccumuloSecurityException, PCJStorageException, MalformedQueryException {
+        // Setup the PCJ storage that will be tested against.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
+
+        // Create a PCJ.
+        final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Add some binding sets to it.
+        final Set<VisibilityBindingSet> results = new HashSet<>();
+
+        final MapBindingSet aliceBS = new MapBindingSet();
+        aliceBS.addBinding("a", new URIImpl("http://Alice";));
+        aliceBS.addBinding("b", new URIImpl("http://Person";));
+        results.add( new VisibilityBindingSet(aliceBS, "") );
+
+        final MapBindingSet charlieBS = new MapBindingSet();
+        charlieBS.addBinding("a", new URIImpl("http://Charlie";));
+        charlieBS.addBinding("b", new URIImpl("http://Comedian";));
+        results.add( new VisibilityBindingSet(charlieBS, "") );
+
+        pcjStorage.addResults(pcjId, results);
+
+        // Make sure the PCJ metadata was updated.
+        final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+        final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
+        final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, 
varOrders);
+        assertEquals(expectedMetadata, metadata);
+    }
+
+    @Test
+    public void listResults() throws AccumuloException, 
AccumuloSecurityException, PCJStorageException {
+        // Setup the PCJ storage that will be tested against.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
+
+        // Create a PCJ.
+        final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Add some binding sets to it.
+        final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+        final MapBindingSet aliceBS = new MapBindingSet();
+        aliceBS.addBinding("a", new URIImpl("http://Alice";));
+        aliceBS.addBinding("b", new URIImpl("http://Person";));
+        expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
+
+        final MapBindingSet charlieBS = new MapBindingSet();
+        charlieBS.addBinding("a", new URIImpl("http://Charlie";));
+        charlieBS.addBinding("b", new URIImpl("http://Comedian";));
+        expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
+
+        pcjStorage.addResults(pcjId, expectedResults);
+
+        // List the results that were stored.
+        final Set<BindingSet> results = new HashSet<>();
+        for(final BindingSet result : pcjStorage.listResults(pcjId)) {
+            results.add( result );
+        }
+
+        assertEquals(expectedResults, results);
+    }
+
+    @Test
+    public void purge() throws AccumuloException, AccumuloSecurityException, 
PCJStorageException, MalformedQueryException {
+        // Setup the PCJ storage that will be tested against.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
+
+        // Create a PCJ.
+        final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        // Add some binding sets to it.
+        final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+        final MapBindingSet aliceBS = new MapBindingSet();
+        aliceBS.addBinding("a", new URIImpl("http://Alice";));
+        aliceBS.addBinding("b", new URIImpl("http://Person";));
+        expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
+
+        final MapBindingSet charlieBS = new MapBindingSet();
+        charlieBS.addBinding("a", new URIImpl("http://Charlie";));
+        charlieBS.addBinding("b", new URIImpl("http://Comedian";));
+        expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
+
+        pcjStorage.addResults(pcjId, expectedResults);
+
+        // Purge the PCJ.
+        pcjStorage.purge(pcjId);
+
+        // List the results that were stored.
+        final Set<BindingSet> results = new HashSet<>();
+        for(final BindingSet result : pcjStorage.listResults(pcjId)) {
+            results.add( result );
+        }
+
+        assertTrue( results.isEmpty() );
+
+        // Make sure the PCJ metadata was updated.
+        final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+        final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
+        final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, 
varOrders);
+        assertEquals(expectedMetadata, metadata);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index 0a4885c..29d328d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -20,6 +20,7 @@ package org.apache.rya.indexing.pcj.fluo.api;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
 import java.util.HashSet;
@@ -27,7 +28,6 @@ import java.util.Set;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.accumulo.core.client.Connector;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
@@ -37,10 +37,9 @@ import 
org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
-import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
@@ -50,13 +49,13 @@ import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.impl.MapBindingSet;
 import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.repository.sail.SailRepository;
 import org.openrdf.sail.SailConnection;
 import org.openrdf.sail.SailException;
 
 import info.aduna.iteration.CloseableIteration;
 import io.fluo.api.client.FluoClient;
 import io.fluo.api.types.TypedTransaction;
-import mvm.rya.rdftriplestore.RyaSailRepository;
 
 /**
  * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query.
@@ -87,11 +86,6 @@ public class CreatePcj {
     private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
 
     /**
-     * A utility used to interact with Rya's PCJ tables.
-     */
-    private static final PcjTables PCJ_TABLES = new PcjTables();
-
-    /**
      * The maximum number of binding sets that will be inserted into each 
Statement
      * Pattern's result set per Fluo transaction.
      */
@@ -117,80 +111,58 @@ public class CreatePcj {
     }
 
     /**
-     * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. 
Historic
-     * triples will be scanned and matched using the rya connection that was
-     * provided. The PCJ will also automatically export to a table in Accumulo
-     * named using the {@code ryaTablePrefix} and the query's ID from the Fluo 
table.
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+     * <p>
+     * This call scans Rya for Statement Pattern matches and inserts them into
+     * the Fluo application. The Fluo application will then maintain the 
intermediate
+     * results as new triples are inserted and export any new query results to 
the
+     * {@code pcjId} within the provided {@code pcjStorage}.
      *
-     * @param fluo - A connection to the Fluo table that will be updated. (not 
null)
-     * @param ryaTablePrefix - The prefix that will be prepended to the 
Accumulo table
-     *   the PCJ's results will be exported to. (not null)
-     * @param rya - A connection to the Rya repository that will be scanned. 
(not null)
-     * @param accumuloConn - A connectino to the Accumulo instance the 
incremental
-     *   results will be exported to as a Rya PCJ table. (not null)
-     * @param varOrders - The variable orders the query's results will be 
exported to
-     *   within the export table. If this set is empty, then a default will be
-     *   used instead.(not null)
-     * @param sparql - The SPARQL query whose results will be incrementally 
updated by Fluo. (not null)
-     * @throws MalformedQueryException The PCJ could not be initialized 
because the SPARQL query was malformed.
-     * @throws PcjException The PCJ could not be initialized because of a 
problem setting up the export location.
-     * @throws SailException Historic results could not be added to the 
initialized PCJ because of
-     *   a problem with the Rya connection.
-     * @throws QueryEvaluationException Historic results could not be added to 
the initialized PCJ because of
-     *   a problem with the Rya connection.
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. 
(not null)
+     * @param pcjStorage - Provides access to the PCJ index. (not null)
+     * @param fluo - A connection to the Fluo application that updates the PCJ 
index. (not null)
+     * @param rya - A connection to the Rya instance hosting the PCJ. (not 
null)
+     *
+     * @throws MalformedQueryException The SPARQL query stored for the {@code 
pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be 
read from {@code pcjStorage}.
+     * @throws SailException Historic PCJ results could not be loaded because 
of a problem with {@code rya}.
+     * @throws QueryEvaluationException Historic PCJ results could not be 
loaded because of a problem with {@code rya}.
      */
     public void withRyaIntegration(
+            final String pcjId,
+            final PrecomputedJoinStorage pcjStorage,
             final FluoClient fluo,
-            final String ryaTablePrefix,
-            final RyaSailRepository rya,
-            final Connector accumuloConn,
-            final Set<VariableOrder> varOrders,
-            final String sparql) throws MalformedQueryException, PcjException, 
SailException, QueryEvaluationException {
-        checkNotNull(fluo);
-        checkNotNull(ryaTablePrefix);
-        checkNotNull(rya);
-        checkNotNull(accumuloConn);
-        checkNotNull(varOrders);
-        checkNotNull(sparql);
-
-        // Parse the SPARQL into a POJO.
-        final SPARQLParser parser = new SPARQLParser();
-        final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
+            final SailRepository rya)
+                    throws MalformedQueryException, PcjException, 
SailException, QueryEvaluationException {
+        requireNonNull(pcjId);
+        requireNonNull(pcjStorage);
+        requireNonNull(fluo);
+        requireNonNull(rya);
 
         // Keeps track of the IDs that are assigned to each of the query's 
nodes in Fluo.
         // We use these IDs later when scanning Rya for historic Statement 
Pattern matches
         // as well as setting up automatic exports.
         final NodeIds nodeIds = new NodeIds();
-        final String exportTableName;
-        final String queryId;
 
         // Parse the query's structure for the metadata that will be written 
to fluo.
+        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+        final String sparql = pcjMetadata.getSparql();
+        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, 
null);
         final FluoQuery fluoQuery = new 
SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
 
         try(TypedTransaction tx = STRING_TYPED_LAYER.wrap( 
fluo.newTransaction() )) {
             // Write the query's structure to Fluo.
             new FluoQueryMetadataDAO().write(tx, fluoQuery);
 
-            // Since we are exporting the query's results to a table in 
Accumulo, store that location in the fluo table.
-            queryId = fluoQuery.getQueryMetadata().getNodeId();
-
-            exportTableName = new 
PcjTableNameFactory().makeTableName(ryaTablePrefix, queryId);
-            
tx.mutate().row(queryId).col(FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).set(exportTableName);
+            // The results of the query are eventually exported to an instance 
of Rya, so store the Rya ID for the PCJ.
+            final String queryId = fluoQuery.getQueryMetadata().getNodeId();
+            
tx.mutate().row(queryId).col(FluoQueryColumns.RYA_PCJ_ID).set(pcjId);
+            
tx.mutate().row(pcjId).col(FluoQueryColumns.PCJ_ID_QUERY_ID).set(queryId);
 
             // Flush the changes to Fluo.
             tx.commit();
         }
 
-        // Initialize the export destination in Accumulo. If triples are being 
written to Fluo
-        // while this query is being created, then the export observer may 
throw errors for a while
-        // until this step is completed.
-        final VariableOrder queryVarOrder = 
fluoQuery.getQueryMetadata().getVariableOrder();
-        if(varOrders.isEmpty()) {
-            final Set<VariableOrder> shiftVarOrders = new 
ShiftVarOrderFactory().makeVarOrders( queryVarOrder );
-            varOrders.addAll(shiftVarOrders);
-        }
-        PCJ_TABLES.createPcjTable(accumuloConn, exportTableName, varOrders, 
sparql);
-
         // Get a connection to Rya. It's used to scan for Statement Pattern 
results.
         final SailConnection ryaConn = rya.getSail().getConnection();
 

Reply via email to