http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 3bb54c4..3a42a23 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -24,58 +24,34 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.junit.Test;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.junit.Test;
+
+import com.google.common.base.Optional;
 
 /**
  * Tests the methods of {@link CountStatements}.
  */
-public class CountStatementsIT extends ITBase {
+public class CountStatementsIT extends RyaExportITBase {
 
     /**
      * Overriden so that no Observers will be started. This ensures whatever
      * statements are inserted as part of the test will not be consumed.
-     *
-     * @return A Mini Fluo cluster.
      */
     @Override
-    protected MiniFluo startMiniFluo() throws AlreadyInitializedException, 
TableExistsException {
+    protected void preFluoInitHook() throws Exception {
         // Setup the observers that will be used by the Fluo PCJ Application.
         final List<ObserverSpecification> observers = new ArrayList<>();
 
-        // Configure how the mini fluo will run.
-        final FluoConfiguration config = new FluoConfiguration();
-        config.setMiniStartAccumulo(false);
-        config.setAccumuloInstance(instanceName);
-        config.setAccumuloUser(ACCUMULO_USER);
-        config.setAccumuloPassword(ACCUMULO_PASSWORD);
-        config.setInstanceZookeepers(zookeepers + "/fluo");
-        config.setAccumuloZookeepers(zookeepers);
-
-        config.setApplicationName(FLUO_APP_NAME);
-        config.setAccumuloTable("fluo" + FLUO_APP_NAME);
-
-        config.addObservers(observers);
-
-        FluoFactory.newAdmin(config).initialize(
-                new 
FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
-        final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
-        return miniFluo;
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
     }
 
-
     @Test
     public void countStatements() {
         // Insert some Triples into the Fluo app.
@@ -86,12 +62,14 @@ public class CountStatementsIT extends ITBase {
         triples.add( RyaStatement.builder().setSubject(new 
RyaURI("http://David";)).setPredicate(new 
RyaURI("http://talksTo";)).setObject(new RyaURI("http://Bob";)).build() );
         triples.add( RyaStatement.builder().setSubject(new 
RyaURI("http://Eve";)).setPredicate(new RyaURI("http://talksTo";)).setObject(new 
RyaURI("http://Bob";)).build() );
 
-        new InsertTriples().insert(fluoClient, triples, 
Optional.<String>absent());
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            new InsertTriples().insert(fluoClient, triples, 
Optional.<String>absent());
 
-        // Load some statements into the Fluo app.
-        final BigInteger count = new 
CountStatements().countStatements(fluoClient);
+            // Load some statements into the Fluo app.
+            final BigInteger count = new 
CountStatements().countStatements(fluoClient);
 
-        // Ensure the count matches the expected values.
-        assertEquals(BigInteger.valueOf(5), count);
+            // Ensure the count matches the expected values.
+            assertEquals(BigInteger.valueOf(5), count);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 82b61bd..0aceaa3 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -26,8 +26,11 @@ import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import 
org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
 import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
 import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -47,7 +50,7 @@ import com.google.common.collect.Sets;
 /**
  * Integration tests the methods of {@link GetPcjMetadata}.
  */
-public class GetPcjMetadataIT extends ITBase {
+public class GetPcjMetadataIT extends RyaExportITBase {
 
     @Test
     public void getMetadataByQueryId() throws RepositoryException, 
MalformedQueryException, SailException, QueryEvaluationException, PcjException, 
NotInFluoException, NotInAccumuloException, RyaDAOException {
@@ -59,56 +62,60 @@ public class GetPcjMetadataIT extends ITBase {
                 "}";
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
-        final String queryId = new 
ListQueryIds().listQueryIds(fluoClient).get(0);
-        final PcjMetadata metadata = new 
GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId);
+            // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
+            final String queryId = new 
ListQueryIds().listQueryIds(fluoClient).get(0);
+            final PcjMetadata metadata = new 
GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId);
 
-        // Ensure the command returns the correct metadata.
-        final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
-        final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
-        assertEquals(expected, metadata);
+            // Ensure the command returns the correct metadata.
+            final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
+            final PcjMetadata expected = new PcjMetadata(sparql, 0L, 
varOrders);
+            assertEquals(expected, metadata);
+        }
     }
 
     @Test
     public void getAllMetadata() throws MalformedQueryException, 
SailException, QueryEvaluationException, PcjException, NotInFluoException, 
NotInAccumuloException, AccumuloException, AccumuloSecurityException, 
RyaDAOException {
-
-        final CreatePcj createPcj = new CreatePcj();
-
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
 
-        // Add a couple of queries to Accumulo.
-        final String q1Sparql =
-                "SELECT ?x " +
-                  "WHERE { " +
-                  "?x <http://talksTo> <http://Eve>. " +
-                  "?x <http://worksAt> <http://Chipotle>." +
-                "}";
-        final String q1PcjId = pcjStorage.createPcj(q1Sparql);
-        createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        final String q2Sparql =
-                "SELECT ?x ?y " +
-                  "WHERE { " +
-                  "?x <http://talksTo> ?y. " +
-                  "?y <http://worksAt> <http://Chipotle>." +
-                "}";
-        final String q2PcjId = pcjStorage.createPcj(q2Sparql);
-        createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        // Ensure the command returns the correct metadata.
-        final Set<PcjMetadata> expected = new HashSet<>();
-        final Set<VariableOrder> q1VarOrders = new 
ShiftVarOrderFactory().makeVarOrders(q1Sparql);
-        final Set<VariableOrder> q2VarOrders = new 
ShiftVarOrderFactory().makeVarOrders(q2Sparql);
-        expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders));
-        expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders));
-
-        final Map<String, PcjMetadata> metadata = new 
GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
-        assertEquals(expected, Sets.newHashSet( metadata.values() ));
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Add a couple of queries to Accumulo.
+            final String q1Sparql =
+                    "SELECT ?x " +
+                            "WHERE { " +
+                            "?x <http://talksTo> <http://Eve>. " +
+                            "?x <http://worksAt> <http://Chipotle>." +
+                            "}";
+            final String q1PcjId = pcjStorage.createPcj(q1Sparql);
+            final CreatePcj createPcj = new CreatePcj();
+            createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+
+            final String q2Sparql =
+                    "SELECT ?x ?y " +
+                            "WHERE { " +
+                            "?x <http://talksTo> ?y. " +
+                            "?y <http://worksAt> <http://Chipotle>." +
+                            "}";
+            final String q2PcjId = pcjStorage.createPcj(q2Sparql);
+            createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+
+            // Ensure the command returns the correct metadata.
+            final Set<PcjMetadata> expected = new HashSet<>();
+            final Set<VariableOrder> q1VarOrders = new 
ShiftVarOrderFactory().makeVarOrders(q1Sparql);
+            final Set<VariableOrder> q2VarOrders = new 
ShiftVarOrderFactory().makeVarOrders(q2Sparql);
+            expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders));
+            expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders));
+
+            final Map<String, PcjMetadata> metadata = new 
GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
+            assertEquals(expected, Sets.newHashSet( metadata.values() ));
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 85c31a0..10f2319 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -26,8 +26,12 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
@@ -42,7 +46,7 @@ import com.google.common.collect.Sets;
 /**
  * Integration tests the methods of {@link GetQueryReportl}.
  */
