http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index 5e12fac..7fa28ab 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -18,58 +18,39 @@
  */
 package org.apache.rya.indexing.pcj.fluo.integration;
 
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Properties;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
-import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.Binding;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
 import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
 /**
  * Performs integration tests over the Fluo application geared towards Kafka 
PCJ exporting.
  * <p>
@@ -78,215 +59,463 @@ import kafka.zk.EmbeddedZookeeper;
  * $ cd rya/extras/rya.pcj.fluo/pcj.fluo.integration
  * $ mvn surefire:test -Dtest=KafkaExportIT
  */
-public class KafkaExportIT extends ITBase {
-    private static final Log logger = LogFactory.getLog(KafkaExportIT.class);
-
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
-    private static final String TOPIC = "testTopic";
-    private ZkUtils zkUtils;
-    private KafkaServer kafkaServer;
-    private EmbeddedZookeeper zkServer;
-    private ZkClient zkClient;
-
-
-        /**
-     * setup mini kafka and call the super to setup mini fluo
-     * 
-     * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources()
-     */
-    @Override
-    public void setupMiniResources() throws Exception {
-        super.setupMiniResources();
-
-        zkServer = new EmbeddedZookeeper();
-        String zkConnect = ZKHOST + ":" + zkServer.port();
-        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
-        zkUtils = ZkUtils.apply(zkClient, false);
-
-        // setup Broker
-        Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
-        KafkaConfig config = new KafkaConfig(brokerProps);
-        Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
-
-        logger.trace("setup kafka and fluo.");
-    }
-
-    /**
-     * Test kafka without rya code to make sure kafka works in this 
environment.
-     * If this test fails then its a testing environment issue, not with Rya.
-     * Source: https://github.com/asmaier/mini-kafka
-     * 
-     * @throws InterruptedException
-     * @throws IOException
-     */
-        @Test
-        public void embeddedKafkaTest() throws InterruptedException, 
IOException {
-            // create topic
-            AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
-
-            // setup producer
-            Properties producerProps = new Properties();
-            producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
-            
producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
-            producerProps.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-            KafkaProducer<Integer, byte[]> producer = new 
KafkaProducer<Integer, byte[]>(producerProps);
-
-            // setup consumer
-            Properties consumerProps = new Properties();
-            consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
-            consumerProps.setProperty("group.id", "group0");
-            consumerProps.setProperty("client.id", "consumer0");
-            
consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
-            consumerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-            consumerProps.put("auto.offset.reset", "earliest");  // to make 
sure the consumer starts from the beginning of the topic
-            KafkaConsumer<Integer, byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
-            consumer.subscribe(Arrays.asList(TOPIC));
-
-            // send message
-            ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 
42, "test-message".getBytes(StandardCharsets.UTF_8));
-            producer.send(data);
-            producer.close();
-
-            // starting consumer
-        ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
-            assertEquals(1, records.count());
-            Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = 
records.iterator();
-            ConsumerRecord<Integer, byte[]> record = recordIterator.next();
-        logger.trace(String.format("offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value()));
-            assertEquals(42, (int) record.key());
-            assertEquals("test-message", new String(record.value(), 
StandardCharsets.UTF_8));
-        consumer.close();
-    }
+public class KafkaExportIT extends KafkaExportITBase {
 
     @Test
     public void newResultsExportedTest() throws Exception {
-        final String sparql = "SELECT ?customer ?worker ?city " + "{ " + 
"FILTER(?customer = <http://Alice>) " + "FILTER(?city = <http://London>) " + 
"?customer <http://talksTo> ?worker. " + "?worker <http://livesIn> ?city. " + 
"?worker <http://worksAt> <http://Chipotle>. " + "}";
-    
+        final String sparql =
+                "SELECT ?customer ?worker ?city { " +
+                    "FILTER(?customer = <http://Alice>) " +
+                    "FILTER(?city = <http://London>) " +
+                    "?customer <http://talksTo> ?worker. " +
+                    "?worker <http://livesIn> ?city. " +
+                    "?worker <http://worksAt> <http://Chipotle>. " +
+                "}";
+
         // Triples that will be streamed into Fluo after the PCJ has been 
created.
-        final Set<RyaStatement> streamedTriples = 
Sets.newHashSet(makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Bob";), makeRyaStatement("http://Bob";, "http://livesIn";, 
"http://London";), makeRyaStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
-                        makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Charlie";), makeRyaStatement("http://Charlie";, "http://livesIn";, 
"http://London";), makeRyaStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
-                        makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://David";), makeRyaStatement("http://David";, "http://livesIn";, 
"http://London";), makeRyaStatement("http://David";, "http://worksAt";, 
"http://Chipotle";),
-                        makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Eve";), makeRyaStatement("http://Eve";, "http://livesIn";, 
"http://Leeds";), makeRyaStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
-                        makeRyaStatement("http://Frank";, "http://talksTo";, 
"http://Alice";), makeRyaStatement("http://Frank";, "http://livesIn";, 
"http://London";), makeRyaStatement("http://Frank";, "http://worksAt";, 
"http://Chipotle";));
-    
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements =
+                Sets.newHashSet(
+                        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Bob";)),
+                        vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
+                        vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Charlie";)),
+                        vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
+                        vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://David";)),
+                        vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
+                        vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                        vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://livesIn";), vf.createURI("http://Leeds";)),
+                        vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+                        vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
+                        vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
+                        vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
         // The expected results of the SPARQL query once the PCJ has been 
computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(new BindingImpl("customer", new 
URIImpl("http://Alice";)), new BindingImpl("worker", new URIImpl("http://Bob";)), 
new BindingImpl("city", new URIImpl("http://London";))));
-        expected.add(makeBindingSet(new BindingImpl("customer", new 
URIImpl("http://Alice";)), new BindingImpl("worker", new 
URIImpl("http://Charlie";)), new BindingImpl("city", new 
URIImpl("http://London";))));
-        expected.add(makeBindingSet(new BindingImpl("customer", new 
URIImpl("http://Alice";)), new BindingImpl("worker", new 
URIImpl("http://David";)), new BindingImpl("city", new 
URIImpl("http://London";))));
-    
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(sparql);
-    
-        // Tell the Fluo app to maintain the PCJ.
-        CreatePcj createPcj = new CreatePcj();
-        String QueryIdIsTopicName = createPcj.withRyaIntegration(pcjId, 
pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
-
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String> absent());
-    
-        // Fetch the exported results from Accumulo once the observers finish 
working.
-        fluo.waitForObservers();
-
-        /// KafkaConsumer<Integer, byte[]> consumer = 
makeConsumer(QueryIdIsTopicName);
-        KafkaConsumer<Integer, VisibilityBindingSet> consumer = 
makeConsumer(QueryIdIsTopicName);
-
-        // starting consumer polling for messages
-        /// ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
-        ConsumerRecords<Integer, VisibilityBindingSet> records = 
consumer.poll(3000);
-        /// Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = 
records.iterator();
-        Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator 
= records.iterator();
-        boolean allExpected = true;
-        ConsumerRecord<Integer, VisibilityBindingSet> unexpectedRecord = null;
-        while (recordIterator.hasNext()) {
-            ConsumerRecord<Integer, VisibilityBindingSet> record = 
recordIterator.next();
-            logger.trace(String.format("Consumed offset = %d, key = %s, value 
= %s", record.offset(), record.key(), record.value().toString()));
-            boolean expectedThis = expected.contains(record.value());
-            if (!expectedThis) {
-                logger.trace("This consumed record is not expected.");
-                unexpectedRecord = record;
-            }
-            allExpected = allExpected && expectedThis;
-        }
-        assertTrue("Must consume expected record: not expected:" + 
unexpectedRecord, allExpected);
-        assertNotEquals("Should get some results", 0, records.count());
-        // assertEquals(42, (int) record.key());
-        // assertEquals("test-message", new String(record.value(), 
StandardCharsets.UTF_8));
+        final Set<BindingSet> expectedResult = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://Bob";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expectedResult.add( new VisibilityBindingSet(bs) );
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://Charlie";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expectedResult.add( new VisibilityBindingSet(bs) );
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://David";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expectedResult.add( new VisibilityBindingSet(bs) );
+
+        // Ensure the last result matches the expected result.
+        final Set<VisibilityBindingSet> result = readAllResults(pcjId);
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void min() throws Exception {
+        // A query that finds the minimum price for an item within the 
inventory.
+        final String sparql =
+                "SELECT (min(?price) as ?minPrice) { " +
+                    "?item <urn:price> ?price . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(2.50)),
+                vf.createStatement(vf.createURI("urn:gum"), 
vf.createURI("urn:price"), vf.createLiteral(0.99)),
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final MapBindingSet expectedResult = new MapBindingSet();
+        expectedResult.addBinding("minPrice", vf.createLiteral(0.99));
+
+        // Ensure the last result matches the expected result.
+        final VisibilityBindingSet result = readLastResult(pcjId);
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void max() throws Exception {
+        // A query that finds the maximum price for an item within the 
inventory.
+        final String sparql =
+                "SELECT (max(?price) as ?maxPrice) { " +
+                    "?item <urn:price> ?price . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(2.50)),
+                vf.createStatement(vf.createURI("urn:gum"), 
vf.createURI("urn:price"), vf.createLiteral(0.99)),
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final MapBindingSet expectedResult = new MapBindingSet();
+        expectedResult.addBinding("maxPrice", vf.createLiteral(4.99));
+
+        // Ensure the last result matches the expected result.
+        final VisibilityBindingSet result = readLastResult(pcjId);
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void count() throws Exception {
+        // A query that counts the number of unique items that are in the 
inventory.
+        final String sparql =
+                "SELECT (count(?item) as ?itemCount) { " +
+                    "?item <urn:id> ?id . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                // Three that are part of the count.
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:id"), vf.createLiteral(UUID.randomUUID().toString())),
+                vf.createStatement(vf.createURI("urn:gum"), 
vf.createURI("urn:id"), vf.createLiteral(UUID.randomUUID().toString())),
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:id"), vf.createLiteral(UUID.randomUUID().toString())),
+
+                // One that is not.
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:price"), vf.createLiteral(3.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final MapBindingSet expectedResult = new MapBindingSet();
+        expectedResult.addBinding("itemCount", vf.createLiteral("3", 
XMLSchema.INTEGER));
+
+        // Ensure the last result matches the expected result.
+        final VisibilityBindingSet result = readLastResult(pcjId);
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void sum() throws Exception {
+        // A query that sums the counts of all of the items that are in the 
inventory.
+        final String sparql =
+                "SELECT (sum(?count) as ?itemSum) { " +
+                    "?item <urn:count> ?count . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:count"), vf.createLiteral(5)),
+                vf.createStatement(vf.createURI("urn:gum"), 
vf.createURI("urn:count"), vf.createLiteral(7)),
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:count"), vf.createLiteral(2)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final MapBindingSet expectedResult = new MapBindingSet();
+        expectedResult.addBinding("itemSum", vf.createLiteral("14", 
XMLSchema.INTEGER));
+
+        // Ensure the last result matches the expected result.
+        final VisibilityBindingSet result = readLastResult(pcjId);
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void average() throws Exception  {
+        // A query that finds the average price for an item that is in the 
inventory.
+        final String sparql =
+                "SELECT (avg(?price) as ?averagePrice) { " +
+                    "?item <urn:price> ?price . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(3)),
+                vf.createStatement(vf.createURI("urn:gum"), 
vf.createURI("urn:price"), vf.createLiteral(4)),
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:price"), vf.createLiteral(8)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final MapBindingSet expectedResult = new MapBindingSet();
+        expectedResult.addBinding("averagePrice", vf.createLiteral("5", 
XMLSchema.DECIMAL));
+
+        // Ensure the last result matches the expected result.
+        final VisibilityBindingSet result = readLastResult(pcjId);
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void aggregateWithFilter() throws Exception {
+        // A query that filters results from a statement pattern before 
applying the aggregation function.
+        final String sparql =
+                "SELECT (min(?price) as ?minPrice) { " +
+                    "FILTER(?price > 1.00) " +
+                    "?item <urn:price> ?price . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(2.50)),
+                vf.createStatement(vf.createURI("urn:gum"), 
vf.createURI("urn:price"), vf.createLiteral(0.99)),
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final MapBindingSet expectedResult = new MapBindingSet();
+        expectedResult.addBinding("minPrice", vf.createLiteral(2.50));
 
+        // Ensure the last result matches the expected result.
+        final VisibilityBindingSet result = readLastResult(pcjId);
+        assertEquals(expectedResult, result);
     }
 
-    /**
-     * A helper function for creating a {@link BindingSet} from an array of
-     * {@link Binding}s.
-     *
-     * @param bindings
-     *            - The bindings to include in the set. (not null)
-     * @return A {@link BindingSet} holding the bindings.
-     */
-    protected static BindingSet makeBindingSet(final Binding... bindings) {
-        return new VisibilityBindingSet(ITBase.makeBindingSet(bindings));
+    @Test
+    public void multipleAggregations() throws Exception {
+        // A query that both counts the number of items being averaged and 
finds the average price.
+        final String sparql =
+                "SELECT (count(?item) as ?itemCount) (avg(?price) as 
?averagePrice) {" +
+                    "?item <urn:price> ?price . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(5.25)),
+                vf.createStatement(vf.createURI("urn:gum"), 
vf.createURI("urn:price"), vf.createLiteral(7)),
+                vf.createStatement(vf.createURI("urn:sandwich"), 
vf.createURI("urn:price"), vf.createLiteral(2.75)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final MapBindingSet expectedResult = new MapBindingSet();
+        expectedResult.addBinding("itemCount", vf.createLiteral("3", 
XMLSchema.INTEGER));
+        expectedResult.addBinding("averagePrice", vf.createLiteral("5.0", 
XMLSchema.DECIMAL));
+
+        // Ensure the last result matches the expected result.
+        final VisibilityBindingSet result = readLastResult(pcjId);
+        assertEquals(expectedResult, result);
     }
 
-    /**
-     * @param TopicName
-     * @return
-     */
-    protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(String 
TopicName) {
-        // setup consumer
-        Properties consumerProps = new Properties();
-        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
BROKERHOST + ":" + BROKERPORT);
-        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
-        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"consumer0");
-        
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.IntegerDeserializer");
-        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
-        // "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest"); // to make sure the consumer starts from the beginning of the topic
-        /// KafkaConsumer<Integer, byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
-        KafkaConsumer<Integer, VisibilityBindingSet> consumer = new 
KafkaConsumer<>(consumerProps);
-        consumer.subscribe(Arrays.asList(TopicName));
-        return consumer;
+    @Test
+    public void groupBySingleBinding() throws Exception {
+        // A query that groups what is aggregated by one of the keys.
+        final String sparql =
+                "SELECT ?item (avg(?price) as ?averagePrice) {" +
+                    "?item <urn:price> ?price . " +
+                "} " +
+                "GROUP BY ?item";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(5.25)),
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(7)),
+                vf.createStatement(vf.createURI("urn:apple"), 
vf.createURI("urn:price"), vf.createLiteral(2.75)),
+                vf.createStatement(vf.createURI("urn:banana"), 
vf.createURI("urn:price"), vf.createLiteral(2.75)),
+                vf.createStatement(vf.createURI("urn:banana"), 
vf.createURI("urn:price"), vf.createLiteral(1.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("item", vf.createURI("urn:apple"));
+        bs.addBinding("averagePrice", vf.createLiteral("5.0", 
XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+
+        bs = new MapBindingSet();
+        bs.addBinding("item", vf.createURI("urn:banana"));
+        bs.addBinding("averagePrice", vf.createLiteral("2.37", 
XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+
+        // Verify the end results of the query match the expected results.
+        final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, 
new VariableOrder("item"));
+        assertEquals(expectedResults, results);
     }
 
-    /**
-     * Add info about the kafka queue/topic to receive the export.
-     * Call super to get the Rya parameters.
-     * 
-     * @see 
org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap)
-     */
-    @Override
-    protected void setExportParameters(HashMap<String, String> exportParams) {
-        // Get the defaults
-        super.setExportParameters(exportParams);
-        // Add the kafka parameters
-        final KafkaExportParameters kafkaParams = new 
KafkaExportParameters(exportParams);
-        kafkaParams.setExportToKafka(true);
-        // Configure the Producer
-        Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST 
+ ":" + BROKERPORT);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
-        // "org.apache.kafka.common.serialization.StringSerializer");
-        kafkaParams.addAllProducerConfig(producerConfig);
+    @Test
+    public void groupByManyBindings_avaerages() throws Exception {
+        // A query that groups what is aggregated by two of the keys.
+        final String sparql =
+                "SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
+                    "?id <urn:type> ?type . " +
+                    "?id <urn:location> ?location ." +
+                    "?id <urn:price> ?price ." +
+                "} " +
+                "GROUP BY ?type ?location";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                // American items that will be averaged.
+                vf.createStatement(vf.createURI("urn:1"), 
vf.createURI("urn:type"), vf.createLiteral("apple")),
+                vf.createStatement(vf.createURI("urn:1"), 
vf.createURI("urn:location"), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("urn:1"), 
vf.createURI("urn:price"), vf.createLiteral(2.50)),
+
+                vf.createStatement(vf.createURI("urn:2"), 
vf.createURI("urn:type"), vf.createLiteral("cheese")),
+                vf.createStatement(vf.createURI("urn:2"), 
vf.createURI("urn:location"), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("urn:2"), 
vf.createURI("urn:price"), vf.createLiteral(.99)),
+
+                vf.createStatement(vf.createURI("urn:3"), 
vf.createURI("urn:type"), vf.createLiteral("cheese")),
+                vf.createStatement(vf.createURI("urn:3"), 
vf.createURI("urn:location"), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("urn:3"), 
vf.createURI("urn:price"), vf.createLiteral(5.25)),
+
+                // French items that will be averaged.
+                vf.createStatement(vf.createURI("urn:4"), 
vf.createURI("urn:type"), vf.createLiteral("cheese")),
+                vf.createStatement(vf.createURI("urn:4"), 
vf.createURI("urn:location"), vf.createLiteral("France")),
+                vf.createStatement(vf.createURI("urn:4"), 
vf.createURI("urn:price"), vf.createLiteral(8.5)),
+
+                vf.createStatement(vf.createURI("urn:5"), 
vf.createURI("urn:type"), vf.createLiteral("cigarettes")),
+                vf.createStatement(vf.createURI("urn:5"), 
vf.createURI("urn:location"), vf.createLiteral("France")),
+                vf.createStatement(vf.createURI("urn:5"), 
vf.createURI("urn:price"), vf.createLiteral(3.99)),
+
+                vf.createStatement(vf.createURI("urn:6"), 
vf.createURI("urn:type"), vf.createLiteral("cigarettes")),
+                vf.createStatement(vf.createURI("urn:6"), 
vf.createURI("urn:location"), vf.createLiteral("France")),
+                vf.createStatement(vf.createURI("urn:6"), 
vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("type", vf.createLiteral("apple", XMLSchema.STRING));
+        bs.addBinding("location", vf.createLiteral("USA", XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("2.5", 
XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+
+        bs = new MapBindingSet();
+        bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING));
+        bs.addBinding("location", vf.createLiteral("USA", XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("3.12", 
XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+
+        bs = new MapBindingSet();
+        bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING));
+        bs.addBinding("location", vf.createLiteral("France", 
XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("8.5", 
XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs));
+
+        bs = new MapBindingSet();
+        bs.addBinding("type", vf.createLiteral("cigarettes", 
XMLSchema.STRING));
+        bs.addBinding("location", vf.createLiteral("France", 
XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("4.49", 
XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+
+        // Verify the end results of the query match the expected results.
+        final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, 
new VariableOrder("type", "location"));
+        assertEquals(expectedResults, results);
     }
 
-    /**
-     * Close all the Kafka mini server and mini-zookeeper
-     * 
-     * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources()
-     */
-    @Override
-    public void shutdownMiniResources() {
-        super.shutdownMiniResources();
-        kafkaServer.shutdown();
-        zkClient.close();
-        zkServer.shutdown();
+    private String loadData(final String sparql, final Collection<Statement> 
statements) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
+
+        // Register the PCJ with Rya.
+        final Instance accInstance = 
super.getAccumuloConnector().getInstance();
+        final Connector accumuloConn = super.getAccumuloConnector();
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(
+                ACCUMULO_USER,
+                ACCUMULO_PASSWORD.toCharArray(),
+                accInstance.getInstanceName(),
+                accInstance.getZooKeepers()), accumuloConn);
+
+        final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+
+        // Write the data to Rya.
+        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
+        ryaConn.begin();
+        ryaConn.add(statements);
+        ryaConn.commit();
+        ryaConn.close();
+
+        // Wait for the Fluo application to finish computing the end result.
+        super.getMiniFluo().waitForObservers();
+
+        // The PCJ Id is the topic name the results will be written to.
+        return pcjId;
+    }
+
+    private Set<VisibilityBindingSet> readAllResults(final String pcjId) 
throws Exception {
+        requireNonNull(pcjId);
+
+        // Read all of the results from the Kafka topic.
+        final Set<VisibilityBindingSet> results = new HashSet<>();
+
+        try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = 
makeConsumer(pcjId)) {
+            final ConsumerRecords<Integer, VisibilityBindingSet> records = 
consumer.poll(5000);
+            final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> 
recordIterator = records.iterator();
+            while (recordIterator.hasNext()) {
+                results.add( recordIterator.next().value() );
+            }
+        }
+
+        return results;
+    }
+
+    private VisibilityBindingSet readLastResult(final String pcjId) throws 
Exception {
+        requireNonNull(pcjId);
+
+        // Read the results from the Kafka topic. The last one has the final 
aggregation result.
+        VisibilityBindingSet result = null;
+
+        try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = 
makeConsumer(pcjId)) {
+            final ConsumerRecords<Integer, VisibilityBindingSet> records = 
consumer.poll(5000);
+            final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> 
recordIterator = records.iterator();
+            while (recordIterator.hasNext()) {
+                result = recordIterator.next().value();
+            }
+        }
+
+        return result;
+    }
+
+    private Set<VisibilityBindingSet> readGroupedResults(final String pcjId, 
final VariableOrder groupByVars) {
+        requireNonNull(pcjId);
+
+        // Read the results from the Kafka topic. The last one for each set of 
Group By values is an aggregation result.
+        // The key in this map is a Binding Set containing only the group by 
variables.
+        final Map<BindingSet, VisibilityBindingSet> results = new HashMap<>();
+
+        try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = 
makeConsumer(pcjId)) {
+            final ConsumerRecords<Integer, VisibilityBindingSet> records = 
consumer.poll(5000);
+            final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> 
recordIterator = records.iterator();
+            while (recordIterator.hasNext()) {
+                final VisibilityBindingSet visBindingSet = 
recordIterator.next().value();
+
+                final MapBindingSet key = new MapBindingSet();
+                for(final String groupByBar : groupByVars) {
+                    key.addBinding( visBindingSet.getBinding(groupByBar) );
+                }
+
+                results.put(key, visBindingSet);
+            }
+        }
+
+        return Sets.newHashSet( results.values() );
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 648c5b9..08bf2e1 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -18,44 +18,47 @@
  */
 package org.apache.rya.indexing.pcj.fluo.integration;
 
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 
 import java.math.BigDecimal;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
-import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
 import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.datatypes.XMLDatatypeUtil;
 import org.openrdf.model.impl.BooleanLiteralImpl;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.NumericLiteralImpl;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.model.vocabulary.XMLSchema;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
 import org.openrdf.query.algebra.evaluation.function.Function;
 import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
+
 /**
  * Performs integration tests over the Fluo application geared towards various 
query structures.
- * <p>
- * These tests are being ignore so that they will not run as unit tests while 
building the application.
  */
-public class QueryIT extends ITBase {
+public class QueryIT extends RyaExportITBase {
 
     @Test
     public void optionalStatements() throws Exception {
@@ -69,40 +72,35 @@ public class QueryIT extends ITBase {
                     "OPTIONAL {?person <http://passedExam> ?exam } . " +
                 "}";
 
-        // Triples that will be streamed into Fluo after the PCJ has been 
created.
-        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice";, "http://hasDegreeIn";, 
"http://Computer Science"),
-                makeRyaStatement("http://Alice";, "http://passedExam";, 
"http://Certified Ethical Hacker"),
-                makeRyaStatement("http://Bob";, "http://hasDegreeIn";, 
"http://Law";),
-                makeRyaStatement("http://Bob";, "http://passedExam";, 
"http://MBE";),
-                makeRyaStatement("http://Bob";, "http://passedExam";, 
"http://BAR-Kansas";),
-                makeRyaStatement("http://Charlie";, "http://hasDegreeIn";, 
"http://Law";));
-
-        // The expected results of the SPARQL query once the PCJ has been 
computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add( makeBindingSet(
-                new BindingImpl("person", new URIImpl("http://Bob";)),
-                new BindingImpl("exam", new URIImpl("http://MBE";))));
-        expected.add( makeBindingSet(
-                new BindingImpl("person", new URIImpl("http://Bob";)),
-                new BindingImpl("exam", new URIImpl("http://BAR-Kansas";))));
-        expected.add( makeBindingSet(
-                new BindingImpl("person", new URIImpl("http://Charlie";))));
-
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(sparql);
-
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://hasDegreeIn";), vf.createURI("http://Computer Science")),
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://passedExam";), vf.createURI("http://Certified Ethical 
Hacker")),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://hasDegreeIn";), vf.createURI("http://Law";)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://passedExam";), vf.createURI("http://MBE";)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://passedExam";), vf.createURI("http://BAR-Kansas";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://hasDegreeIn";), vf.createURI("http://Law";)));
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("http://Bob";));
+        bs.addBinding("exam", vf.createURI("http://MBE";));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("http://Bob";));
+        bs.addBinding("exam", vf.createURI("http://BAR-Kansas";));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("http://Charlie";));
+        expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
-        assertEquals(expected,  results);
+        runTest(sparql, statements, expectedResults);
     }
 
     /**
@@ -124,71 +122,66 @@ public class QueryIT extends ITBase {
                   "?candidate <http://talksTo> ?leader. " +
                 "}";
 
-        // Triples that will be streamed into Fluo after the PCJ has been 
created.
-        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
                 // Leaders
-                makeRyaStatement("http://Alice";, "http://leaderOf";, 
"http://GeekSquad";),
-                makeRyaStatement("http://Bob";, "http://leaderOf";, 
"http://GeekSquad";),
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://leaderOf";), vf.createURI("http://GeekSquad";)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://leaderOf";), vf.createURI("http://GeekSquad";)),
 
                 // Recruiters
-                makeRyaStatement("http://Charlie";, "http://recruiterFor";, 
"http://GeekSquad";),
-                makeRyaStatement("http://David";, "http://recruiterFor";, 
"http://GeekSquad";),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://recruiterFor";), vf.createURI("http://GeekSquad";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://recruiterFor";), vf.createURI("http://GeekSquad";)),
 
                 // Candidates
-                makeRyaStatement("http://Eve";, "http://skilledWith";, 
"http://Computers";),
-                makeRyaStatement("http://Eve";, "http://livesIn";, "USA"),
-                makeRyaStatement("http://Frank";, "http://skilledWith";, 
"http://Computers";),
-                makeRyaStatement("http://Frank";, "http://livesIn";, "USA"),
-                makeRyaStatement("http://George";, "http://skilledWith";, 
"http://Computers";),
-                makeRyaStatement("http://George";, "http://livesIn";, "Germany"),
-                makeRyaStatement("http://Harry";, "http://skilledWith";, 
"http://Negotiating";),
-                makeRyaStatement("http://Harry";, "http://livesIn";, "USA"),
-                makeRyaStatement("http://Ivan";, "http://skilledWith";, 
"http://Computers";),
-                makeRyaStatement("http://Ivan";, "http://livesIn";, "USA"),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://skilledWith";), vf.createURI("http://Computers";)),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://livesIn";), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://skilledWith";), vf.createURI("http://Computers";)),
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://livesIn";), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("http://George";), 
vf.createURI("http://skilledWith";), vf.createURI("http://Computers";)),
+                vf.createStatement(vf.createURI("http://George";), 
vf.createURI("http://livesIn";), vf.createLiteral("Germany")),
+                vf.createStatement(vf.createURI("http://Harry";), 
vf.createURI("http://skilledWith";), vf.createURI("http://Negotiating";)),
+                vf.createStatement(vf.createURI("http://Harry";), 
vf.createURI("http://livesIn";), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("http://Ivan";), 
vf.createURI("http://skilledWith";), vf.createURI("http://Computers";)),
+                vf.createStatement(vf.createURI("http://Ivan";), 
vf.createURI("http://livesIn";), vf.createLiteral("USA")),
 
                 // Candidates the recruiters talk to.
-                makeRyaStatement("http://Charlie";, "http://talksTo";, 
"http://Eve";),
-                makeRyaStatement("http://Charlie";, "http://talksTo";, 
"http://George";),
-                makeRyaStatement("http://Charlie";, "http://talksTo";, 
"http://Harry";),
-                makeRyaStatement("http://David";, "http://talksTo";, 
"http://Eve";),
-                makeRyaStatement("http://David";, "http://talksTo";, 
"http://Frank";),
-                makeRyaStatement("http://David";, "http://talksTo";, 
"http://Ivan";),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://George";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://Harry";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://talksTo";), vf.createURI("http://Frank";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://talksTo";), vf.createURI("http://Ivan";)),
 
                 // Recruits that talk to leaders.
-                makeRyaStatement("http://Eve";, "http://talksTo";, 
"http://Alice";),
-                makeRyaStatement("http://George";, "http://talksTo";, 
"http://Alice";),
-                makeRyaStatement("http://Harry";, "http://talksTo";, 
"http://Bob";),
-                makeRyaStatement("http://Ivan";, "http://talksTo";, 
"http://Bob";));
-
-        // The expected results of the SPARQL query once the PCJ has been 
computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add( makeBindingSet(
-                new BindingImpl("recruiter", new URIImpl("http://Charlie";)),
-                new BindingImpl("candidate", new URIImpl("http://Eve";)),
-                new BindingImpl("leader", new URIImpl("http://Alice";))));
-        expected.add( makeBindingSet(
-                new BindingImpl("recruiter", new URIImpl("http://David";)),
-                new BindingImpl("candidate", new URIImpl("http://Eve";)),
-                new BindingImpl("leader", new URIImpl("http://Alice";))));
-        expected.add( makeBindingSet(
-                new BindingImpl("recruiter", new URIImpl("http://David";)),
-                new BindingImpl("candidate", new URIImpl("http://Ivan";)),
-                new BindingImpl("leader", new URIImpl("http://Bob";))));
-
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(sparql);
-
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
+                vf.createStatement(vf.createURI("http://George";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
+                vf.createStatement(vf.createURI("http://Harry";), 
vf.createURI("http://talksTo";), vf.createURI("http://Bob";)),
+                vf.createStatement(vf.createURI("http://Ivan";), 
vf.createURI("http://talksTo";), vf.createURI("http://Bob";)));
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("recruiter", vf.createURI("http://Charlie";));
+        bs.addBinding("candidate", vf.createURI("http://Eve";));
+        bs.addBinding("leader", vf.createURI("http://Alice";));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("recruiter", vf.createURI("http://David";));
+        bs.addBinding("candidate", vf.createURI("http://Eve";));
+        bs.addBinding("leader", vf.createURI("http://Alice";));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("recruiter", vf.createURI("http://David";));
+        bs.addBinding("candidate", vf.createURI("http://Ivan";));
+        bs.addBinding("leader", vf.createURI("http://Bob";));
+        expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
-        assertEquals(expected,  results);
+        runTest(sparql, statements, expectedResults);
     }
 
     @Test
@@ -203,57 +196,52 @@ public class QueryIT extends ITBase {
                   "?worker <http://worksAt> <http://Chipotle>. " +
                 "}";
 
-        // Triples that will be streamed into Fluo after the PCJ has been 
created.
-        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Bob";),
-                makeRyaStatement("http://Bob";, "http://livesIn";, 
"http://London";),
-                makeRyaStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
-
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Charlie";),
-                makeRyaStatement("http://Charlie";, "http://livesIn";, 
"http://London";),
-                makeRyaStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
-
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://David";),
-                makeRyaStatement("http://David";, "http://livesIn";, 
"http://London";),
-                makeRyaStatement("http://David";, "http://worksAt";, 
"http://Chipotle";),
-
-                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Eve";),
-                makeRyaStatement("http://Eve";, "http://livesIn";, 
"http://Leeds";),
-                makeRyaStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
-
-                makeRyaStatement("http://Frank";, "http://talksTo";, 
"http://Alice";),
-                makeRyaStatement("http://Frank";, "http://livesIn";, 
"http://London";),
-                makeRyaStatement("http://Frank";, "http://worksAt";, 
"http://Chipotle";));
-
-        // The expected results of the SPARQL query once the PCJ has been 
computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add( makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Bob";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-        expected.add( makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://Charlie";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-        expected.add( makeBindingSet(
-                new BindingImpl("customer", new URIImpl("http://Alice";)),
-                new BindingImpl("worker", new URIImpl("http://David";)),
-                new BindingImpl("city", new URIImpl("http://London";))));
-
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(sparql);
-
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Bob";)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Charlie";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://David";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://livesIn";), vf.createURI("http://Leeds";)),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
+
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
+                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://Bob";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://Charlie";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("customer", vf.createURI("http://Alice";));
+        bs.addBinding("worker", vf.createURI("http://David";));
+        bs.addBinding("city", vf.createURI("http://London";));
+        expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
-        assertEquals(expected,  results);
+        runTest(sparql, statements, expectedResults);
     }
 
     @Test
@@ -266,69 +254,51 @@ public class QueryIT extends ITBase {
                   "?name <http://playsSport> \"Soccer\" " +
                 "}";
 
-        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice";, "http://hasAge";, 18),
-                makeRyaStatement("http://Bob";, "http://hasAge";, 30),
-                makeRyaStatement("http://Charlie";, "http://hasAge";, 14),
-                makeRyaStatement("http://David";, "http://hasAge";, 16),
-                makeRyaStatement("http://Eve";, "http://hasAge";, 35),
-
-                makeRyaStatement("http://Alice";, "http://playsSport";, 
"Soccer"),
-                makeRyaStatement("http://Bob";, "http://playsSport";, "Soccer"),
-                makeRyaStatement("http://Charlie";, "http://playsSport";, 
"Basketball"),
-                makeRyaStatement("http://Charlie";, "http://playsSport";, 
"Soccer"),
-                makeRyaStatement("http://David";, "http://playsSport";, 
"Basketball"));
-
-        // The expected results of the SPARQL query once the PCJ has been 
computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add( makeBindingSet(
-                new BindingImpl("name", new URIImpl("http://Alice";)),
-                new BindingImpl("age", new NumericLiteralImpl(18, 
XMLSchema.INTEGER))));
-        expected.add( makeBindingSet(
-                new BindingImpl("name", new URIImpl("http://Charlie";)),
-                new BindingImpl("age", new NumericLiteralImpl(14, 
XMLSchema.INTEGER))));
-
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(sparql);
-
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://hasAge";), vf.createLiteral(18)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://hasAge";), vf.createLiteral(30)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://hasAge";), vf.createLiteral(14)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://hasAge";), vf.createLiteral(16)),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://hasAge";), vf.createLiteral(35)),
+
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://playsSport";), vf.createLiteral("Basketball")),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://playsSport";), vf.createLiteral("Basketball")));
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("name", vf.createURI("http://Alice";));
+        bs.addBinding("age", vf.createLiteral("18", XMLSchema.INTEGER));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("name", vf.createURI("http://Charlie";));
+        bs.addBinding("age", vf.createLiteral("14", XMLSchema.INTEGER));
+        expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
-        assertEquals(expected,  results);
+        runTest(sparql, statements, expectedResults);
     }
-    
+
     @Test
     public void withCustomFilters() throws Exception {
-        final String sparql = "prefix ryafunc: 
<tag:rya.apache.org,2017:function#> \n" //
-                        + "SELECT ?name ?age \n" //
-                        + "{ \n" //
-                        + "FILTER( ryafunc:isTeen(?age) ) . \n" //
-                        + "?name <http://hasAge> ?age . \n" //
-                        + "?name <http://playsSport> \"Soccer\" \n" //
-                        + "}"; //
-
-        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice";, "http://hasAge";, 18),
-                makeRyaStatement("http://Bob";, "http://hasAge";, 30),
-                makeRyaStatement("http://Charlie";, "http://hasAge";, 14),
-                makeRyaStatement("http://David";, "http://hasAge";, 16),
-                makeRyaStatement("http://Eve";, "http://hasAge";, 35),
-
-                makeRyaStatement("http://Alice";, "http://playsSport";, 
"Soccer"),
-                makeRyaStatement("http://Bob";, "http://playsSport";, "Soccer"),
-                makeRyaStatement("http://Charlie";, "http://playsSport";, 
"Basketball"),
-                makeRyaStatement("http://Charlie";, "http://playsSport";, 
"Soccer"),
-                makeRyaStatement("http://David";, "http://playsSport";, 
"Basketball"));
-
-        Function fooFunction = new Function() {
+        final String sparql =
+                "prefix ryafunc: <tag:rya.apache.org,2017:function#> " +
+                "SELECT ?name ?age "  +
+                "{ "  +
+                    "FILTER( ryafunc:isTeen(?age) ) . "  +
+                    "?name <http://hasAge> ?age . "  +
+                    "?name <http://playsSport> \"Soccer\" . "  +
+                "}";
 
+        // Register a custom Filter.
+        final Function fooFunction = new Function() {
             @Override
             public String getURI() {
                 return "tag:rya.apache.org,2017:function#isTeen";
@@ -337,24 +307,22 @@ public class QueryIT extends ITBase {
             final static int TEEN_THRESHOLD = 20;
 
             @Override
-            public Value evaluate(ValueFactory valueFactory, Value... args) 
throws ValueExprEvaluationException {
-
+            public Value evaluate(final ValueFactory valueFactory, final 
Value... args) throws ValueExprEvaluationException {
                 if (args.length != 1) {
                     throw new ValueExprEvaluationException("isTeen() requires 
exactly 1 argument, got " + args.length);
                 }
 
                 if (args[0] instanceof Literal) {
-                    Literal literal = (Literal) args[0];
-
-                    URI datatype = literal.getDatatype();
+                    final Literal literal = (Literal) args[0];
+                    final URI datatype = literal.getDatatype();
 
                     // ABS function accepts only numeric literals
                     if (datatype != null && 
XMLDatatypeUtil.isNumericDatatype(datatype)) {
                         if (XMLDatatypeUtil.isDecimalDatatype(datatype)) {
-                            BigDecimal bigValue = literal.decimalValue();
+                            final BigDecimal bigValue = literal.decimalValue();
                             return 
BooleanLiteralImpl.valueOf(bigValue.compareTo(new BigDecimal(TEEN_THRESHOLD)) < 
0);
                         } else if 
(XMLDatatypeUtil.isFloatingPointDatatype(datatype)) {
-                            double doubleValue = literal.doubleValue();
+                            final double doubleValue = literal.doubleValue();
                             return BooleanLiteralImpl.valueOf(doubleValue < 
TEEN_THRESHOLD);
                         } else {
                             throw new ValueExprEvaluationException("unexpected 
datatype (expect decimal/int or floating) for function operand: " + args[0]);
@@ -371,82 +339,134 @@ public class QueryIT extends ITBase {
         // Add our new function to the registry
         FunctionRegistry.getInstance().add(fooFunction);
 
-        // The expected results of the SPARQL query once the PCJ has been 
computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add( makeBindingSet(
-                new BindingImpl("name", new URIImpl("http://Alice";)),
-                new BindingImpl("age", new NumericLiteralImpl(18, 
XMLSchema.INTEGER))));
-        expected.add( makeBindingSet(
-                new BindingImpl("name", new URIImpl("http://Charlie";)),
-                new BindingImpl("age", new NumericLiteralImpl(14, 
XMLSchema.INTEGER))));
-
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(sparql);
-
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://hasAge";), vf.createLiteral(18)),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://hasAge";), vf.createLiteral(30)),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://hasAge";), vf.createLiteral(14)),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://hasAge";), vf.createLiteral(16)),
+                vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://hasAge";), vf.createLiteral(35)),
+
+                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
+                vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://playsSport";), vf.createLiteral("Basketball")),
+                vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
+                vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://playsSport";), vf.createLiteral("Basketball")));
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("name", vf.createURI("http://Alice";));
+        bs.addBinding("age", vf.createLiteral("18", XMLSchema.INTEGER));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("name", vf.createURI("http://Charlie";));
+        bs.addBinding("age", vf.createLiteral("14", XMLSchema.INTEGER));
+        expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
-        assertEquals(expected,  results);
+        runTest(sparql, statements, expectedResults);
     }
 
     @Test
     public void withTemporal() throws Exception {
+        // A query that finds all stored data after 3 seconds.
         final String dtPredUri = "http://www.w3.org/2006/time#inXSDDateTime";;
         final String dtPred = "<" + dtPredUri + ">";
-        final String xmlDateTime = "http://www.w3.org/2001/XMLSchema#dateTime";;
-        // Find all stored dates.
-        String selectQuery = "PREFIX time: <http://www.w3.org/2006/time#> \n"//
-                        + "PREFIX xml: <http://www.w3.org/2001/XMLSchema#> \n" 
//
-                        + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"//
-                        + "SELECT ?event ?time \n" //
-                        + "WHERE { \n" //
-                        + "  ?event " + dtPred + " ?time . \n"//
-                        // + " FILTER(?time > 
'2000-01-01T01:00:00Z'^^xml:dateTime) \n"// all
-                        // + " FILTER(?time < 
'2007-01-01T01:01:03-08:00'^^xml:dateTime) \n"// after 2007
-                        + " FILTER(?time > 
'2001-01-01T01:01:03-08:00'^^xml:dateTime) \n"// after 3 seconds
-                        + "}";//
-
-        // create some resources and literals to make statements out of
-        String eventz = "<http://eventz>";
-        final Set<RyaStatement> streamedTriples = Sets.newHashSet(//
-                        makeRyaStatement(eventz, 
"http://www.w3.org/1999/02/22-rdf-syntax-ns#type";, 
"<http://www.w3.org/2006/time#Instant>"), //
-                        makeRyaStatement(eventz, dtPredUri, new RyaType(new 
URIImpl(xmlDateTime), "2001-01-01T01:01:01-08:00")), // one second
-                        makeRyaStatement(eventz, dtPredUri, new RyaType(new 
URIImpl(xmlDateTime), "2001-01-01T04:01:02.000-05:00")), // 2 seconds
-                        makeRyaStatement(eventz, dtPredUri, new RyaType(new 
URIImpl(xmlDateTime), "2001-01-01T01:01:03-08:00")), // 3 seconds
-                        makeRyaStatement(eventz, dtPredUri, new RyaType(new 
URIImpl(xmlDateTime), "2001-01-01T01:01:04-08:00")), // 4seconds
-                        makeRyaStatement(eventz, dtPredUri, new RyaType(new 
URIImpl(xmlDateTime), "2001-01-01T09:01:05Z")), // 5 seconds
-                        makeRyaStatement(eventz, dtPredUri, new RyaType(new 
URIImpl(xmlDateTime), "2006-01-01")), //
-                        makeRyaStatement(eventz, dtPredUri, new RyaType(new 
URIImpl(xmlDateTime), "2007-01-01")), //
-                        makeRyaStatement(eventz, dtPredUri, new RyaType(new 
URIImpl(xmlDateTime), "2008-01-01")));
-
-        // The expected results of the SPARQL query once the PCJ has been 
computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(new BindingImpl("event", new 
URIImpl(eventz)), new BindingImpl("time", new 
LiteralImpl("2001-01-01T09:01:04.000Z", new URIImpl(xmlDateTime))))); //
-        expected.add(makeBindingSet(new BindingImpl("event", new 
URIImpl(eventz)), new BindingImpl("time", new 
LiteralImpl("2001-01-01T09:01:05.000Z", new URIImpl(xmlDateTime))))); //
-        expected.add(makeBindingSet(new BindingImpl("event", new 
URIImpl(eventz)), new BindingImpl("time", new 
LiteralImpl("2006-01-01T05:00:00.000Z", new URIImpl(xmlDateTime))))); //
-        expected.add(makeBindingSet(new BindingImpl("event", new 
URIImpl(eventz)), new BindingImpl("time", new 
LiteralImpl("2007-01-01T05:00:00.000Z", new URIImpl(xmlDateTime))))); //
-        expected.add(makeBindingSet(new BindingImpl("event", new 
URIImpl(eventz)), new BindingImpl("time", new 
LiteralImpl("2008-01-01T05:00:00.000Z", new URIImpl(xmlDateTime)))));
-
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(selectQuery);
-
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
-
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String> absent());
+
+        final String sparql =
+                "PREFIX time: <http://www.w3.org/2006/time#> " +
+                "PREFIX xml: <http://www.w3.org/2001/XMLSchema#> "  +
+                "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> " +
+                "SELECT ?event ?time "  +
+                "WHERE { "  +
+                    "?event " + dtPred + " ?time . " +
+                    "FILTER(?time > '2001-01-01T01:01:03-08:00'^^xml:dateTime) 
" +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type";), 
vf.createURI("http://www.w3.org/2006/time#Instant";)),
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI(dtPredUri), 
vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:01-08:00"))), // 
1 second
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI(dtPredUri), 
vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T04:01:02.000-05:00"))),
 // 2 second
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI(dtPredUri), 
vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:03-08:00"))), // 
3 seconds
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI(dtPredUri), 
vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:04-08:00"))), // 
4 seconds
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI(dtPredUri), 
vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05Z"))), // 5 
seconds
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI(dtPredUri), 
vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z"))),
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI(dtPredUri), 
vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z"))),
+                vf.createStatement(vf.createURI("http://eventz";), 
vf.createURI(dtPredUri), 
vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z"))));
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("event", vf.createURI("http://eventz";));
+        bs.addBinding("time", 
vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:04.000Z")));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("event", vf.createURI("http://eventz";));
+        bs.addBinding("time", 
vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05.000Z")));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("event", vf.createURI("http://eventz";));
+        bs.addBinding("time", 
vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z")));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("event", vf.createURI("http://eventz";));
+        bs.addBinding("time", 
vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z")));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("event", vf.createURI("http://eventz";));
+        bs.addBinding("time", 
vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z")));
+        expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
selectQuery);
-        assertEquals(expected, results);
+        runTest(sparql, statements, expectedResults);
+    }
+
+    public void runTest(final String sparql, final Collection<Statement> 
statements, final Collection<BindingSet> expectedResults) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
+        requireNonNull(expectedResults);
+
+        // Register the PCJ with Rya.
+        final Instance accInstance = 
super.getAccumuloConnector().getInstance();
+        final Connector accumuloConn = super.getAccumuloConnector();
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(
+                ACCUMULO_USER,
+                ACCUMULO_PASSWORD.toCharArray(),
+                accInstance.getInstanceName(),
+                accInstance.getZooKeepers()), accumuloConn);
+
+        ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+
+        // Write the data to Rya.
+        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
+        ryaConn.begin();
+        ryaConn.add(statements);
+        ryaConn.commit();
+        ryaConn.close();
+
+        // Wait for the Fluo application to finish computing the end result.
+        super.getMiniFluo().waitForObservers();
+
+        // Fetch the value that is stored within the PCJ table.
+        try(final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) {
+            final String pcjId = pcjStorage.listPcjs().get(0);
+            final Set<BindingSet> results = Sets.newHashSet( 
pcjStorage.listResults(pcjId) );
+
+            // Ensure the result of the query matches the expected result.
+            assertEquals(expectedResults, results);
+        }
     }
-}
+}
\ No newline at end of file

Reply via email to