http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index f3f486c..747f6e5 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -23,26 +23,29 @@ import static org.junit.Assert.assertEquals;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
 /**
  * Performs integration tests over the Fluo application geared towards Rya PCJ 
exporting.
- * <p>
- * These tests are being ignore so that they will not run as unit tests while 
building the application.
  */
-public class RyaExportIT extends ITBase {
+public class RyaExportIT extends RyaExportITBase {
 
     @Test
     public void resultsExported() throws Exception {
@@ -57,59 +60,69 @@ public class RyaExportIT extends ITBase {
                 "}";
 
         // Triples that will be streamed into Fluo after the PCJ has been 
created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Bob";),
-                makeRyaStatement("http://Bob";, "http://livesIn";, 
"http://London";),
-                makeRyaStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://talksTo";), new RyaURI("http://Bob";)),
+                new RyaStatement(new RyaURI("http://Bob";), new 
RyaURI("http://livesIn";), new RyaURI("http://London";)),
+                new RyaStatement(new RyaURI("http://Bob";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)),
 
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Charlie";),
-                makeRyaStatement("http://Charlie";, "http://livesIn";, 
"http://London";),
-                makeRyaStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://talksTo";), new RyaURI("http://Charlie";)),
+                new RyaStatement(new RyaURI("http://Charlie";), new 
RyaURI("http://livesIn";), new RyaURI("http://London";)),
+                new RyaStatement(new RyaURI("http://Charlie";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)),
 
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://David";),
-                makeRyaStatement("http://David";, "http://livesIn";, 
"http://London";),
-                makeRyaStatement("http://David";, "http://worksAt";, 
"http://Chipotle";),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://talksTo";), new RyaURI("http://David";)),
+                new RyaStatement(new RyaURI("http://David";), new 
RyaURI("http://livesIn";), new RyaURI("http://London";)),
+                new RyaStatement(new RyaURI("http://David";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)),
 
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Eve";),
-                makeRyaStatement("http://Eve";, "http://livesIn";, 
"http://Leeds";),
-                makeRyaStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
+                new RyaStatement(new RyaURI("http://Alice";), new 
RyaURI("http://talksTo";), new RyaURI("http://Eve";)),
+                new RyaStatement(new RyaURI("http://Eve";), new 
RyaURI("http://livesIn";), new RyaURI("http://Leeds";)),
+                new RyaStatement(new RyaURI("http://Eve";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)),
 
-                makeRyaStatement("http://Frank";, "http://talksTo";, 
"http://Alice";),
-                makeRyaStatement("http://Frank";, "http://livesIn";, 
"http://London";),
-                makeRyaStatement("http://Frank";, "http://worksAt";, 
"http://Chipotle";));
+                new RyaStatement(new RyaURI("http://Frank";), new 
RyaURI("http://talksTo";), new RyaURI("http://Alice";)),
+                new RyaStatement(new RyaURI("http://Frank";), new 
RyaURI("http://livesIn";), new RyaURI("http://London";)),
+                new RyaStatement(new RyaURI("http://Frank";), new 
RyaURI("http://worksAt";), new RyaURI("http://Chipotle";)));
 
         // The expected results of the SPARQL query once the PCJ has been 
computed.
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Bob";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-        expected.add(makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Charlie";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-        expected.add(makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://David";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://Bob";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://Charlie";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://David";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expected.add(bs);
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+            // Stream the data into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
 
-        // Fetch the exported results from Accumulo once the observers finish 
working.
-        fluo.waitForObservers();
+            // Fetch the exported results from Accumulo once the observers 
finish working.
+            super.getMiniFluo().waitForObservers();
 
-        // Fetch expected results from the PCJ table that is in Accumulo.
-        final Set<BindingSet> results = Sets.newHashSet( 
pcjStorage.listResults(pcjId) );
+            // Fetch expected results from the PCJ table that is in Accumulo.
+            final Set<BindingSet> results = Sets.newHashSet( 
pcjStorage.listResults(pcjId) );
 
-        // Verify the end results of the query match the expected results.
-        assertEquals(expected, results);
+            // Verify the end results of the query match the expected results.
+            assertEquals(expected, results);
+        }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index fd70a19..3c74b13 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -23,86 +23,104 @@ import static org.junit.Assert.assertEquals;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
 import org.junit.Test;
 import org.openrdf.model.Statement;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
 import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 
 import com.google.common.collect.Sets;
 
-
 /**
- * This test ensures that the correct updates are pushed by Fluo
- * to the external PCJ table as triples are added to Rya through
- * the {@link RepositoryConnection}.  The key difference between these
- * tests and those in {@link InputIT} is that streaming triples are added 
through
- * the RepositoryConnection and not through the {@link FluoClient}.  These 
tests are
- * designed to verify that the {@link AccumuloRyaDAO} has been integrated
- * with the {@link PrecomputedJoinIndexer} and that the associated {@link 
PrecomputedJoinUpdater} updates
- * Fluo accordingly.
- *
+ * This test ensures that the correct updates are pushed by Fluo to the 
external PCJ table as triples are added to Rya
+ * through the {@link RepositoryConnection}.  The key difference between these 
tests and those in {@link InputIT} is
+ * that streaming triples are added through the RepositoryConnection and not 
through the {@link FluoClient}.  These
+ * tests are designed to verify that the {@link AccumuloRyaDAO} has been 
integrated with the {@link PrecomputedJoinIndexer}
+ * and that the associated {@link PrecomputedJoinUpdater} updates Fluo 
accordingly.
  */
-
-public class RyaInputIncrementalUpdateIT extends ITBase {
+public class RyaInputIncrementalUpdateIT extends RyaExportITBase {
 
     /**
      * Ensure historic matches are included in the result.
      */
     @Test
     public void streamResultsThroughRya() throws Exception {
-
         // A query that finds people who talk to Eve and work at Chipotle.
-        final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> 
<http://Eve>. "
-                + "?x <http://worksAt> <http://Chipotle>." + "}";
+        final String sparql =
+                "SELECT ?x " + "WHERE { " +
+                    "?x <http://talksTo> <http://Eve>. " +
+                    "?x <http://worksAt> <http://Chipotle>." +
+                "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Bob";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Charlie";, "http://talksTo";, 
"http://Eve";),
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
 
-                makeStatement("http://Eve";, "http://helps";, "http://Kevin";),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://helps";), vf.createURI("http://Kevin";)),
 
-                makeStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://David";, "http://worksAt";, 
"http://Chipotle";));
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
         // The expected results of the SPARQL query once the PCJ has been
         // computed.
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Bob";))));
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Charlie";))));
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Bob";));
+        expected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Charlie";));
+        expected.add(bs);
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
+            // Verify the end results of the query match the expected results.
+            super.getMiniFluo().waitForObservers();
 
-        // Load the historic data into Rya.
-        for (final Statement triple : historicTriples) {
-            ryaConn.add(triple);
-        }
+            // Load the historic data into Rya.
+            final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
+            for (final Statement triple : historicTriples) {
+                ryaConn.add(triple);
+            }
+
+            super.getMiniFluo().waitForObservers();
+
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultIt.hasNext()) {
+                    results.add( resultIt.next() );
+                }
+            }
 