-public class GetQueryReportIT extends ITBase {
+public class GetQueryReportIT extends RyaExportITBase {
 
     @Test
     public void getReport() throws Exception {
@@ -56,69 +60,72 @@ public class GetQueryReportIT extends ITBase {
 
         // Triples that will be streamed into Fluo after the PCJ has been 
created.
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Taco Shop"),
-                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Burger Join"),
-                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Pastery Shop"),
-                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Burrito Place"),
-                makeRyaStatement("http://Alice";, "http://livesIn";, 
"http://Lost County"),
-                makeRyaStatement("http://Alice";, "http://livesIn";, "http://Big 
City"),
-                makeRyaStatement("http://Bob";, "http://worksAt";, 
"http://Burrito Place"),
-                makeRyaStatement("http://Bob";, "http://livesIn";, "http://Big 
City"),
-                makeRyaStatement("http://Charlie";, "http://worksAt";, 
"http://Burrito Place"),
-                makeRyaStatement("http://Charlie";, "http://livesIn";, 
"http://Big City"),
-                makeRyaStatement("http://David";, "http://worksAt";, 
"http://Burrito Place"),
-                makeRyaStatement("http://David";, "http://livesIn";, 
"http://Lost County"),
-                makeRyaStatement("http://Eve";, "http://worksAt";, 
"http://Burrito Place"),
-                makeRyaStatement("http://Eve";, "http://livesIn";, "http://Big 
City"),
-                makeRyaStatement("http://Frank";, "http://worksAt";, 
"http://Burrito Place"),
-                makeRyaStatement("http://Frank";, "http://livesIn";, 
"http://Lost County"));
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://worksAt";), new RyaURI("http://Taco Shop")),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://worksAt";), new RyaURI("http://Burger Join")),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://worksAt";), new RyaURI("http://Pastery Shop")),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://worksAt";), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://livesIn";), new RyaURI("http://Lost County")),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://livesIn";), new RyaURI("http://Big City")),
+                new RyaStatement(new RyaURI("http://Bob";), new 
RyaURI("http://worksAt";), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Bob";), new 
RyaURI("http://livesIn";), new RyaURI("http://Big City")),
+                new RyaStatement(new RyaURI("http://Charlie";), new 
RyaURI("http://worksAt";), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Charlie";), new 
RyaURI("http://livesIn";), new RyaURI("http://Big City")),
+                new RyaStatement(new RyaURI("http://David";), new 
RyaURI("http://worksAt";), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://David";), new 
RyaURI("http://livesIn";), new RyaURI("http://Lost County")),
+                new RyaStatement(new RyaURI("http://Eve";), new 
RyaURI("http://worksAt";), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Eve";), new 
RyaURI("http://livesIn";), new RyaURI("http://Big City")),
+                new RyaStatement(new RyaURI("http://Frank";), new 
RyaURI("http://worksAt";), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Frank";), new 
RyaURI("http://livesIn";), new RyaURI("http://Lost County")));
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+            // Stream the data into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
 
-        // Wait for the results to finish processing.
-        fluo.waitForObservers();
+            // Wait for the results to finish processing.
+            super.getMiniFluo().waitForObservers();
 
-        // Fetch the report.
-        final Map<String, PcjMetadata> metadata = new 
GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
-        final Set<String> queryIds = metadata.keySet();
-        assertEquals(1, queryIds.size());
-        final String queryId = queryIds.iterator().next();
+            // Fetch the report.
+            final Map<String, PcjMetadata> metadata = new 
GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
+            final Set<String> queryIds = metadata.keySet();
+            assertEquals(1, queryIds.size());
+            final String queryId = queryIds.iterator().next();
 
-        final QueryReport report = new GetQueryReport().getReport(fluoClient, 
queryId);
+            final QueryReport report = new 
GetQueryReport().getReport(fluoClient, queryId);
 
-        // Build the expected counts map.
-        final Map<String, BigInteger> expectedCounts = new HashMap<>();
+            // Build the expected counts map.
+            final Map<String, BigInteger> expectedCounts = new HashMap<>();
 
-        final FluoQuery fluoQuery = report.getFluoQuery();
+            final FluoQuery fluoQuery = report.getFluoQuery();
 
-        final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
-        expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
+            final String queryNodeId = 
fluoQuery.getQueryMetadata().getNodeId();
+            expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
 
-        final String filterNodeId = 
fluoQuery.getFilterMetadata().iterator().next().getNodeId();
-        expectedCounts.put(filterNodeId, BigInteger.valueOf(8));
+            final String filterNodeId = 
fluoQuery.getFilterMetadata().iterator().next().getNodeId();
+            expectedCounts.put(filterNodeId, BigInteger.valueOf(8));
 
-        final String joinNodeId = 
fluoQuery.getJoinMetadata().iterator().next().getNodeId();
-        expectedCounts.put(joinNodeId, BigInteger.valueOf(13));
+            final String joinNodeId = 
fluoQuery.getJoinMetadata().iterator().next().getNodeId();
+            expectedCounts.put(joinNodeId, BigInteger.valueOf(13));
 
-        final Iterator<StatementPatternMetadata> patterns = 
fluoQuery.getStatementPatternMetadata().iterator();
-        final StatementPatternMetadata sp1 = patterns.next();
-        final StatementPatternMetadata sp2 = patterns.next();
-        if(sp1.getStatementPattern().contains("http://worksAt";)) {
-            expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9));
-            expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7));
-        } else {
-            expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9));
-            expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7));
-        }
+            final Iterator<StatementPatternMetadata> patterns = 
fluoQuery.getStatementPatternMetadata().iterator();
+            final StatementPatternMetadata sp1 = patterns.next();
+            final StatementPatternMetadata sp2 = patterns.next();
+            if(sp1.getStatementPattern().contains("http://worksAt";)) {
+                expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9));
+                expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7));
+            } else {
+                expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9));
+                expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7));
+            }
 
