http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
 
b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
new file mode 100644
index 0000000..a772b83
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.demo;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.openrdf.model.Statement;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.SailException;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.mini.MiniFluo;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
+import mvm.rya.indexing.external.tupleSet.PcjTables;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+
+/**
+ * Demonstrates how historically added Rya statements that are stored within the core
+ * Rya tables join with newly streamed statements inside the Fluo application.
+ */
+public class FluoAndHistoricPcjsDemo implements Demo {
+    private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class);
+
+    // Employees
+    private static final RyaURI alice = new RyaURI("http://Alice");
+    private static final RyaURI bob = new RyaURI("http://Bob");
+    private static final RyaURI charlie = new RyaURI("http://Charlie");
+    private static final RyaURI frank = new RyaURI("http://Frank");
+
+    // Patrons
+    private static final RyaURI david = new RyaURI("http://David");
+    private static final RyaURI eve = new RyaURI("http://Eve");
+    private static final RyaURI george = new RyaURI("http://George");
+
+    // Other People
+    private static final RyaURI henry = new RyaURI("http://Henry");
+    private static final RyaURI irene = new RyaURI("http://Irene");
+    private static final RyaURI justin = new RyaURI("http://Justin");
+    private static final RyaURI kristi = new RyaURI("http://Kristi");
+    private static final RyaURI luke = new RyaURI("http://Luke");
+    private static final RyaURI manny = new RyaURI("http://Manny");
+    private static final RyaURI nate = new RyaURI("http://Nate");
+    private static final RyaURI olivia = new RyaURI("http://Olivia");
+    private static final RyaURI paul = new RyaURI("http://Paul");
+    private static final RyaURI ross = new RyaURI("http://Ross");
+    private static final RyaURI sally = new RyaURI("http://Sally");
+    private static final RyaURI tim = new RyaURI("http://Tim");
+
+    // Places
+    private static final RyaURI coffeeShop = new RyaURI("http://CoffeeShop");
+    private static final RyaURI burgerShop = new RyaURI("http://BurgerShop");
+    private static final RyaURI cupcakeShop = new RyaURI("http://cupcakeShop");
+
+    // Verbs
+    private static final RyaURI talksTo = new RyaURI("http://talksTo");
+    private static final RyaURI worksAt = new RyaURI("http://worksAt");
+
+    /**
+     * Used to pause the demo waiting for the presenter to hit the Enter key.
+     */
+    private final java.util.Scanner keyboard = new java.util.Scanner(System.in);
+
+    @Override
+    public void execute(
+            final MiniAccumuloCluster accumulo,
+            final Connector accumuloConn,
+            final String ryaTablePrefix,
+            final RyaSailRepository ryaRepo,
+            final RepositoryConnection ryaConn,
+            final MiniFluo fluo,
+            final FluoClient fluoClient) throws DemoExecutionException {
+        log.setLevel(Level.INFO);
+
+        // 1. Introduce some RDF Statements that we are going to start with and
+        //    pause so the presenter can introduce this information to the audience.
+        final Set<RyaStatement> relevantHistoricStatements = Sets.newHashSet(
+                new RyaStatement(eve, talksTo, charlie),
+                new RyaStatement(david, talksTo, alice),
+                new RyaStatement(alice, worksAt, coffeeShop),
+                new RyaStatement(bob, worksAt, coffeeShop));
+
+        log.info("We add some Statements that are relevant to the query we 
will compute:");
+        prettyLogStatements(relevantHistoricStatements);
+        waitForEnter();
+
+        log.info("We also some more Satements that aren't realted to the query 
we will compute");
+        final Set<RyaStatement> otherHistoricStatements = Sets.newHashSet(
+                new RyaStatement(henry, worksAt, burgerShop),
+                new RyaStatement(irene, worksAt, burgerShop),
+                new RyaStatement(justin, worksAt, burgerShop),
+                new RyaStatement(kristi, worksAt, burgerShop),
+                new RyaStatement(luke, worksAt, burgerShop),
+                new RyaStatement(manny, worksAt, cupcakeShop),
+                new RyaStatement(nate, worksAt, cupcakeShop),
+                new RyaStatement(olivia, worksAt, cupcakeShop),
+                new RyaStatement(paul, worksAt, cupcakeShop),
+                new RyaStatement(ross, worksAt, cupcakeShop),
+                new RyaStatement(henry, talksTo, irene),
+                new RyaStatement(henry, talksTo, justin),
+                new RyaStatement(kristi, talksTo, irene),
+                new RyaStatement(luke, talksTo, irene),
+                new RyaStatement(sally, talksTo, paul),
+                new RyaStatement(sally, talksTo, ross),
+                new RyaStatement(sally, talksTo, kristi),
+                new RyaStatement(tim, talksTo, nate),
+                new RyaStatement(tim, talksTo, paul),
+                new RyaStatement(tim, talksTo, kristi));
+
+        log.info("Theese statements will also be inserted into the core Rya 
tables:");
+        prettyLogStatements(otherHistoricStatements);
+        waitForEnter();
+
+        // 2. Load the statements into the core Rya tables.
+        log.info("Loading the historic statements into Rya...");
+        loadDataIntoRya(ryaConn, relevantHistoricStatements);
+        loadDataIntoRya(ryaConn, otherHistoricStatements);
+        log.info("");
+
+        // 3. Introduce the query that we're going to load into Fluo and pause so that the
+        //    presenter may show what they believe the expected output should be.
+        final String sparql =
+                "SELECT ?patron ?employee " +
+                 "WHERE { " +
+                     "?patron <http://talksTo> ?employee. " +
+                     "?employee <http://worksAt> <http://CoffeeShop>. " +
+                 "}";
+
+        log.info("The following SPARQL query will be loaded into the Fluo 
application for incremental updates:");
+        prettyLogSparql(sparql);
+        waitForEnter();
+
+        // 4. Write the query to Fluo and import the historic matches. Wait for the app to finish exporting results.
+        log.info("Telling Fluo to maintain the query and import the historic Statement Pattern matches.");
+        try {
+            new CreatePcj().withRyaIntegration(fluoClient, ryaTablePrefix, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+        } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException e) {
+            throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e);
+        }
+
+        log.info("Waiting for the fluo application to finish exporting the 
initial results...");
+        fluo.waitForObservers();
+        log.info("Historic result exporting finished.");
+        log.info("");
+
+        // 5. Show that the Fluo app exported the results to the PCJ table in Accumulo.
+        final String pcjTableName = getPcjTableName(fluoClient, sparql);
+
+        log.info("The following Binding Sets were exported to the '" + pcjTableName + "' table in Accumulo:");
+        Multimap<String, BindingSet> pcjResults = loadPcjResults(accumuloConn, pcjTableName);
+        prettyLogPcjResults(pcjResults);
+        waitForEnter();
+
+        // 6. Introduce some new Statements that we will stream into the Fluo app.
+        final RyaStatement newLeft = new RyaStatement(george, talksTo, frank);
+        final RyaStatement newRight = new RyaStatement(frank, worksAt, coffeeShop);
+        final RyaStatement joinLeft = new RyaStatement(eve, talksTo, bob);
+        final RyaStatement joinRight = new RyaStatement(charlie, worksAt, coffeeShop);
+
+        final Set<RyaStatement> relevantStreamedStatements = Sets.newHashSet(
+                newLeft,
+                newRight,
+                joinLeft,
+                joinRight);
+
+        log.info("We stream these relevant Statements into Fluo and the core 
Rya tables:");
+        log.info(prettyFormat(newLeft) + "          - Part of a new result");
+        log.info(prettyFormat(newRight) + "      - Other part of a new 
result");
+        log.info(prettyFormat(joinLeft) + "               - Joins with a 
historic <http://talksTo> statement");
+        log.info(prettyFormat(joinRight) + "    - Joins with a historic 
<http://worksA>t statement");
+        waitForEnter();
+
+        final Set<RyaStatement> otherStreamedStatements = Sets.newHashSet(
+                new RyaStatement(alice, talksTo, tim),
+                new RyaStatement(bob, talksTo, tim),
+                new RyaStatement(charlie, talksTo, tim),
+                new RyaStatement(frank, talksTo, tim),
+                new RyaStatement(david, talksTo, tim),
+                new RyaStatement(eve, talksTo, sally),
+                new RyaStatement(george, talksTo, sally),
+                new RyaStatement(henry, talksTo, sally),
+                new RyaStatement(irene, talksTo, sally),
+                new RyaStatement(justin, talksTo, sally),
+                new RyaStatement(kristi, talksTo, manny),
+                new RyaStatement(luke, talksTo, manny),
+                new RyaStatement(manny, talksTo, paul),
+                new RyaStatement(nate, talksTo, manny),
+                new RyaStatement(olivia, talksTo, manny),
+                new RyaStatement(paul, talksTo, kristi),
+                new RyaStatement(ross, talksTo, kristi),
+                new RyaStatement(sally, talksTo, kristi),
+                new RyaStatement(olivia, talksTo, kristi),
+                new RyaStatement(olivia, talksTo, kristi));
+
+        log.info("We also stream these irrelevant Statements into Fluo and the 
core Rya tables:");
+        prettyLogStatements(otherStreamedStatements);
+        waitForEnter();
+
+        // 7. Insert the new triples into the core Rya tables and the Fluo app.
+        loadDataIntoRya(ryaConn, relevantStreamedStatements);
+        loadDataIntoFluo(fluoClient, relevantStreamedStatements);
+
+        log.info("Waiting for the fluo application to finish exporting the newly streamed results...");
+        fluo.waitForObservers();
+        log.info("Streamed result exporting finished.");
+        log.info("");
+
+        // 8. Show the new results have been exported to the PCJ table in Accumulo.
+        log.info("The following Binding Sets were exported to the '" + pcjTableName + "' table in Accumulo:");
+        pcjResults = loadPcjResults(accumuloConn, pcjTableName);
+        prettyLogPcjResults(pcjResults);
+        log.info("");
+    }
+
+    private void waitForEnter() {
+        log.info("");
+        log.info("Press [Enter] to continue the demo.");
+        keyboard.nextLine();
+    }
+
+    private static void prettyLogSparql(final String sparql) {
+        try {
+            // Pretty print.
+            final String[] lines = prettyFormatSparql(sparql);
+            for(final String line : lines) {
+                log.info(line);
+            }
+        } catch (final Exception e) {
+            // Pretty print failed, so ugly print instead.
+            log.info(sparql);
+        }
+    }
+
+    private static void loadDataIntoFluo(final FluoClient fluoClient, final Set<RyaStatement> statements) {
+        final InsertTriples insertTriples = new InsertTriples();
+        for(final RyaStatement statement : statements) {
+            insertTriples.insert(fluoClient, statement);
+        }
+    }
+
+    private static String prettyFormat(final RyaStatement statement) {
+        final RyaURI s = statement.getSubject();
+        final RyaURI p = statement.getPredicate();
+        final RyaType o = statement.getObject();
+        return "<" + s.getData() + "> <"+ p.getData() + "> <" + o.getData() + 
">";
+    }
+
+    private static void prettyLogStatements(final Set<RyaStatement> statements) {
+        for(final RyaStatement statement : statements) {
+            log.info("    " + prettyFormat(statement));
+        }
+    }
+
+    private static String[] prettyFormatSparql(final String sparql) throws Exception {
+        final SPARQLParser parser = new SPARQLParser();
+        final SPARQLQueryRenderer renderer = new SPARQLQueryRenderer();
+        final ParsedQuery pq = parser.parseQuery(sparql, null);
+        final String prettySparql = renderer.render(pq);
+        return StringUtils.split(prettySparql, '\n');
+    }
+
+    private static void loadDataIntoRya(final RepositoryConnection ryaConn, final Set<RyaStatement> statements) throws DemoExecutionException {
+        for(final RyaStatement ryaStatement : statements) {
+            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+            try {
+                ryaConn.add(statement);
+            } catch (final RepositoryException e) {
+                throw new DemoExecutionException("Could not load one of the historic statements into Rya, so the demo can not continue. Exiting.", e);
+            }
+        }
+    }
+
+    private static String getPcjTableName(final FluoClient fluoClient, final String sparql) {
+        try(Snapshot snap = fluoClient.newSnapshot()) {
+            final Bytes queryId = snap.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID);
+            return snap.get(queryId, FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString();
+        }
+    }
+
+    /**
+     * Scan Accumulo for the results that are stored in a PCJ table. The
+     * multimap stores a set of deserialized binding sets that were in the PCJ
+     * table for every variable order that is found in the PCJ metadata.
+     */
+    private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws DemoExecutionException {
+        final Multimap<String, BindingSet> fetchedResults = HashMultimap.create();
+
+        try {
+            // Get the variable orders the data was written to.
+            final PcjTables pcjs = new PcjTables();
+            final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+
+            // Scan Accumulo for the stored results.
+            for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) {
+                final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
+                scanner.fetchColumnFamily( new Text(varOrder.toString()) );
+
+                for(final Entry<Key, Value> entry : scanner) {
+                    final byte[] serializedResult = entry.getKey().getRow().getBytes();
+                    final BindingSet result = AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray());
+                    fetchedResults.put(varOrder.toString(), result);
+                }
+            }
+        } catch(PcjException | TableNotFoundException | RyaTypeResolverException e) {
+            throw new DemoExecutionException("Couldn't fetch the binding sets that were exported to the PCJ table, so the demo can not continue. Exiting.", e);
+        }
+
+        return fetchedResults;
+    }
+
+    private static void prettyLogPcjResults(final Multimap<String, BindingSet> pcjResults) throws DemoExecutionException {
+        final String varOrderString = pcjResults.keySet().iterator().next();
+        final Collection<BindingSet> results = pcjResults.get(varOrderString);
+        for(final BindingSet result : results) {
+            log.info("    " + result);
+        }
+    }
+}

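For reference, the binding sets the demo's query should export can be worked out from the statements declared above: the historic data alone yields (patron=David, employee=Alice); the streamed statements then add (George, Frank) from the two brand-new triples, (Eve, Bob) by joining with the historic "Bob worksAt CoffeeShop", and (Eve, Charlie) by joining with the historic "Eve talksTo Charlie". A minimal Java sketch of those four expected binding sets, using the Sesame MapBindingSet API (the class and variable names here are illustrative only, not part of the patch):

import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.impl.MapBindingSet;

public class ExpectedDemoResults {
    public static void main(final String[] args) {
        final ValueFactory vf = ValueFactoryImpl.getInstance();

        // Present after the historic load: David talksTo Alice, Alice worksAt CoffeeShop.
        final MapBindingSet historic = new MapBindingSet();
        historic.addBinding("patron", vf.createURI("http://David"));
        historic.addBinding("employee", vf.createURI("http://Alice"));

        // Added by newLeft + newRight: George talksTo Frank, Frank worksAt CoffeeShop.
        final MapBindingSet newResult = new MapBindingSet();
        newResult.addBinding("patron", vf.createURI("http://George"));
        newResult.addBinding("employee", vf.createURI("http://Frank"));

        // joinLeft (Eve talksTo Bob) joins the historic Bob worksAt CoffeeShop.
        final MapBindingSet joinedWithWorksAt = new MapBindingSet();
        joinedWithWorksAt.addBinding("patron", vf.createURI("http://Eve"));
        joinedWithWorksAt.addBinding("employee", vf.createURI("http://Bob"));

        // joinRight (Charlie worksAt CoffeeShop) joins the historic Eve talksTo Charlie.
        final MapBindingSet joinedWithTalksTo = new MapBindingSet();
        joinedWithTalksTo.addBinding("patron", vf.createURI("http://Eve"));
        joinedWithTalksTo.addBinding("employee", vf.createURI("http://Charlie"));

        // The demo's final PCJ table scan should show these four results.
        System.out.println(historic);
        System.out.println(newResult);
        System.out.println(joinedWithWorksAt);
        System.out.println(joinedWithTalksTo);
    }
}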
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
new file mode 100644
index 0000000..6ef282f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.pcj.fluo.parent</artifactId>
+        <version>3.2.10-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.pcj.fluo.integration</artifactId>
+    
+    <name>Apache Rya PCJ Fluo Integration Tests</name>
+    <description>Integration tests for the Rya Fluo application.</description>
+    
+    <dependencies>
+        <!-- Rya Runtime Dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.client</artifactId>
+        </dependency>
+    
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>io.fluo</groupId>
+            <artifactId>fluo-mini</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
new file mode 100644
index 0000000..c9af9aa
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.junit.After;
+import org.junit.Before;
+import org.openrdf.model.Statement;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+
+import com.google.common.io.Files;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.config.FluoConfiguration;
+import io.fluo.api.config.ObserverConfiguration;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.mini.MiniFluo;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.rdftriplestore.RdfCloudTripleStore;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+
+/**
+ * Integration tests that ensure the Fluo application processes PCJ results correctly.
+ * <p>
+ * This base class is ignored by the test runner because it does not contain any tests of its own.
+ */
+public abstract class ITBase {
+    private static final Logger log = Logger.getLogger(ITBase.class);
+
+    protected static final String RYA_TABLE_PREFIX = "demo_";
+
+    // Rya data store and connections.
+    protected MiniAccumuloCluster accumulo = null;
+    protected static Connector accumuloConn = null;
+    protected RyaSailRepository ryaRepo = null;
+    protected RepositoryConnection ryaConn = null;
+
+    // Fluo data store and connections.
+    protected MiniFluo fluo = null;
+    protected FluoClient fluoClient = null;
+
+    @Before
+    public void setupMiniResources() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException {
+        // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it.
+        accumulo = startMiniAccumulo();
+
+        // Setup the Rya library to use the Mini Accumulo.
+        ryaRepo = setupRya(accumulo);
+        ryaConn = ryaRepo.getConnection();
+
+        // Initialize the Mini Fluo that will be used to store created queries.
+        fluo = startMiniFluo();
+        fluoClient = FluoFactory.newClient( fluo.getClientConfiguration() );
+    }
+
+    @After
+    public void shutdownMiniResources() {
+        if(ryaConn != null) {
+            try {
+                log.info("Shutting down Rya Connection.");
+                ryaConn.close();
+                log.info("Rya Connection shut down.");
+            } catch(final Exception e) {
+                log.error("Could not shut down the Rya Connection.", e);
+            }
+        }
+
+        if(ryaRepo != null) {
+            try {
+                log.info("Shutting down Rya Repo.");
+                ryaRepo.shutDown();
+                log.info("Rya Repo shut down.");
+            } catch(final Exception e) {
+                log.error("Could not shut down the Rya Repo.", e);
+            }
+        }
+
+        if(accumulo != null) {
+            try {
+                log.info("Shutting down the Mini Accumulo being used as a Rya 
store.");
+                accumulo.stop();
+                log.info("Mini Accumulo being used as a Rya store shut down.");
+            } catch(final Exception e) {
+                log.error("Could not shut down the Mini Accumulo.", e);
+            }
+        }
+
+        if(fluoClient != null) {
+            try {
+                log.info("Shutting down Fluo Client.");
+                fluoClient.close();
+                log.info("Fluo Client shut down.");
+            } catch(final Exception e) {
+                log.error("Could not shut down the Fluo Client.", e);
+            }
+        }
+
+        if(fluo != null) {
+            try {
+                log.info("Shutting down Mini Fluo.");
+                fluo.close();
+                log.info("Mini Fluo shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Mini Fluo.", e);
+            }
+        }
+    }
+
+    /**
+     * A helper function for creating a {@link BindingSet} from an array of {@link Binding}s.
+     *
+     * @param bindings - The bindings to include in the set. (not null)
+     * @return A {@link BindingSet} holding the bindings.
+     */
+    protected static BindingSet makeBindingSet(final Binding... bindings) {
+        final MapBindingSet bindingSet = new MapBindingSet();
+        for(final Binding binding : bindings) {
+            bindingSet.addBinding(binding);
+        }
+        return bindingSet;
+    }
+
+    /**
+     * A helper function for creating a {@link RyaStatement} that represents a Triple.
+     *
+     * @param subject - The Subject of the Triple. (not null)
+     * @param predicate - The Predicate of the Triple. (not null)
+     * @param object - The Object of the Triple. (not null)
+     * @return A Triple as a {@link RyaStatement}.
+     */
+    protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) {
+        checkNotNull(subject);
+        checkNotNull(predicate);
+        checkNotNull(object);
+
+        final RyaStatementBuilder builder = RyaStatement.builder()
+            .setSubject( new RyaURI(subject) )
+            .setPredicate( new RyaURI(predicate) );
+
+        if(object.startsWith("http://";)) {
+            builder.setObject(new RyaURI(object) );
+        } else {
+            builder.setObject( new RyaType(object) );
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * A helper function for creating a {@link RyaStatement} that represents a Triple.
+     *
+     * @param subject - The Subject of the Triple. (not null)
+     * @param predicate - The Predicate of the Triple. (not null)
+     * @param object - The Object of the Triple. (not null)
+     * @return A Triple as a {@link RyaStatement}.
+     */
+    protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) {
+        checkNotNull(subject);
+        checkNotNull(predicate);
+
+        return RyaStatement.builder()
+                .setSubject(new RyaURI(subject))
+                .setPredicate(new RyaURI(predicate))
+                .setObject( new RyaType(XMLSchema.INT, "" + object) )
+                .build();
+    }
+
+    /**
+     * A helper function for creating a Sesame {@link Statement} that represents a Triple.
+     *
+     * @param subject - The Subject of the Triple. (not null)
+     * @param predicate - The Predicate of the Triple. (not null)
+     * @param object - The Object of the Triple. (not null)
+     * @return A Triple as a {@link Statement}.
+     */
+    protected static Statement makeStatement(final String subject, final String predicate, final String object) {
+        checkNotNull(subject);
+        checkNotNull(predicate);
+        checkNotNull(object);
+
+        final RyaStatement ryaStmt = makeRyaStatement(subject, predicate, object);
+        return RyaToRdfConversions.convertStatement(ryaStmt);
+    }
+
+    /**
+     * Fetches the binding sets that are the results of a specific SPARQL query
+     * from the Fluo table.
+     *
+     * @param fluoClient - A connection to the Fluo table where the results reside. (not null)
+     * @param sparql - This query's results will be fetched. (not null)
+     * @return The binding sets for the query's results.
+     */
+    protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) {
+        final Set<BindingSet> bindingSets = new HashSet<>();
+
+        try(Snapshot snapshot = fluoClient.newSnapshot()) {
+            final String queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID).toString();
+
+            // Fetch the query's variable order.
+            final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId);
+            final String[] varOrder = queryMetadata.getVariableOrder().toArray();
+
+            // Fetch the Binding Sets for the query.
+            final ScannerConfiguration scanConfig = new ScannerConfiguration();
+            scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
+
+            final RowIterator rowIter = snapshot.get(scanConfig);
+            while(rowIter.hasNext()) {
+                final Entry<Bytes, ColumnIterator> row = rowIter.next();
+                final String bindingSetString = row.getValue().next().getValue().toString();
+                final BindingSet bindingSet = FluoStringConverter.toBindingSet(bindingSetString, varOrder);
+                bindingSets.add(bindingSet);
+            }
+        }
+
+        return bindingSets;
+    }
+
+    /**
+     * Setup a Mini Accumulo cluster that uses a temporary directory to store its data.
+     *
+     * @return A Mini Accumulo cluster.
+     */
+    private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
+        final File miniDataDir = Files.createTempDir();
+
+        // Setup and start the Mini Accumulo.
+        final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniDataDir, "password");
+        accumulo.start();
+
+        // Store a connector to the Mini Accumulo.
+        final Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
+        accumuloConn = instance.getConnector("root", new PasswordToken("password"));
+
+        return accumulo;
+    }
+
+    /**
+     * Format a Mini Accumulo to be a Rya repository.
+     *
+     * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null)
+     * @return The Rya repository sitting on top of the Mini Accumulo.
+     */
+    private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException {
+        checkNotNull(accumulo);
+
+        // Setup the Rya Repository that will be used to create Repository Connections.
+        final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
+        final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
+        crdfdao.setConnector(accumuloConn);
+
+        // Setup Rya configuration values.
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("demo_");
+        conf.setDisplayQueryPlan(true);
+
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX);
+        conf.set(ConfigUtils.CLOUDBASE_USER, "root");
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "password");
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, accumulo.getInstanceName());
+
+        crdfdao.setConf(conf);
+        ryaStore.setRyaDAO(crdfdao);
+
+        final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
+        ryaRepo.initialize();
+
+        return ryaRepo;
+    }
+
+    /**
+     * Override this method to provide an output configuration to the Fluo application.
+     * <p>
+     * Returns an empty map by default.
+     *
+     * @return The parameters that will be passed to {@link QueryResultObserver} at startup.
+     */
+    protected Map<String, String> makeExportParams() {
+        return new HashMap<>();
+    }
+
+    /**
+     * Setup a Mini Fluo cluster that uses a temporary directory to store its data.
+     *
+     * @return A Mini Fluo cluster.
+     */
+    protected MiniFluo startMiniFluo() {
+        final File miniDataDir = Files.createTempDir();
+
+        // Setup the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverConfiguration> observers = new ArrayList<>();
+        observers.add(new ObserverConfiguration(TripleObserver.class.getName()));
+        observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
+        observers.add(new ObserverConfiguration(FilterObserver.class.getName()));
+
+        // Provide export parameters child test classes may provide to the export observer.
+        final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName());
+        exportObserverConfig.setParameters( makeExportParams() );
+        observers.add(exportObserverConfig);
+
+        // Configure how the mini fluo will run.
+        final FluoConfiguration config = new FluoConfiguration();
+        config.setApplicationName("IntegrationTests");
+        config.setMiniDataDir(miniDataDir.getAbsolutePath());
+        config.addObservers(observers);
+
+        final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
+        return miniFluo;
+    }
+}
\ No newline at end of file

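ITBase#makeExportParams is the hook a child test class overrides to hand an output configuration to the export observer. A minimal sketch of such an override, assuming only the base class shown above; the parameter key is a hypothetical placeholder, since the keys QueryResultObserver actually reads are not shown in this patch:

import java.util.HashMap;
import java.util.Map;

import org.apache.rya.indexing.pcj.fluo.ITBase;

public class ExampleExportConfigIT extends ITBase {
    @Override
    protected Map<String, String> makeExportParams() {
        final Map<String, String> params = new HashMap<>();
        // Hypothetical key/value pair; substitute whatever QueryResultObserver expects.
        params.put("pcj.export.destination", "example");
        return params;
    }
}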
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
new file mode 100644
index 0000000..41c4f08
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.config.FluoConfiguration;
+import io.fluo.api.config.ObserverConfiguration;
+import io.fluo.api.mini.MiniFluo;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+
+/**
+ * Tests the methods of {@link CountStatements}.
+ */
+public class CountStatementsIT extends ITBase {
+
+    /**
+     * Overridden so that no Observers will be started. This ensures whatever
+     * statements are inserted as part of the test will not be consumed.
+     *
+     * @return A Mini Fluo cluster.
+     */
+    @Override
+    protected MiniFluo startMiniFluo() {
+        final File miniDataDir = Files.createTempDir();
+
+        // Setup the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverConfiguration> observers = new ArrayList<>();
+
+        // Configure how the mini fluo will run.
+        final FluoConfiguration config = new FluoConfiguration();
+        config.setApplicationName("IntegrationTests");
+        config.setMiniDataDir(miniDataDir.getAbsolutePath());
+        config.addObservers(observers);
+
+        final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
+        return miniFluo;
+    }
+
+    @Test
+    public void test() {
+        // Insert some Triples into the Fluo app.
+        final List<RyaStatement> triples = new ArrayList<>();
+        triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Alice")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
+        triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Bob")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Alice")).build() );
+        triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Charlie")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
+        triples.add( RyaStatement.builder().setSubject(new RyaURI("http://David")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
+        triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Eve")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
+
+        new InsertTriples().insert(fluoClient, triples);
+
+        // Count the statements that are stored in the Fluo app.
+        final BigInteger count = new CountStatements().countStatements(fluoClient);
+
+        // Ensure the count matches the expected values.
+        assertEquals(BigInteger.valueOf(5), count);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
new file mode 100644
index 0000000..c4b00e7
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
+import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.sail.SailException;
+
+import com.google.common.collect.Sets;
+
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Integration tests the methods of {@link GetPcjMetadata}.
+ */
+public class GetPcjMetadataIT extends ITBase {
+
+    @Test
+    public void getMetadataByQueryId() throws AccumuloException, AccumuloSecurityException, TableExistsException, PcjException, NotInFluoException, NotInAccumuloException, MalformedQueryException, SailException, QueryEvaluationException {
+        final String sparql =
+                "SELECT ?x " +
+                  "WHERE { " +
+                  "?x <http://talksTo> <http://Eve>. " +
+                  "?x <http://worksAt> <http://Chipotle>." +
+                "}";
+        final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") );
+
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, varOrders, sparql);
+
+        // Ensure the command returns the correct metadata.
+        final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
+
+        final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0);
+        final PcjMetadata metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient, queryId);
+
+        assertEquals(expected, metadata);
+    }
+
+    @Test
+    public void getAllMetadata() throws MalformedQueryException, 
SailException, QueryEvaluationException, PcjException, NotInFluoException, 
NotInAccumuloException {
+        final CreatePcj createPcj = new CreatePcj();
+
+        // Add a couple of queries to Accumulo.
+        final String q1Sparql =
+                "SELECT ?x " +
+                  "WHERE { " +
+                  "?x <http://talksTo> <http://Eve>. " +
+                  "?x <http://worksAt> <http://Chipotle>." +
+                "}";
+        final Set<VariableOrder> q1VarOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") );
+        createPcj.withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, q1VarOrders, q1Sparql);
+
+        final String q2Sparql =
+                "SELECT ?x ?y " +
+                  "WHERE { " +
+                  "?x <http://talksTo> ?y. " +
+                  "?y <http://worksAt> <http://Chipotle>." +
+                "}";
+        final Set<VariableOrder> q2VarOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x", "y") );
+        createPcj.withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, q2VarOrders, q2Sparql);
+
+        // Ensure the command returns the correct metadata.
+        final Set<PcjMetadata> expected = new HashSet<>();
+        expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders));
+        expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders));
+
+        final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient);
+        assertEquals(expected, Sets.newHashSet( metadata.values() ));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
new file mode 100644
index 0000000..157412a
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Integration tests the methods of {@link GetQueryReport}.
+ */
+public class GetQueryReportIT extends ITBase {
+
+    @Test
+    public void getReport() throws Exception {
+        final String sparql =
+                "SELECT ?worker ?company ?city" +
+                "{ " +
+                  "FILTER(?worker = <http://Alice>) " +
+                  "?worker <http://worksAt> ?company . " +
+                  "?worker <http://livesIn> ?city ." +
+                "}";
+
+        // Triples that will be streamed into Fluo after the PCJ has been created.
+        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Taco Shop"),
+                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Burger Join"),
+                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Pastery Shop"),
+                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Burrito Place"),
+                makeRyaStatement("http://Alice";, "http://livesIn";, 
"http://Lost County"),
+                makeRyaStatement("http://Alice";, "http://livesIn";, "http://Big 
City"),
+                makeRyaStatement("http://Bob";, "http://worksAt";, 
"http://Burrito Place"),
+                makeRyaStatement("http://Bob";, "http://livesIn";, "http://Big 
City"),
+                makeRyaStatement("http://Charlie";, "http://worksAt";, 
"http://Burrito Place"),
+                makeRyaStatement("http://Charlie";, "http://livesIn";, 
"http://Big City"),
+                makeRyaStatement("http://David";, "http://worksAt";, 
"http://Burrito Place"),
+                makeRyaStatement("http://David";, "http://livesIn";, 
"http://Lost County"),
+                makeRyaStatement("http://Eve";, "http://worksAt";, 
"http://Burrito Place"),
+                makeRyaStatement("http://Eve";, "http://livesIn";, "http://Big 
City"),
+                makeRyaStatement("http://Frank";, "http://worksAt";, 
"http://Burrito Place"),
+                makeRyaStatement("http://Frank";, "http://livesIn";, 
"http://Lost County"));
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Stream the data into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples);
+
+        // Wait for the results to finish processing.
+        fluo.waitForObservers();
+
+        // Fetch the report.
+        final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient);
+        final Set<String> queryIds = metadata.keySet();
+        assertEquals(1, queryIds.size());
+        final String queryId = queryIds.iterator().next();
+
+        final QueryReport report = new GetQueryReport().getReport(fluoClient, queryId);
+
+        // Build the expected counts map.
+        final Map<String, BigInteger> expectedCounts = new HashMap<>();
+
+        final FluoQuery fluoQuery = report.getFluoQuery();
+
+        final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
+        expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
+
+        final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId();
+        expectedCounts.put(filterNodeId, BigInteger.valueOf(8));
+
+        final String joinNodeId = fluoQuery.getJoinMetadata().iterator().next().getNodeId();
+        expectedCounts.put(joinNodeId, BigInteger.valueOf(13));
+
+        final Iterator<StatementPatternMetadata> patterns = fluoQuery.getStatementPatternMetadata().iterator();
+        final StatementPatternMetadata sp1 = patterns.next();
+        final StatementPatternMetadata sp2 = patterns.next();
+        if(sp1.getStatementPattern().contains("http://worksAt";)) {
+            expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9));
+            expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7));
+        } else {
+            expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9));
+            expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7));
+        }
+
+        assertEquals(expectedCounts, report.getCounts());
+    }
+}
\ No newline at end of file

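For reference, the counts asserted above follow from the streamed triples, assuming each node's count is the number of binding sets it has emitted: the <http://worksAt> statement pattern matches 9 triples (4 for Alice plus 1 each for Bob, Charlie, David, Eve, and Frank), the <http://livesIn> pattern matches 7 (2 for Alice plus 1 each for the other five), the join on ?worker produces 4*2 + 5*1 = 13 combinations, and FILTER(?worker = <http://Alice>) keeps only Alice's 8, which is also the count at the query node.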
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
new file mode 100644
index 0000000..d18a497
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_ID;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
+import org.junit.Test;
+
+import com.beust.jcommander.internal.Lists;
+
+import io.fluo.api.types.TypedTransaction;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjTableNameFactory;
+
+/**
+ * Integration tests the methods of {@link ListQueryIds}.
+ */
+public class ListQueryIdsIT extends ITBase {
+
+    private static final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory();
+
+    /**
+     * This test ensures that when PCJ tables exist in Accumulo as well as entries in
+     * the Fluo table's export destinations column, the command for fetching the
+     * list of queries only includes queries that appear in both places.
+     */
+    @Test
+    public void getQueryIds() throws AccumuloException, AccumuloSecurityException, TableExistsException {
+        // Store a few SPARQL/Query ID pairs in the Fluo table.
+        try(TypedTransaction tx = new StringTypeLayer().wrap( fluoClient.newTransaction() )) {
+            tx.mutate().row("SPARQL_3").col(QUERY_ID).set("ID_3");
+            tx.mutate().row("SPARQL_1").col(QUERY_ID).set("ID_1");
+            tx.mutate().row("SPARQL_4").col(QUERY_ID).set("ID_4");
+            tx.mutate().row("SPARQL_2").col(QUERY_ID).set("ID_2");
+            tx.commit();
+        }
+
+        // Ensure the correct list of Query IDs is returned.
+        final List<String> expected = Lists.newArrayList("ID_1", "ID_2", "ID_3", "ID_4");
+        final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient);
+        assertEquals(expected, queryIds);
+    }
+}
\ No newline at end of file
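
As a companion to the test above, a minimal usage sketch (not part of the patch) of the ListQueryIds API it exercises; only the FluoClient is assumed to exist already.

    import java.util.List;

    import io.fluo.api.client.FluoClient;
    import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds;

    public class PrintRegisteredQueryIds {
        public static void print(final FluoClient fluoClient) {
            // Fetch the Query IDs that are registered in the Fluo application.
            final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient);
            for(final String queryId : queryIds) {
                System.out.println(queryId);
            }
        }
    }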

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
new file mode 100644
index 0000000..231c8f1
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.client.Transaction;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Integration tests the methods of {@link FluoQueryMetadataDAO}.
+ */
+public class FluoQueryMetadataDAOIT extends ITBase {
+
+    @Test
+    public void statementPatternMetadataTest() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final StatementPatternMetadata.Builder builder = StatementPatternMetadata.builder("nodeId");
+        builder.setVarOrder(new VariableOrder("a;b;c"));
+        builder.setStatementPattern("statementPattern");
+        builder.setParentNodeId("parentNodeId");
+        final StatementPatternMetadata originalMetadata = builder.build();
+
+        // Write it to the Fluo table.
+        try(Transaction tx = fluoClient.newTransaction()) {
+            dao.write(tx, originalMetadata);
+            tx.commit();
+        }
+
+        // Read it from the Fluo table.
+        StatementPatternMetadata storedMetadata = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId");
+        }
+
+        // Ensure the deserialized object is the same as the serialized one.
+        assertEquals(originalMetadata, storedMetadata);
+    }
+
+    @Test
+    public void filterMetadataTest() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final FilterMetadata.Builder builder = FilterMetadata.builder("nodeId");
+        builder.setVarOrder(new VariableOrder("e;f"));
+        builder.setParentNodeId("parentNodeId");
+        builder.setChildNodeId("childNodeId");
+        builder.setOriginalSparql("originalSparql");
+        builder.setFilterIndexWithinSparql(2);
+        final FilterMetadata originalMetadata = builder.build();
+
+        // Write it to the Fluo table.
+        try(Transaction tx = fluoClient.newTransaction()) {
+            dao.write(tx, originalMetadata);
+            tx.commit();
+        }
+
+        // Read it from the Fluo table.
+        FilterMetadata storedMetadata = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedMetadata = dao.readFilterMetadata(sx, "nodeId");
+        }
+
+        // Ensure the deserialized object is the same as the serialized one.
+        assertEquals(originalMetadata, storedMetadata);
+    }
+
+    @Test
+    public void joinMetadataTest() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final JoinMetadata.Builder builder = JoinMetadata.builder("nodeId");
+        builder.setVariableOrder(new VariableOrder("g;y;s"));
+        builder.setParentNodeId("parentNodeId");
+        builder.setLeftChildNodeId("leftChildNodeId");
+        builder.setRightChildNodeId("rightChildNodeId");
+        final JoinMetadata originalMetadata = builder.build();
+
+        // Write it to the Fluo table.
+        try(Transaction tx = fluoClient.newTransaction()) {
+            dao.write(tx, originalMetadata);
+            tx.commit();
+        }
+
+        // Read it from the Fluo table.
+        JoinMetadata storedMetadata = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedMetadata = dao.readJoinMetadata(sx, "nodeId");
+        }
+
+        // Ensure the deserialized object is the same as the serialized one.
+        assertEquals(originalMetadata, storedMetadata);
+    }
+
+    @Test
+    public void queryMetadataTest() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final QueryMetadata.Builder builder = QueryMetadata.builder("nodeId");
+        builder.setVariableOrder(new VariableOrder("y;s;d"));
+        builder.setSparql("sparql string");
+        builder.setChildNodeId("childNodeId");
+        final QueryMetadata originalMetadata = builder.build();
+
+        // Write it to the Fluo table.
+        try(Transaction tx = fluoClient.newTransaction()) {
+            dao.write(tx, originalMetadata);
+            tx.commit();
+        }
+
+        // Read it from the Fluo table.
+        QueryMetadata storedMetadata = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedMetadata = dao.readQueryMetadata(sx, "nodeId");
+        }
+
+        // Ensure the deserialized object is the same as the serialized one.
+        assertEquals(originalMetadata, storedMetadata);
+    }
+
+    @Test
+    public void fluoQueryTest() throws MalformedQueryException {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final String sparql =
+                "SELECT ?customer ?worker ?city " +
+                "{ " +
+                  "FILTER(?customer = <http://Alice>) " +
+                  "FILTER(?city = <http://London>) " +
+                  "?customer <http://talksTo> ?worker. " +
+                  "?worker <http://livesIn> ?city. " +
+                  "?worker <http://worksAt> <http://Chipotle>. " +
+                "}";
+
+        final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
+        final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
+
+        // Write it to the Fluo table.
+        try(Transaction tx = fluoClient.newTransaction()) {
+            dao.write(tx, originalQuery);
+            tx.commit();
+        }
+
+        // Read it from the Fluo table.
+        FluoQuery storedQuery = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+        }
+
+        // Ensure the deserialized object is the same as the serialized one.
+        assertEquals(originalQuery, storedQuery);
+    }
+}
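
All of the tests above follow the same write-inside-a-Transaction / read-inside-a-Snapshot pattern. The sketch below (not part of the patch) pulls that pattern out for the full FluoQuery case, assuming only a FluoClient and a SPARQL string; the package locations of FluoQuery and SparqlFluoQueryBuilder are assumed to match the test's own package.

    import io.fluo.api.client.FluoClient;
    import io.fluo.api.client.Snapshot;
    import io.fluo.api.client.Transaction;

    import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
    import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
    import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
    import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;

    import org.openrdf.query.MalformedQueryException;
    import org.openrdf.query.parser.ParsedQuery;
    import org.openrdf.query.parser.sparql.SPARQLParser;

    public class FluoQueryRoundTrip {
        public static FluoQuery storeAndReload(final FluoClient fluoClient, final String sparql)
                throws MalformedQueryException {
            final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();

            // Build the FluoQuery metadata tree from the SPARQL string.
            final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
            final FluoQuery query = new SparqlFluoQueryBuilder().make(parsed, new NodeIds());

            // Write it inside a Transaction.
            try(Transaction tx = fluoClient.newTransaction()) {
                dao.write(tx, query);
                tx.commit();
            }

            // Read it back inside a Snapshot, keyed by the query node's id.
            try(Snapshot sx = fluoClient.newSnapshot()) {
                return dao.readFluoQuery(sx, query.getQueryMetadata().getNodeId());
            }
        }
    }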

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
new file mode 100644
index 0000000..41c2d7d
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+
+import com.google.common.collect.Sets;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Performs integration tests over the Fluo application geared towards various types of input.
+ * <p>
+ * These tests are ignored so that they will not run as unit tests while building the application.
+ */
+public class InputIT extends ITBase {
+
+    /**
+     * Ensure historic matches are included in the result.
+     */
+    @Test
+    public void historicResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+              "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+              "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
+                makeStatement("http://Bob";, "http://talksTo";, "http://Eve";),
+                makeStatement("http://Charlie";, "http://talksTo";, 
"http://Eve";),
+
+                makeStatement("http://Eve";, "http://helps";, "http://Kevin";),
+
+                makeStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
+                makeStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
+                makeStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
+                makeStatement("http://David";, "http://worksAt";, 
"http://Chipotle";));
+
+        // The expected results of the SPARQL query once the PCJ has been computed.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Bob";))));
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Charlie";))));
+
+        // Load the historic data into Rya.
+        for(final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Verify the end results of the query match the expected results.
+        fluo.waitForObservers();
+
+        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+
+    /**
+     * Ensure streamed matches are included in the result.
+     */
+    @Test
+    public void streamedResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+              "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+              "}";
+
+        // Triples that will be streamed into Fluo after the PCJ has been created.
+        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Eve";),
+                makeRyaStatement("http://Bob";, "http://talksTo";, "http://Eve";),
+                makeRyaStatement("http://Charlie";, "http://talksTo";, 
"http://Eve";),
+
+                makeRyaStatement("http://Eve";, "http://helps";, "http://Kevin";),
+
+                makeRyaStatement("http://Bob";, "http://worksAt";, 
"http://Chipotle";),
+                makeRyaStatement("http://Charlie";, "http://worksAt";, 
"http://Chipotle";),
+                makeRyaStatement("http://Eve";, "http://worksAt";, 
"http://Chipotle";),
+                makeRyaStatement("http://David";, "http://worksAt";, 
"http://Chipotle";));
+
+        // The expected results of the SPARQL query once the PCJ has been computed.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Bob";))));
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Charlie";))));
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Ensure the query has no results yet.
+        fluo.waitForObservers();
+        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertTrue( results.isEmpty() );
+
+        // Stream the data into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples);
+
+        // Verify the end results of the query match the expected results.
+        fluo.waitForObservers();
+        results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+
+    /**
+     * Simulates the case where a Triple is added to Rya, a new query that includes
+     * that triple as a historic match is inserted into Fluo, and then some new
+     * triple that matches the query is streamed into Fluo. The query's results
+     * must include both the historic result and the newly streamed result.
+     */
+    @Test
+    public void historicThenStreamedResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+              "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+              "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
+                makeStatement("http://Alice";, "http://worksAt";, 
"http://Chipotle";));
+
+        // Triples that will be streamed into Fluo after the PCJ has been created.
+        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+                makeRyaStatement("http://Frank";, "http://talksTo";, 
"http://Eve";),
+                makeRyaStatement("http://Frank";, "http://worksAt";, 
"http://Chipotle";));
+
+        // Load the historic data into Rya.
+        for(final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Ensure Alice is a match.
+        fluo.waitForObservers();
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Alice";))));
+
+        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+
+        // Stream the data into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples);
+
+        // Verify the end results of the query also include Frank.
+        fluo.waitForObservers();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Frank";))));
+
+        results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+
+    /**
+     * Simulates the case where a Triple is added to Rya, a new query that
+     * includes the triple as a historic match is inserted into Fluo, and then
+     * the same triple is streamed into Fluo. The query's results will already
+     * include the Triple because they were added while the query was being
+     * created. This case should not fail or affect the end results in any way.
+     */
+    @Test
+    public void historicAndStreamConflict() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+              "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+              "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice";, "http://talksTo";, "http://Eve";),
+                makeStatement("http://Alice";, "http://worksAt";, 
"http://Chipotle";));
+
+        // Triples that will be streamed into Fluo after the PCJ has been created.
+        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+                makeRyaStatement("http://Alice";, "http://talksTo";, 
"http://Eve";),
+                makeRyaStatement("http://Alice";, "http://worksAt";, 
"http://Chipotle";));
+
+        // The expected final result.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Alice";))));
+
+        // Load the historic data into Rya.
+        for(final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Ensure Alice is a match.
+        fluo.waitForObservers();
+        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+
+        // Stream the same Alice triple into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples);
+
+        // Verify the end result of the query is still only Alice.
+        fluo.waitForObservers();
+        results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+}
\ No newline at end of file
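
The streamedResults and historicThenStreamedResults tests above share one operational flow: register the PCJ first (CreatePcj.withRyaIntegration, as shown in the diff), then push new triples at it with InsertTriples. Below is a minimal sketch (not part of the patch) of the streaming half; the RyaStatement constructor used here is an assumption, since the tests build their statements through an ITBase helper (makeRyaStatement).

    import com.google.common.collect.Sets;

    import io.fluo.api.client.FluoClient;
    import mvm.rya.api.domain.RyaStatement;
    import mvm.rya.api.domain.RyaURI;

    import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;

    public class StreamTriples {
        public static void stream(final FluoClient fluoClient) {
            // Build a triple directly; the (subject, predicate, object) constructor is assumed here,
            // whereas the tests above delegate this to ITBase's makeRyaStatement helper.
            final RyaStatement triple = new RyaStatement(
                    new RyaURI("http://Frank"),
                    new RyaURI("http://worksAt"),
                    new RyaURI("http://Chipotle"));

            // Stream it into the Fluo application; any PCJs whose patterns match will be updated.
            new InsertTriples().insert(fluoClient, Sets.newHashSet(triple));
        }
    }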