-        fluo.waitForObservers();
-        
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
-        assertEquals(expected, results);
+            assertEquals(expected, results);
+        }
     }
 
     /**
@@ -115,97 +133,146 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
     @Test
     public void historicThenStreamedResults() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
-        final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> 
<http://Eve>. "
-                + "?x <http://worksAt> <http://Chipotle>." + "}";
+        final String sparql =
+                "SELECT ?x " + "WHERE { " +
+                    "?x <http://talksTo> <http://Eve>. " +
+                    "?x <http://worksAt> <http://Chipotle>." +
+                "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Alice";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://Joe";, "http://worksAt";, 
"http://Chipotle";));
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://Joe";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
         // Triples that will be streamed into Fluo after the PCJ has been
         final Set<Statement> streamedTriples = Sets.newHashSet(
-                makeStatement("http://Frank";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Joe";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Frank";, "http://worksAt";, 
"http://Chipotle";));
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Joe";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
         // Load the historic data into Rya.
+        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
         for (final Statement triple : historicTriples) {
             ryaConn.add(triple);
         }
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        fluo.waitForObservers();
+            super.getMiniFluo().waitForObservers();
 
-        // Load the streaming data into Rya.
-        for (final Statement triple : streamedTriples) {
-            ryaConn.add(triple);
-        }
+            // Load the streaming data into Rya.
+            for (final Statement triple : streamedTriples) {
+                ryaConn.add(triple);
+            }
 
-        // Ensure Alice is a match.
-        fluo.waitForObservers();
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Alice";))));
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Frank";))));
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Joe";))));
+            // Ensure Alice is a match.
+            super.getMiniFluo().waitForObservers();
 
-        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            final Set<BindingSet> expected = new HashSet<>();
+            MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Alice";));
+            expected.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Frank";));
+            expected.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Joe";));
+            expected.add(bs);
+
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultIt.hasNext()) {
+                    results.add( resultIt.next() );
+                }
+            }
+
+            assertEquals(expected, results);
+        }
     }
 
     @Test
     public void historicAndStreamMultiVariables() throws Exception {
-        // A query that finds people who talk to Eve and work at Chipotle.
-        // A query that finds people who talk to Eve and work at Chipotle.
-        final String sparql = "SELECT ?x ?y " + "WHERE { " + "?x 
<http://talksTo> ?y. "
-                + "?x <http://worksAt> <http://Chipotle>." + "}";
+        // A query that finds people who talk to other people and work at 
Chipotle.
+        final String sparql =
+                "SELECT ?x ?y " + "WHERE { " +
+                    "?x <http://talksTo> ?y. " +
+                    "?x <http://worksAt> <http://Chipotle>." +
+                 "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
-                makeStatement("http://Alice";, "http://worksAt";, 
"http://Chipotle";),
-                makeStatement("http://Joe";, "http://worksAt";, 
"http://Chipotle";));
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                vf.createStatement(vf.createURI("http://Joe";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
         // Triples that will be streamed into Fluo after the PCJ has been
         final Set<Statement> streamedTriples = Sets.newHashSet(
-                makeStatement("http://Frank";, "http://talksTo";, 
"http://Betty";),
-                makeStatement("http://Joe";, "http://talksTo";, "http://Alice";),
-                makeStatement("http://Frank";, "http://worksAt";, 
"http://Chipotle";));
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://talksTo";), vf.createURI("http://Betty";)),
+                vf.createStatement(vf.createURI("http://Joe";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
         // Load the historic data into Rya.
+        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
         for (final Statement triple : historicTriples) {
             ryaConn.add(triple);
         }
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        fluo.waitForObservers();
+            super.getMiniFluo().waitForObservers();
 
-        // Load the streaming data into Rya.
-        for (final Statement triple : streamedTriples) {
-            ryaConn.add(triple);
-        }
+            // Load the streaming data into Rya.
+            for (final Statement triple : streamedTriples) {
+                ryaConn.add(triple);
+            }
 
-        // Ensure Alice is a match.
-        fluo.waitForObservers();
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Alice";)), new BindingImpl("y", new URIImpl("http://Eve";))));
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Frank";)), new BindingImpl("y", new URIImpl("http://Betty";))));
-        expected.add(makeBindingSet(new BindingImpl("x", new 
URIImpl("http://Joe";)), new BindingImpl("y", new URIImpl("http://Alice";))));
+            // Ensure Alice is a match.
+            super.getMiniFluo().waitForObservers();
+
+            final Set<BindingSet> expected = new HashSet<>();
+
+            MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Alice";));
+            bs.addBinding("y", vf.createURI("http://Eve";));
+            expected.add(bs);
 
-        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Frank";));
+            bs.addBinding("y", vf.createURI("http://Betty";));
+            expected.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Joe";));
+            bs.addBinding("y", vf.createURI("http://Alice";));
+            expected.add(bs);
+
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultIt.hasNext()) {
+                    results.add( resultIt.next() );
+                }
+            }
+
+            assertEquals(expected, results);
+        }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
index 092e8a9..52d6caa 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
@@ -18,24 +18,21 @@
  */
 package org.apache.rya.indexing.pcj.fluo.integration;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