-        assertEquals(expectedCounts, report.getCounts());
+            assertEquals(expectedCounts, report.getCounts());
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
index 19bc272..ec301ba 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -26,17 +26,18 @@ import java.util.List;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.junit.Test;
 
 import com.beust.jcommander.internal.Lists;
 
-import org.apache.fluo.api.client.Transaction;
-
 /**
  * Integration tests the methods of {@link ListQueryIds}.
  */
-public class ListQueryIdsIT extends ITBase {
+public class ListQueryIdsIT extends RyaExportITBase {
 
     /**
      * This test ensures that when there are PCJ tables in Accumulo as well as
@@ -45,18 +46,20 @@ public class ListQueryIdsIT extends ITBase {
      */
     @Test
     public void getQueryIds() throws AccumuloException, 
AccumuloSecurityException, TableExistsException {
-        // Store a few SPARQL/Query ID pairs in the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            tx.set("SPARQL_3", QUERY_ID, "ID_3");
-            tx.set("SPARQL_1", QUERY_ID, "ID_1");
-            tx.set("SPARQL_4", QUERY_ID, "ID_4");
-            tx.set("SPARQL_2", QUERY_ID, "ID_2");
-            tx.commit();
-        }
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Store a few SPARQL/Query ID pairs in the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                tx.set("SPARQL_3", QUERY_ID, "ID_3");
+                tx.set("SPARQL_1", QUERY_ID, "ID_1");
+                tx.set("SPARQL_4", QUERY_ID, "ID_4");
+                tx.set("SPARQL_2", QUERY_ID, "ID_2");
+                tx.commit();
+            }
 
-        // Ensure the correct list of Query IDs is retured.
-        final List<String> expected = Lists.newArrayList("ID_1", "ID_2", 
"ID_3", "ID_4");
-        final List<String> queryIds = new 
ListQueryIds().listQueryIds(fluoClient);
-        assertEquals(expected, queryIds);
+            // Ensure the correct list of Query IDs is retured.
+            final List<String> expected = Lists.newArrayList("ID_1", "ID_2", 
"ID_3", "ID_4");
+            final List<String> queryIds = new 
ListQueryIds().listQueryIds(fluoClient);
+            assertEquals(expected, queryIds);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index d5ed447..082f46d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -20,7 +20,13 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
+import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
@@ -30,13 +36,10 @@ import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 import org.openrdf.repository.RepositoryException;
 
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.Transaction;
-
 /**
  * Integration tests the methods of {@link FluoQueryMetadataDAO}.
  */
-public class FluoQueryMetadataDAOIT extends ITBase {
+public class FluoQueryMetadataDAOIT extends RyaExportITBase {
 
     @Test
     public void statementPatternMetadataTest() throws RepositoryException {
@@ -49,20 +52,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         builder.setParentNodeId("parentNodeId");
         final StatementPatternMetadata originalMetadata = builder.build();
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalMetadata);
-            tx.commit();
-        }
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        StatementPatternMetadata storedMetadata = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId");
-        }
+            // Read it from the Fluo table.
+            StatementPatternMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readStatementPatternMetadata(sx, 
"nodeId");
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalMetadata, storedMetadata);
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
     }
 
     @Test
@@ -78,20 +83,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         builder.setFilterIndexWithinSparql(2);
         final FilterMetadata originalMetadata = builder.build();
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalMetadata);
-            tx.commit();
-        }
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        FilterMetadata storedMetadata = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedMetadata = dao.readFilterMetadata(sx, "nodeId");
-        }
+            // Read it from the Fluo table.
+            FilterMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readFilterMetadata(sx, "nodeId");
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalMetadata, storedMetadata);
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
     }
 
     @Test
@@ -107,20 +114,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         builder.setRightChildNodeId("rightChildNodeId");
         final JoinMetadata originalMetadata = builder.build();
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalMetadata);
-            tx.commit();
-        }
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        JoinMetadata storedMetadata = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedMetadata = dao.readJoinMetadata(sx, "nodeId");
-        }
+            // Read it from the Fluo table.
+            JoinMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readJoinMetadata(sx, "nodeId");
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalMetadata, storedMetadata);
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
     }
 
     @Test
@@ -134,20 +143,85 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         builder.setChildNodeId("childNodeId");
         final QueryMetadata originalMetadata = builder.build();
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalMetadata);
-            tx.commit();
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
+
+            // Read it from the Fluo table.
+            QueryMetadata storedMetdata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetdata = dao.readQueryMetadata(sx, "nodeId");
+            }
+
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalMetadata, storedMetdata);
         }