-import org.apache.rya.sail.config.RyaSailFactory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
@@ -43,95 +40,58 @@ import org.openrdf.model.impl.LiteralImpl;
 import org.openrdf.model.impl.StatementImpl;
 import org.openrdf.model.impl.URIImpl;
 import org.openrdf.query.BindingSet;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.sail.SailRepository;
-import org.openrdf.repository.sail.SailRepositoryConnection;
-import org.openrdf.sail.Sail;
-import org.openrdf.sail.SailException;
 
+public class StreamingTestIT extends RyaExportITBase {
 
-public class StreamingTestIT extends ITBase {
+       private static final Logger log = 
Logger.getLogger(StreamingTestIT.class);
 
-       private static final Logger log = Logger.getLogger(ITBase.class);
-       private static String query = "select ?name ?uuid where {   ?uuid 
<http://pred1> ?name ; <http://pred2> \"literal\".}";
-       private static String uuidPrefix = "http://uuid_";;
-       private static String name = "number_";
-       private static String pred1 = "http://pred1";;
-       private static String pred2 = "http://pred2";;
-       
-       private PcjTables pcjTables = new PcjTables();
-       private String pcjTableName;
-       
-       private Sail sail;
-       private SailRepository repo;
-       private SailRepositoryConnection conn;
-       
-       
-       @Before
-       public void init() throws Exception {
-               AccumuloRdfConfiguration conf = makeConfig(instanceName, 
zookeepers);
-               conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U");
-               conf.set(RdfCloudTripleStoreConfiguration.CONF_CV, "U");
-               
accumuloConn.securityOperations().changeUserAuthorizations("root", new 
Authorizations("U"));
-               sail =  RyaSailFactory.getInstance(conf);
-               repo = new SailRepository(sail);
-               conn = repo.getConnection();
-       }
-       
-       @After
-       public void close() throws RepositoryException, SailException {
-               conn.close();
-               repo.shutDown();
-               sail.shutDown();
-       }
-       
-       
        @Test
        public void testRandomStreamingIngest() throws Exception {
-               
-               pcjTableName = createPcj(query);
-               log.info("Adding Join Pairs...");
-               addRandomQueryStatementPairs(100);
-               Assert.assertEquals(100, countPcjs());
-               
-       }
-       
-       private String createPcj(String pcj) throws Exception {
-               
accumuloConn.securityOperations().changeUserAuthorizations("root", new 
Authorizations("U"));
-           // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(pcj);
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-               String tableName = RYA_INSTANCE_NAME + "INDEX_" + pcjId;
-               
-               return tableName;
+           final String sparql =
+                   "select ?name ?uuid where { " +
+                    "?uuid <http://pred1> ?name ; "  +
+                    "<http://pred2> \"literal\"." +
+                "}";
+
+           try (FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+               // Create the PCJ table.
+               final Connector accumuloConn = super.getAccumuloConnector();
+               final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+               final String pcjId = pcjStorage.createPcj(sparql);
+
+               // Task the Fluo app with the PCJ.
+               new CreatePcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+               // Add Statements to the Fluo app.
+               log.info("Adding Join Pairs...");
+               addRandomQueryStatementPairs(100);
+
+               super.getMiniFluo().waitForObservers();
+
+               int resultCount = 0;
+               try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                   while(resultsIt.hasNext()) {
+                       resultCount++;
+                       resultsIt.next();
+                   }
+               }
+
+               // Show the correct number of Binding Sets were created for the 
PCJ.
+               assertEquals(100, resultCount);
+           }
        }
-       
-       private void addRandomQueryStatementPairs(int numPairs) throws 
Exception {
 
-               Set<Statement> statementPairs = new HashSet<>();
+       private void addRandomQueryStatementPairs(final int numPairs) throws 
Exception {
+               final Set<Statement> statementPairs = new HashSet<>();
                for (int i = 0; i < numPairs; i++) {
-                       String uri = uuidPrefix + UUID.randomUUID().toString();
-                       Statement statement1 = new StatementImpl(new 
URIImpl(uri), new URIImpl(pred1),
-                                       new LiteralImpl(name + (i + 1)));
-                       Statement statement2 = new StatementImpl(new 
URIImpl(uri), new URIImpl(pred2), new LiteralImpl("literal"));
+                       final String uri = "http://uuid_"; + 
UUID.randomUUID().toString();
+                       final Statement statement1 = new StatementImpl(new 
URIImpl(uri), new URIImpl("http://pred1";),
+                                       new LiteralImpl("number_" + (i + 1)));
+                       final Statement statement2 = new StatementImpl(new 
URIImpl(uri), new URIImpl("http://pred2";), new LiteralImpl("literal"));
                        statementPairs.add(statement1);
                        statementPairs.add(statement2);
                }
-               conn.add(statementPairs, new Resource[0]);
-               fluo.waitForObservers();
-       }
-       
-       private int countPcjs() throws Exception {
-               Iterable<BindingSet> bindingsets = 
pcjTables.listResults(accumuloConn, pcjTableName, new Authorizations("U"));
-               int count = 0;
-               for (BindingSet bs : bindingsets) {
-//                     System.out.println(bs);
-                       count++;
-               }
-//             IncUpdateDAO.printAll(fluoClient);
-               return count;
+               
super.getRyaSailRepository().getConnection().add(statementPairs, new 
Resource[0]);
+               super.getMiniFluo().waitForObservers();
        }
-       
-       
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
index 6f4596f..30b6842 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
@@ -22,23 +22,27 @@ import java.io.UnsupportedEncodingException;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Assert;
 import org.junit.Test;
 import org.openrdf.model.Statement;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.Sets;
 
@@ -47,7 +51,7 @@ import com.google.common.collect.Sets;
  * <p>
  * These tests are being ignore so that they will not run as unit tests while 
building the application.
  */
-public class HistoricStreamingVisibilityIT extends ITBase {
+public class HistoricStreamingVisibilityIT extends RyaExportITBase {
 
     /**
      * Ensure historic matches are included in the result.
@@ -61,70 +65,74 @@ public class HistoricStreamingVisibilityIT extends ITBase {
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
-        
+
+        final Connector accumuloConn = super.getAccumuloConnector();
         
accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new 
Authorizations("U","V","W"));
-        AccumuloRyaDAO dao = new AccumuloRyaDAO();
+        final AccumuloRyaDAO dao = new AccumuloRyaDAO();
         dao.setConnector(accumuloConn);
         dao.setConf(makeConfig());
         dao.init();
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
+
         final Set<RyaStatement> historicTriples = Sets.newHashSet(
-                makeRyaStatement(makeStatement("http://Alice";, 
"http://talksTo";, "http://Eve";),"U"),
-                makeRyaStatement(makeStatement("http://Bob";, "http://talksTo";, 
"http://Eve";),"V"),
-                makeRyaStatement(makeStatement("http://Charlie";, 
"http://talksTo";, "http://Eve";),"W"),
+                
makeRyaStatement(vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),"U"),
+                
makeRyaStatement(vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),"V"),
+                
makeRyaStatement(vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),"W"),
 
-                makeRyaStatement(makeStatement("http://Eve";, "http://helps";, 
"http://Kevin";), "U"),
+                
makeRyaStatement(vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://helps";), vf.createURI("http://Kevin";)), "U"),
 
-                makeRyaStatement(makeStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";), "W"),
-                makeRyaStatement(makeStatement("http://Charlie";, 
"http://worksAt";, "http://Chipotle";), "V"),
-                makeRyaStatement(makeStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";), "U"),
-                makeRyaStatement(makeStatement("http://David";, 
"http://worksAt";, "http://Chipotle";), "V"));
+                
makeRyaStatement(vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)), "W"),
+                
makeRyaStatement(vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)), "V"),
+                
makeRyaStatement(vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)), "U"),
+                
makeRyaStatement(vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)), "V"));
 
         dao.add(historicTriples.iterator());
         dao.flush();
-        
+
         // The expected results of the SPARQL query once the PCJ has been 
computed.
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Bob";))));
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Charlie";))));
-        
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Bob";));
+        expected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Charlie";));
+        expected.add(bs);
+
         // Create the PCJ table.
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+        }
 
         // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        Set<BindingSet> results = 
Sets.newHashSet(pcjStorage.listResults(pcjId));
+        super.getMiniFluo().waitForObservers();
+
+        final Set<BindingSet> results = 
Sets.newHashSet(pcjStorage.listResults(pcjId));
         Assert.assertEquals(expected, results);
     }
-    
-    
+
     private AccumuloRdfConfiguration makeConfig() {
         final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
         conf.setTablePrefix(RYA_INSTANCE_NAME);
         // Accumulo connection information.
         conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER);
         conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName);
-        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers);
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, 
super.getMiniAccumuloCluster().getInstanceName());
+        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, 
super.getMiniAccumuloCluster().getZooKeepers());
         conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U,V,W");
 
         return conf;
     }
-    
-    
-    private static RyaStatement makeRyaStatement(Statement statement, String 
visibility) throws UnsupportedEncodingException {
-       
-       RyaStatement ryaStatement = 
RdfToRyaConversions.convertStatement(statement);
+
+    private static RyaStatement makeRyaStatement(final Statement statement, 
final String visibility) throws UnsupportedEncodingException {
+       final RyaStatement ryaStatement = 
RdfToRyaConversions.convertStatement(statement);
        ryaStatement.setColumnVisibility(visibility.getBytes("UTF-8"));
        return ryaStatement;
-       
     }
-
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index ccc2c20..e7ced90 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -37,6 +37,9 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
@@ -44,11 +47,13 @@ import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
 import org.apache.rya.rdftriplestore.RyaSailRepository;
@@ -56,10 +61,9 @@ import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.Test;
 import org.openrdf.model.URI;
 import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.URIImpl;
 import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
 import org.openrdf.repository.RepositoryConnection;
 import org.openrdf.sail.Sail;
 
@@ -70,7 +74,7 @@ import com.google.common.base.Optional;
  * Integration tests that ensure the Fluo Application properly exports PCJ
  * results with the correct Visibility values.
  */
-public class PcjVisibilityIT extends ITBase {
+public class PcjVisibilityIT extends RyaExportITBase {
 
     private static final ValueFactory VF = new ValueFactoryImpl();
 
@@ -94,6 +98,10 @@ public class PcjVisibilityIT extends ITBase {
                   "?worker <" + WORKS_AT + "> <" + BURGER_JOINT + ">. " +
                 "}";
 
+        final Connector accumuloConn = super.getAccumuloConnector();
+        final String instanceName = 
super.getMiniAccumuloCluster().getInstanceName();
+        final String zookeepers = 
super.getMiniAccumuloCluster().getZooKeepers();
+
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(
                 ACCUMULO_USER,
                 ACCUMULO_PASSWORD.toCharArray(),
@@ -103,7 +111,7 @@ public class PcjVisibilityIT extends ITBase {
         final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
 
         // Grant the root user the "u" authorization.
-        
accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new 
Authorizations("u"));
+        
super.getAccumuloConnector().securityOperations().changeUserAuthorizations(ACCUMULO_USER,
 new Authorizations("u"));
 
         // Setup a connection to the Rya instance that uses the "u" 
authorizations. This ensures
         // any statements that are inserted will have the "u" authorization on 
them and that the
@@ -127,7 +135,7 @@ public class PcjVisibilityIT extends ITBase {
             ryaConn.add(VF.createStatement(BOB, WORKS_AT, BURGER_JOINT));
 
             // Wait for Fluo to finish processing.
-            fluo.waitForObservers();
+            super.getMiniFluo().waitForObservers();
 
             // Fetch the exported result and show that its column visibility 
has been simplified.
             final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_INSTANCE_NAME, pcjId);
@@ -171,41 +179,46 @@ public class PcjVisibilityIT extends ITBase {
 
         // Triples that will be streamed into Fluo after the PCJ has been 
created.
         final Map<RyaStatement, String> streamedTriples = new HashMap<>();
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Alice";, "http://talksTo";, "http://Bob";), "A&B");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Bob";, "http://livesIn";, "http://London";), "A");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Bob";, "http://worksAt";, "http://Chipotle";), "B");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Alice";), new RyaURI("http://talksTo";),new RyaURI("http://Bob";)), 
"A&B");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Bob";), new RyaURI("http://livesIn";),new 
RyaURI("http://London";)), "A");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Bob";), new RyaURI("http://worksAt";),new 
RyaURI("http://Chipotle";)), "B");
+
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Alice";), new RyaURI("http://talksTo";),new 
RyaURI("http://Charlie";)), "B&C");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Charlie";), new RyaURI("http://livesIn";),new 
RyaURI("http://London";)), "B");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Charlie";), new RyaURI("http://worksAt";),new 
RyaURI("http://Chipotle";)), "C");
 
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Alice";, "http://talksTo";, "http://Charlie";), "B&C");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Charlie";, "http://livesIn";, "http://London";), "B");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Charlie";, "http://worksAt";, "http://Chipotle";), "C");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Alice";), new RyaURI("http://talksTo";),new 
RyaURI("http://David";)), "C&D");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://David";), new RyaURI("http://livesIn";),new 
RyaURI("http://London";)), "C");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://David";), new RyaURI("http://worksAt";),new 
RyaURI("http://Chipotle";)), "D");
 
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Alice";, "http://talksTo";, "http://David";), "C&D");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://David";, "http://livesIn";, "http://London";), "C");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://David";, "http://worksAt";, "http://Chipotle";), "D");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Alice";), new RyaURI("http://talksTo";),new RyaURI("http://Eve";)), 
"D&E");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Eve";), new RyaURI("http://livesIn";),new RyaURI("http://Leeds";)), 
"D");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Eve";), new RyaURI("http://worksAt";),new 
RyaURI("http://Chipotle";)), "E");
 
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Alice";, "http://talksTo";, "http://Eve";), "D&E");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Eve";, "http://livesIn";, "http://Leeds";), "D");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Eve";, "http://worksAt";, "http://Chipotle";), "E");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Frank";), new RyaURI("http://talksTo";),new 
RyaURI("http://Alice";)), "");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Frank";), new RyaURI("http://livesIn";),new 
RyaURI("http://London";)), "");
+        addStatementVisibilityEntry(streamedTriples, new RyaStatement(new 
RyaURI("http://Frank";), new RyaURI("http://worksAt";),new 
RyaURI("http://Chipotle";)), "");
 
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Frank";, "http://talksTo";, "http://Alice";), "");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Frank";, "http://livesIn";, "http://London";), "");
-        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Frank";, "http://worksAt";, "http://Chipotle";), "");
+        final Connector accumuloConn = super.getAccumuloConnector();
 
         // Create the PCJ Table in Accumulo.
         final PrecomputedJoinStorage rootStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = rootStorage.createPcj(sparql);
 