+    }
+
+    @Test
+    public void aggregationMetadataTest_withGroupByVarOrders() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final AggregationMetadata originalMetadata = 
AggregationMetadata.builder("nodeId")
+                .setVariableOrder(new VariableOrder("totalCount"))
+                .setParentNodeId("parentNodeId")
+                .setChildNodeId("childNodeId")
+                .setGroupByVariableOrder(new VariableOrder("a", "b", "c"))
+                .addAggregation(new AggregationElement(AggregationType.COUNT, 
"count", "totalCount"))
+                .addAggregation(new 
AggregationElement(AggregationType.AVERAGE, "privae", "avgPrice"))
+                .build();
+
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        QueryMetadata storedMetdata = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedMetdata = dao.readQueryMetadata(sx, "nodeId");
+            // Read it from the Fluo table.
+            AggregationMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readAggregationMetadata(sx, "nodeId");
+            }
+
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalMetadata, storedMetadata);
         }
+    }
+
+    @Test
+    public void aggregationMetadataTest_noGroupByVarOrders() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final AggregationMetadata originalMetadata = 
AggregationMetadata.builder("nodeId")
+                .setVariableOrder(new VariableOrder("totalCount"))
+                .setParentNodeId("parentNodeId")
+                .setChildNodeId("childNodeId")
+                .addAggregation(new AggregationElement(AggregationType.COUNT, 
"count", "totalCount"))
+                .addAggregation(new 
AggregationElement(AggregationType.AVERAGE, "privae", "avgPrice"))
+                .build();
+
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
+
+            // Read it from the Fluo table.
+            AggregationMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readAggregationMetadata(sx, "nodeId");
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalMetadata, storedMetdata);
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
     }
 
     @Test
@@ -168,19 +242,21 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
         final FluoQuery originalQuery = new 
SparqlFluoQueryBuilder().make(query, new NodeIds());
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalQuery);
-            tx.commit();
-        }
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalQuery);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        FluoQuery storedQuery = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedQuery = dao.readFluoQuery(sx, 
originalQuery.getQueryMetadata().getNodeId());
-        }
+            // Read it from the Fluo table.
+            FluoQuery storedQuery = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedQuery = dao.readFluoQuery(sx, 
originalQuery.getQueryMetadata().getNodeId());
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalQuery, storedQuery);
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalQuery, storedQuery);
+        }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index b4c8d69..21d7db0 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -18,100 +18,152 @@
  */
 package org.apache.rya.indexing.pcj.fluo.integration;
 
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Span;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
 import org.openrdf.model.Statement;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 
 import com.google.common.collect.Sets;
 
-public class CreateDeleteIT extends ITBase {
+/**
+ * Tests that ensure the PCJ delete support works.
+ */
+public class CreateDeleteIT extends RyaExportITBase {
 
-    /**
-     * Ensure historic matches are included in the result.
-     */
     @Test
-    public void historicResults() throws Exception {
+    public void deletePCJ() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
-        final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> 
<http://Eve>. "
-                + "?x <http://worksAt> <http://Chipotle>." + "}";
+        final String sparql =
+                "SELECT ?x " + "WHERE { " +
+                    "?x <http://talksTo> <http://Eve>. " +
+                    "?x <http://worksAt> <http://Chipotle>." +
+                "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
-        final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Bob";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Charlie";, "http://talksTo";, 
"http://Eve";),
-
-                makeStatement("http://Eve";, "http://helps";, "http://Kevin";),
-
-                makeStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://David";, "http://worksAt";, 
"http://Chipotle";));
-
-        // The expected results of the SPARQL query once the PCJ has been
-        // computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Bob";))));
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Charlie";))));
-
-        // Load the historic data into Rya.
-        for (final Statement triple : historicTriples) {
-            ryaConn.add(triple);
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Set<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://helps";), vf.createURI("http://Kevin";)),
+
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Ensure the data was loaded.
+            final List<Bytes> rows = getFluoTableEntries(fluoClient);
+            assertEquals(17, rows.size());
+
+            // Delete the PCJ from the Fluo application.
+            new DeletePcj(1).deletePcj(fluoClient, pcjId);
+
+            // Ensure all data related to the query has been removed.
+            final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
+            assertEquals(0, empty_rows.size());
         }
+    }
+
+    @Test
+    public void deleteAggregation() throws Exception {
+        // A query that finds the maximum price for an item within the 
inventory.
+        final String sparql =
+                "SELECT (max(?price) as ?maxPrice) { " +
+                    "?item <urn:price> ?price . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(2.50)),
+                vf.createStatement(vf.createURI("urn:gum"), 
vf.createURI("urn:price"), vf.createLiteral(0.99)),
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Ensure the data was loaded.
+            final List<Bytes> rows = getFluoTableEntries(fluoClient);
+            assertEquals(10, rows.size());
+
+            // Delete the PCJ from the Fluo application.
+            new DeletePcj(1).deletePcj(fluoClient, pcjId);
+
+            // Ensure all data related to the query has been removed.
+            final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
+            assertEquals(0, empty_rows.size());
+        }
+    }
 
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(sparql);
+    private String loadData(final String sparql, final Collection<Statement> 
statements) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        // Register the PCJ with Rya.
+        final Instance accInstance = 
super.getAccumuloConnector().getInstance();
+        final Connector accumuloConn = super.getAccumuloConnector();
 
-        // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(
+                ACCUMULO_USER,
+                ACCUMULO_PASSWORD.toCharArray(),
+                accInstance.getInstanceName(),
+                accInstance.getZooKeepers()), accumuloConn);
 
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
-        assertEquals(expected, results);
+        final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
 
-        List<Bytes> rows = getFluoTableEntries(fluoClient);
-        assertEquals(17, rows.size());
+        // Write the data to Rya.
+        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
+        ryaConn.begin();
+        ryaConn.add(statements);
+        ryaConn.commit();
+        ryaConn.close();
 
-        // Delete the PCJ from the Fluo application.
-        new DeletePcj(1).deletePcj(fluoClient, pcjId);
+        // Wait for the Fluo application to finish computing the end result.
+        super.getMiniFluo().waitForObservers();
 