-        // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
-        // Stream the data into Fluo.
-        for(final RyaStatement statement : streamedTriples.keySet()) {
-            final Optional<String> visibility = 
Optional.of(streamedTriples.get(statement));
-            new InsertTriples().insert(fluoClient, statement, visibility);
+        try( final FluoClient fluoClient = FluoFactory.newClient( 
super.getFluoConfiguration() )) {
+            // Create the PCJ in Fluo.
+            new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+
+            // Stream the data into Fluo.
+            for(final RyaStatement statement : streamedTriples.keySet()) {
+                final Optional<String> visibility = 
Optional.of(streamedTriples.get(statement));
+                new InsertTriples().insert(fluoClient, statement, visibility);
+            }
         }
 
         // Fetch the exported results from Accumulo once the observers finish 
working.
-        fluo.waitForObservers();
+        super.getMiniFluo().waitForObservers();
 
         setupTestUsers(accumuloConn, RYA_INSTANCE_NAME, pcjId);
 
@@ -213,80 +226,98 @@ public class PcjVisibilityIT extends ITBase {
         final Set<BindingSet> rootResults = toSet( 
rootStorage.listResults(pcjId));
 
         final Set<BindingSet> rootExpected = Sets.newHashSet();
-        rootExpected.add( makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Bob";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-        rootExpected.add( makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Charlie";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-        rootExpected.add( makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Eve";)),
-                new BindingImpl("city", new URIImpl("http://Leeds";))));
-        rootExpected.add( makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://David";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("customer", VF.createURI("http://Alice";));
+        bs.addBinding("worker", VF.createURI("http://Bob";));
+        bs.addBinding("city", VF.createURI("http://London";));
+        rootExpected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", VF.createURI("http://Alice";));
+        bs.addBinding("worker", VF.createURI("http://Charlie";));
+        bs.addBinding("city", VF.createURI("http://London";));
+        rootExpected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", VF.createURI("http://Alice";));
+        bs.addBinding("worker", VF.createURI("http://Eve";));
+        bs.addBinding("city", VF.createURI("http://Leeds";));
+        rootExpected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", VF.createURI("http://Alice";));
+        bs.addBinding("worker", VF.createURI("http://David";));
+        bs.addBinding("city", VF.createURI("http://London";));
+        rootExpected.add(bs);
 
         assertEquals(rootExpected, rootResults);
 
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+
         // Verify AB
         final Connector abConn = cluster.getConnector("abUser", "password");
-        final PrecomputedJoinStorage abStorage = new 
AccumuloPcjStorage(abConn, RYA_INSTANCE_NAME);
-        final Set<BindingSet> abResults = toSet( abStorage.listResults(pcjId) 
);
+        try(final PrecomputedJoinStorage abStorage = new 
AccumuloPcjStorage(abConn, RYA_INSTANCE_NAME)) {
+            final Set<BindingSet> abResults = toSet( 
abStorage.listResults(pcjId) );
 
-        final Set<BindingSet> abExpected = Sets.newHashSet();
-        abExpected.add( makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Bob";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
+            final Set<BindingSet> abExpected = Sets.newHashSet();
+            bs = new MapBindingSet();
+            bs.addBinding("customer", VF.createURI("http://Alice";));
+            bs.addBinding("worker", VF.createURI("http://Bob";));
+            bs.addBinding("city", VF.createURI("http://London";));
+            abExpected.add(bs);
 
-        assertEquals(abExpected, abResults);
+            assertEquals(abExpected, abResults);
+        }
 
         // Verify ABC
         final Connector abcConn = cluster.getConnector("abcUser", "password");
-        final PrecomputedJoinStorage abcStorage = new 
AccumuloPcjStorage(abcConn, RYA_INSTANCE_NAME);
-        final Set<BindingSet> abcResults = toSet( 
abcStorage.listResults(pcjId) );
-
-        final Set<BindingSet> abcExpected = Sets.newHashSet();
-        abcExpected.add(makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Bob";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-        abcExpected.add(makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Charlie";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-
-        assertEquals(abcExpected, abcResults);
+        try(final PrecomputedJoinStorage abcStorage = new 
AccumuloPcjStorage(abcConn, RYA_INSTANCE_NAME)) {
+            final Set<BindingSet> abcResults = toSet( 
abcStorage.listResults(pcjId) );
+
+            final Set<BindingSet> abcExpected = Sets.newHashSet();
+            bs = new MapBindingSet();
+            bs.addBinding("customer", VF.createURI("http://Alice";));
+            bs.addBinding("worker", VF.createURI("http://Bob";));
+            bs.addBinding("city", VF.createURI("http://London";));
+            abcExpected.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("customer", VF.createURI("http://Alice";));
+            bs.addBinding("worker", VF.createURI("http://Charlie";));
+            bs.addBinding("city", VF.createURI("http://London";));
+            abcExpected.add(bs);
+
+            assertEquals(abcExpected, abcResults);
+        }
 
         // Verify ADE
         final Connector adeConn = cluster.getConnector("adeUser", "password");
-        final PrecomputedJoinStorage adeStorage = new 
AccumuloPcjStorage(adeConn, RYA_INSTANCE_NAME);
-        final Set<BindingSet> adeResults = toSet( 
adeStorage.listResults(pcjId) );
+        try(final PrecomputedJoinStorage adeStorage = new 
AccumuloPcjStorage(adeConn, RYA_INSTANCE_NAME)) {
+            final Set<BindingSet> adeResults = toSet( 
adeStorage.listResults(pcjId) );
 
-        final Set<BindingSet> adeExpected = Sets.newHashSet();
-        adeExpected.add(makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Eve";)),
-                new BindingImpl("city", new URIImpl("http://Leeds";))));
+            final Set<BindingSet> adeExpected = Sets.newHashSet();
+            bs = new MapBindingSet();
+            bs.addBinding("customer", VF.createURI("http://Alice";));
+            bs.addBinding("worker", VF.createURI("http://Eve";));
+            bs.addBinding("city", VF.createURI("http://Leeds";));
+            adeExpected.add(bs);
 
-        assertEquals(adeExpected, adeResults);
+            assertEquals(adeExpected, adeResults);
+        }
 
         // Verify no auths.
         final Connector noAuthConn = cluster.getConnector("noAuth", 
"password");
-        final PrecomputedJoinStorage noAuthStorage = new 
AccumuloPcjStorage(noAuthConn, RYA_INSTANCE_NAME);
-        final Set<BindingSet> noAuthResults = toSet( 
noAuthStorage.listResults(pcjId) );
-        assertTrue( noAuthResults.isEmpty() );
+        try(final PrecomputedJoinStorage noAuthStorage = new 
AccumuloPcjStorage(noAuthConn, RYA_INSTANCE_NAME)) {
+            final Set<BindingSet> noAuthResults = toSet( 
noAuthStorage.listResults(pcjId) );
+            assertTrue( noAuthResults.isEmpty() );
+        }
     }
 
     private void setupTestUsers(final Connector accumuloConn, final String 
ryaInstanceName, final String pcjId) throws AccumuloException, 
AccumuloSecurityException {
         final PasswordToken pass = new PasswordToken("password");
         final SecurityOperations secOps = accumuloConn.securityOperations();
 
-        // XXX We need the table name so that we can update security for the 
users.
+        // We need the table name so that we can update security for the users.
         final String pcjTableName = new 
PcjTableNameFactory().makeTableName(ryaInstanceName, pcjId);
 
         // Give the 'roor' user authorizations to see everything.
@@ -317,11 +348,15 @@ public class PcjVisibilityIT extends ITBase {
         triplesMap.put(statement, visibility);
     }
 
-    private Set<BindingSet> toSet(final Iterable<BindingSet> bindingSets) {
+    private Set<BindingSet> toSet(final CloseableIterator<BindingSet> 
bindingSets) throws Exception {
         final Set<BindingSet> set = new HashSet<>();
-        for(final BindingSet bindingSet : bindingSets) {
-            set.add( bindingSet );
+        try {
+            while(bindingSets.hasNext()) {
+                set.add( bindingSets.next() );
+            }
+        } finally {
+            bindingSets.close();
         }
         return set;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml 
b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml
index 8aa257b..885a076 100644
--- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml
+++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml
@@ -82,11 +82,8 @@ under the License.
             <scope>test</scope>
         </dependency>
         <dependency>
-            <!--  BaseIT exists here, needs test jar to be generated. -->
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.integration</artifactId>
-            <version>${project.version}</version>
-            <classifier>tests</classifier>
+             <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-recipes-test</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
@@ -125,5 +122,42 @@ under the License.
                     </configuration>
                 </plugin>
         </plugins>
+        <pluginManagement>
+            <plugins>
+                <!--This plugin's configuration is used to store Eclipse m2e 
settings only. It has no influence on the Maven build itself.-->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>
+                                            org.codehaus.mojo
+                                        </groupId>
+                                        <artifactId>
+                                            properties-maven-plugin
+                                        </artifactId>
+                                        <versionRange>
+                                            [1.0.0,)
+                                        </versionRange>
+                                        <goals>
+                                            <goal>
+                                                set-system-properties
+                                            </goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore></ignore>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
     </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
 
b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
new file mode 100644
index 0000000..5fe999f
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.openrdf.sail.Sail;
+
+/**
+ * The base Integration Test class used for Fluo applications that export to a 
Rya PCJ Index.
+ */
+public class RyaExportITBase extends AccumuloExportITBase {
+
+    protected static final String RYA_INSTANCE_NAME = "test_";
+
+    private RyaSailRepository ryaSailRepo = null;
+
+    public RyaExportITBase() {
+        // Indicates that MiniFluo should be started before each test.
+        super(true);
+    }
+
+    @BeforeClass
+    public static void setupLogging() {
+        BasicConfigurator.configure();
+        Logger.getRootLogger().setLevel(Level.ERROR);
+    }
+
+    @Override
+    protected void preFluoInitHook() throws Exception {
+        // Setup the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new 
ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(FilterObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(AggregationObserver.class.getName()));
+
+        // Configure the export observer to export new PCJ results to the mini 
accumulo cluster.
+        final HashMap<String, String> exportParams = new HashMap<>();
+        final RyaExportParameters ryaParams = new 
RyaExportParameters(exportParams);
+        ryaParams.setExportToRya(true);
+        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
+        
ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
+        
ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());
+        ryaParams.setExporterUsername(ACCUMULO_USER);
+        ryaParams.setExporterPassword(ACCUMULO_PASSWORD);
+
+        final ObserverSpecification exportObserverConfig = new 
ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
+        observers.add(exportObserverConfig);
+
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
+    }
+
+    @Before
+    public void setupRya() throws Exception {
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+        final String instanceName = cluster.getInstanceName();
+        final String zookeepers = cluster.getZooKeepers();
+
+        // Install the Rya instance to the mini accumulo cluster.
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
+                new AccumuloConnectionDetails(
+                    ACCUMULO_USER,
+                    ACCUMULO_PASSWORD.toCharArray(),
+                    instanceName,
+                    zookeepers),
+                super.getAccumuloConnector());
+
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME, 
InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableGeoIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(true)
+                .setFluoPcjAppName( 
super.getFluoConfiguration().getApplicationName() )
+                .build());
+
+        // Connect to the Rya instance that was just installed.
+        final AccumuloRdfConfiguration conf = makeConfig(instanceName, 
zookeepers);
+        final Sail sail = RyaSailFactory.getInstance(conf);
+        ryaSailRepo = new RyaSailRepository(sail);
+    }
+
+    @After
+    public void teardownRya() throws Exception {
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+        final String instanceName = cluster.getInstanceName();
+        final String zookeepers = cluster.getZooKeepers();
+
+        // Uninstall the instance of Rya.
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
+                new AccumuloConnectionDetails(
+                    ACCUMULO_USER,
+                    ACCUMULO_PASSWORD.toCharArray(),
+                    instanceName,
+                    zookeepers),
+                super.getAccumuloConnector());
+
+        ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
+
+        // Shutdown the repo.
+        ryaSailRepo.shutDown();
+    }
+
+    /**
+     * @return A {@link RyaSailRepository} that is connected to the Rya 
instance that statements are loaded into.
+     */
+    protected RyaSailRepository getRyaSailRepository() throws Exception {
+        return ryaSailRepo;
+    }
+
+    protected AccumuloRdfConfiguration makeConfig(final String instanceName, 
final String zookeepers) {
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(RYA_INSTANCE_NAME);
+
+        // Accumulo connection information.
+        conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
+        conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
+        
conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
+        
conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
+        conf.setAuths("");
+
+        // PCJ configuration information.
+        conf.set(ConfigUtils.USE_PCJ, "true");
+        conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
+        conf.set(ConfigUtils.FLUO_APP_NAME, 
super.getFluoConfiguration().getApplicationName());
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
+                
PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
+                
PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+
+        conf.setDisplayQueryPlan(true);
+
+        return conf;
+    }
+}
\ No newline at end of file

Reply via email to