-        // Ensure all data related to the query has been removed.
-        List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
-        assertEquals(0, empty_rows.size());
+        // The PCJ Id is the topic name the results will be written to.
+        return pcjId;
     }
 
-    private List<Bytes> getFluoTableEntries(FluoClient fluoClient) {
+    private List<Bytes> getFluoTableEntries(final FluoClient fluoClient) {
         try (Snapshot snapshot = fluoClient.newSnapshot()) {
-            List<Bytes> rows = new ArrayList<>();
-            RowScanner rscanner = 
snapshot.scanner().over(Span.prefix("")).byRow().build();
+            final List<Bytes> rows = new ArrayList<>();
+            final RowScanner rscanner = 
snapshot.scanner().over(Span.prefix("")).byRow().build();
 
-            for(ColumnScanner cscanner: rscanner) {
+            for(final ColumnScanner cscanner: rscanner) {
                rows.add(cscanner.getRow());
             }
-            
+
             return rows;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index dcab997..ab97bbd 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -19,32 +19,37 @@
 package org.apache.rya.indexing.pcj.fluo.integration;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
 import org.openrdf.model.Statement;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
 /**
  * Performs integration tests over the Fluo application geared towards various 
types of input.
- * <p>
- * These tests are being ignore so that they will not run as unit tests while 
building the application.
  */
-public class InputIT extends ITBase {
+public class InputIT extends RyaExportITBase {
 
     /**
      * Ensure historic matches are included in the result.
@@ -53,49 +58,64 @@ public class InputIT extends ITBase {
     public void historicResults() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
         final String sparql =
-              "SELECT ?x " +
-                "WHERE { " +
+              "SELECT ?x WHERE { " +
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Bob";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Charlie";, "http://talksTo";, 
"http://Eve";),
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
 
-                makeStatement("http://Eve";, "http://helps";, "http://Kevin";),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://helps";), vf.createURI("http://Kevin";)),
 
-                makeStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://David";, "http://worksAt";, 
"http://Chipotle";));
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
         // The expected results of the SPARQL query once the PCJ has been 
computed.
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Bob";))));
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Charlie";))));
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Bob";));
+        expected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Charlie";));
+        expected.add(bs);
 
         // Load the historic data into Rya.
+        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
         for(final Statement triple : historicTriples) {
             ryaConn.add(triple);
         }
+        ryaConn.close();
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+
+            // Verify the end results of the query match the expected results.
+            super.getMiniFluo().waitForObservers();
 
-        // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
 
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
-        assertEquals(expected, results);
+            assertEquals(expected, results);
+        }
     }
 
     /**
@@ -105,51 +125,67 @@ public class InputIT extends ITBase {
     public void streamedResults() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
         final String sparql =
-              "SELECT ?x " +
-                "WHERE { " +
+              "SELECT ?x WHERE { " +
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
 
         // Triples that will be streamed into Fluo after the PCJ has been 
created.
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Eve";),
-                makeRyaStatement("http://Bob";, "http://talksTo";, "http://Eve";),
-                makeRyaStatement("http://Charlie";, "http://talksTo";, 
"http://Eve";),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://talksTo";), new RyaURI("http://Eve";)),
+                new RyaStatement(new RyaURI("http://Bob";), new 
RyaURI("http://talksTo";), new RyaURI("http://Eve";)),
+                new RyaStatement(new RyaURI("http://Charlie";), new 
RyaURI("http://talksTo";), new RyaURI("http://Eve";)),
 
-                makeRyaStatement("http://Eve";, "http://helps";, "http://Kevin";),
+                new RyaStatement(new RyaURI("http://Eve";), new 
RyaURI("http://helps";), new RyaURI("http://Kevin";)),
 
-                makeRyaStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
-                makeRyaStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
-                makeRyaStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
-                makeRyaStatement("http://David";, "http://worksAt";, 
"http://Chipotle";));
+                new RyaStatement(new RyaURI("http://Bob";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)),
+                new RyaStatement(new RyaURI("http://Charlie";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)),
+                new RyaStatement(new RyaURI("http://Eve";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)),
+                new RyaStatement(new RyaURI("http://David";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)));
 
         // The expected results of the SPARQL query once the PCJ has been 
computed.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Bob";))));
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Charlie";))));
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Bob";));
+        expected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Charlie";));
+        expected.add(bs);
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        // Ensure the query has no results yet.
-        fluo.waitForObservers();
-        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertTrue( results.isEmpty() );
+            // Ensure the query has no results yet.
+            super.getMiniFluo().waitForObservers();
 
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                assertFalse( resultsIt.hasNext() );
+            }
 
-        // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            // Stream the data into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+
+            // Verify the end results of the query match the expected results.
+            super.getMiniFluo().waitForObservers();
+
+            final HashSet<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+
+            assertEquals(expected, results);
+        }
     }
 
     /**
@@ -162,53 +198,75 @@ public class InputIT extends ITBase {
     public void historicThenStreamedResults() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
         final String sparql =
-              "SELECT ?x " +
-                "WHERE { " +
+              "SELECT ?x WHERE { " +
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Alice";, "http://worksAt";, 
"http://Chipotle";));
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
         // Triples that will be streamed into Fluo after the PCJ has been 
created.
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Frank";, "http://talksTo";, 
"http://Eve";),
-                makeRyaStatement("http://Frank";, "http://worksAt";, 
"http://Chipotle";));
+                new RyaStatement(new RyaURI("http://Frank";), new 
RyaURI("http://talksTo";), new RyaURI("http://Eve";)),
+                new RyaStatement(new RyaURI("http://Frank";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)));
 
         // Load the historic data into Rya.
+        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
         for(final Statement triple : historicTriples) {
             ryaConn.add(triple);
         }
+        ryaConn.close();
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        // Ensure Alice is a match.
-        fluo.waitForObservers();
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Alice";))));
+            // Ensure Alice is a match.
+            super.getMiniFluo().waitForObservers();
 
-        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            final Set<BindingSet> expected = new HashSet<>();
 
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+            MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Alice";));
+            expected.add(bs);
 
-        // Verify the end results of the query also include Frank.
-        fluo.waitForObservers();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Frank";))));
+            Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add(resultsIt.next());
+                }
+            }
 
-        results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            assertEquals(expected, results);
+
+            // Stream the data into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+
+            // Verify the end results of the query also include Frank.
+            super.getMiniFluo().waitForObservers();
+
+            bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Frank";));
+            expected.add(bs);
+
+            results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add(resultsIt.next());
+                }
+            }
+
+            assertEquals(expected, results);
+        }
     }
 
     /**
@@ -222,50 +280,69 @@ public class InputIT extends ITBase {
     public void historicAndStreamConflict() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
         final String sparql =
-              "SELECT ?x " +
-                "WHERE { " +
+              "SELECT ?x WHERE { " +
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Alice";, "http://worksAt";, 
"http://Chipotle";));
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
         // Triples that will be streamed into Fluo after the PCJ has been 
created.
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Eve";),
-                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Chipotle";));
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://talksTo";), new RyaURI("http://Eve";)),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)));
 
         // The expected final result.
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Alice";))));
+
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Alice";));
+        expected.add(bs);
 
         // Load the historic data into Rya.
+        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
         for(final Statement triple : historicTriples) {
             ryaConn.add(triple);
         }
+        ryaConn.close();
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        // Ensure Alice is a match.
-        fluo.waitForObservers();
-        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
-
-        // Stream the same Alice triple into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
-
-        // Verify the end results of the query is stiill only Alice.
-        fluo.waitForObservers();
-        results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+
+            // Ensure Alice is a match.
+            super.getMiniFluo().waitForObservers();
+
+            Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+            assertEquals(expected, results);
+
+            // Stream the same Alice triple into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+
+            // Verify the end results of the query is stiill only Alice.
+            super.getMiniFluo().waitForObservers();
+
+            results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+            assertEquals(expected, results);
+        }
     }
 }
\ No newline at end of file

Reply via email to