RYA-55 Implemented a series of projects that use Fluo to incrementally update 
the results of a Rya Precomputed Join secondary index.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/15ec5d5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/15ec5d5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/15ec5d5f

Branch: refs/heads/develop
Commit: 15ec5d5faa670f9a65de52caa907eebdfa5ae757
Parents: 358c13b
Author: Kevin Chilton <[email protected]>
Authored: Fri Mar 18 19:13:57 2016 -0400
Committer: Kevin Chilton <[email protected]>
Committed: Mon Mar 21 13:32:49 2016 -0400

----------------------------------------------------------------------
 extras/pom.xml                                  |   1 +
 extras/rya.pcj.fluo/README.md                   |  42 ++
 extras/rya.pcj.fluo/pcj.fluo.api/pom.xml        |  48 ++
 .../indexing/pcj/fluo/api/CountStatements.java  |  64 +++
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    | 249 ++++++++++
 .../indexing/pcj/fluo/api/GetPcjMetadata.java   | 132 +++++
 .../indexing/pcj/fluo/api/GetQueryReport.java   | 256 ++++++++++
 .../indexing/pcj/fluo/api/InsertTriples.java    | 102 ++++
 .../rya/indexing/pcj/fluo/api/ListQueryIds.java |  76 +++
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        |  90 ++++
 .../indexing/pcj/fluo/app/BindingSetRow.java    |  87 ++++
 .../rya/indexing/pcj/fluo/app/FilterFinder.java |  83 ++++
 .../pcj/fluo/app/FilterResultUpdater.java       | 152 ++++++
 .../pcj/fluo/app/FluoStringConverter.java       | 299 ++++++++++++
 .../rya/indexing/pcj/fluo/app/IncUpdateDAO.java | 232 +++++++++
 .../fluo/app/IncrementalUpdateConstants.java    |  37 ++
 .../pcj/fluo/app/JoinResultUpdater.java         | 314 ++++++++++++
 .../rya/indexing/pcj/fluo/app/NodeType.java     |  62 +++
 .../pcj/fluo/app/QueryResultUpdater.java        |  81 ++++
 .../indexing/pcj/fluo/app/StringTypeLayer.java  |  29 ++
 .../app/export/IncrementalResultExporter.java   |  70 +++
 .../IncrementalResultExporterFactory.java       | 103 ++++
 .../pcj/fluo/app/export/ParametersBase.java     |  59 +++
 .../app/export/rya/RyaExportParameters.java     | 133 +++++
 .../fluo/app/export/rya/RyaResultExporter.java  |  73 +++
 .../export/rya/RyaResultExporterFactory.java    |  69 +++
 .../fluo/app/observers/BindingSetUpdater.java   | 155 ++++++
 .../pcj/fluo/app/observers/FilterObserver.java  |  65 +++
 .../pcj/fluo/app/observers/JoinObserver.java    |  64 +++
 .../fluo/app/observers/QueryResultObserver.java | 111 +++++
 .../app/observers/StatementPatternObserver.java |  65 +++
 .../pcj/fluo/app/observers/TripleObserver.java  | 135 ++++++
 .../pcj/fluo/app/query/CommonNodeMetadata.java  | 102 ++++
 .../pcj/fluo/app/query/FilterMetadata.java      | 257 ++++++++++
 .../indexing/pcj/fluo/app/query/FluoQuery.java  | 318 ++++++++++++
 .../pcj/fluo/app/query/FluoQueryColumns.java    | 142 ++++++
 .../fluo/app/query/FluoQueryMetadataDAO.java    | 358 ++++++++++++++
 .../pcj/fluo/app/query/JoinMetadata.java        | 224 +++++++++
 .../pcj/fluo/app/query/QueryMetadata.java       | 186 +++++++
 .../fluo/app/query/SparqlFluoQueryBuilder.java  | 479 +++++++++++++++++++
 .../app/query/StatementPatternMetadata.java     | 197 ++++++++
 .../indexing/pcj/fluo/app/FilterFinderTest.java |  84 ++++
 .../pcj/fluo/app/FluoStringConverterTest.java   | 203 ++++++++
 .../rya/indexing/pcj/fluo/app/NodeTypeTest.java |  64 +++
 .../app/export/rya/RyaExportParametersTest.java |  65 +++
 .../pcj.fluo.client/conf/log4j2.xml             |  36 ++
 .../pcj.fluo.client/conf/tool.properties        |  45 ++
 extras/rya.pcj.fluo/pcj.fluo.client/pom.xml     | 136 ++++++
 .../pcj/fluo/client/PcjAdminClient.java         | 247 ++++++++++
 .../pcj/fluo/client/PcjAdminClientCommand.java  |  97 ++++
 .../fluo/client/PcjAdminClientProperties.java   | 101 ++++
 .../CountUnprocessedStatementsCommand.java      |  69 +++
 .../fluo/client/command/ListQueriesCommand.java | 127 +++++
 .../fluo/client/command/LoadTriplesCommand.java | 140 ++++++
 .../fluo/client/command/NewQueryCommand.java    | 124 +++++
 .../fluo/client/command/QueryReportCommand.java | 104 ++++
 .../pcj/fluo/client/util/FluoLoader.java        |  90 ++++
 .../fluo/client/util/ParsedQueryRequest.java    | 134 ++++++
 .../fluo/client/util/PcjMetadataRenderer.java   | 100 ++++
 .../fluo/client/util/QueryReportRenderer.java   | 112 +++++
 .../indexing/pcj/fluo/client/util/Report.java   | 214 +++++++++
 .../pcj/fluo/client/ParsedQueryRequestTest.java |  83 ++++
 .../fluo/client/PcjMetadataRendererTest.java    | 125 +++++
 .../indexing/pcj/fluo/client/ReportTests.java   |  82 ++++
 extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml       |  85 ++++
 .../apache/rya/indexing/pcj/fluo/demo/Demo.java |  63 +++
 .../rya/indexing/pcj/fluo/demo/DemoDriver.java  | 289 +++++++++++
 .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java  | 371 ++++++++++++++
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   |  64 +++
 .../apache/rya/indexing/pcj/fluo/ITBase.java    | 374 +++++++++++++++
 .../pcj/fluo/api/CountStatementsIT.java         |  86 ++++
 .../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 101 ++++
 .../indexing/pcj/fluo/api/GetQueryReportIT.java | 120 +++++
 .../indexing/pcj/fluo/api/ListQueryIdsIT.java   |  66 +++
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  | 183 +++++++
 .../indexing/pcj/fluo/integration/InputIT.java  | 254 ++++++++++
 .../indexing/pcj/fluo/integration/QueryIT.java  | 234 +++++++++
 .../pcj/fluo/integration/RyaExportIT.java       | 187 ++++++++
 extras/rya.pcj.fluo/pom.xml                     | 130 +++++
 79 files changed, 10990 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index a3199bf..d3d51a1 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -40,5 +40,6 @@ under the License.
         <module>indexing</module>
         <module>indexingExample</module>
         <module>vagrantExample</module>
+        <module>rya.pcj.fluo</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/README.md
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/README.md b/extras/rya.pcj.fluo/README.md
new file mode 100644
index 0000000..70361c1
--- /dev/null
+++ b/extras/rya.pcj.fluo/README.md
@@ -0,0 +1,42 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+Rya Incrementally Updating Precomputed Joins
+============================================
+This project is an implementation of the Rya Precomputed Join (PCJ) indexing 
+feature that runs on top of [Fluo][1] so that it may incrementally update the
+results of a query as new semantic triples are added to storage.  
+
+This project contains the following modules:
+  * **rya.pcj.fluo.app** - A Fluo application that incrementally updates the 
results
+    of a Precomputed Join Secondary Index. This app runs as a YARN application 
on a 
+    cluster, receives streams of new RDF Statements, determines if those 
statements
+    create any new index values, and then exports those values to the 
appropriate Rya 
+    PCJ Tables.
+  * **rya.pcj.fluo.api** - Defines calls that may be made to the Rya PCJ Fluo 
App
+    while it is running. These calls are intended to be used by client 
applications
+    such as debug tools, data ingest tools, administrative tools, etc. 
+  * **rya.pcj.fluo.client** - A command line client that lets an 
administrative user
+    interact with the Rya PCJ Fluo App that is running on their 
cluster.
+  * **rya.pcj.fluo.demo** - A demo application that shows how the Rya PCJ Fluo 
App
+    may be used to incrementally update PCJ results within a Rya instance. The 
demo 
+    uses MiniAccumuloCluster and MiniFluo so that it is entirely 
self-contained.  
+  * **integration** - Contains integration tests that use a MiniAccumuloCluster
+    and MiniFluo to ensure the Rya PCJ Fluo App works within an emulation of the
+    production environment.
+
+[1]: http://fluo.io/

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
new file mode 100644
index 0000000..292121d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project 
+    xmlns="http://maven.apache.org/POM/4.0.0" 
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.pcj.fluo.parent</artifactId>
+        <version>3.2.10-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.pcj.fluo.api</artifactId>
+    
+    <name>Apache Rya PCJ Fluo API</name>
+    <description>
+        This module contains the Rya PCJ Fluo API. It consists of classes 
+        that allow other applications to interact with the Rya PCJ Fluo 
+        application while it is running on a cluster.
+    </description>
+    
+    <dependencies>
+       <!-- Rya Runtime Dependencies. --> 
+        <dependency> 
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.app</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CountStatements.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CountStatements.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CountStatements.java
new file mode 100644
index 0000000..326f807
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CountStatements.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.math.BigInteger;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.iterator.RowIterator;
+
+/**
+ * Counts the number of RDF Statements that have been loaded into the Fluo app
+ * that have not been processed yet.
+ */
+public class CountStatements {
+
+    /**
+     * Get the number of RDF Statements that have been loaded into the Fluo app
+     * that have not been processed yet.
+     *
+     * @param fluo - The connection to Fluo that will be scanned for the 
statements. (not null)
+     * @return The number of RDF Statements that have been loaded into the Fluo
+     *   app that have not been processed yet.
+     */
+    public BigInteger countStatements(final FluoClient fluo) {
+        checkNotNull(fluo);
+
+        try(Snapshot sx = fluo.newSnapshot()) {
+            // Limit the scan to the Triples column; every row the scan returns is counted once.
+            final ScannerConfiguration scanConfig = new ScannerConfiguration();
+            scanConfig.fetchColumn(FluoQueryColumns.TRIPLES.getFamily(), 
FluoQueryColumns.TRIPLES.getQualifier());
+
+            final RowIterator rows = sx.get(scanConfig);
+            BigInteger count = BigInteger.valueOf(0L);
+            while(rows.hasNext()) {
+                rows.next();
+                count = count.add( BigInteger.ONE );
+            }
+
+            return count;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
new file mode 100644
index 0000000..b99d293
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
+import 
org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import info.aduna.iteration.CloseableIteration;
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.types.TypedTransaction;
+import mvm.rya.indexing.external.tupleSet.PcjTables;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjTableNameFactory;
+import mvm.rya.indexing.external.tupleSet.PcjTables.ShiftVarOrderFactory;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+
+/**
+ * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query.
+ * <p>
+ * This is a two phase process.
+ * <ol>
+ *   <li>Setup metadata about each node of the query using a single Fluo 
transaction. </li>
+ *   <li>Scan Rya for binding sets that match each Statement Pattern from the 
query
+ *       and use a separate Fluo transaction for each batch that is inserted. 
This
+ *       ensures historic triples will be included in the query's results.</li>
+ * </ol>
+ * After the first step is finished, any new Triples that are added to the Fluo
+ * application will be matched against statement patterns, the final results
+ * will percolate to the top of the query, and those results will be exported 
to
+ * Rya's query system.
+ */
+@ParametersAreNonnullByDefault
+public class CreatePcj {
+
+    /**
+     * Wraps Fluo {@link Transaction}s so that we can write String values to 
them.
+     */
+    private static final StringTypeLayer STRING_TYPED_LAYER = new 
StringTypeLayer();
+
+    /**
+     * The default Statement Pattern batch insert size is 1000.
+     */
+    private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
+
+    /**
+     * A utility used to interact with Rya's PCJ tables.
+     */
+    private static final PcjTables PCJ_TABLES = new PcjTables();
+
+    /**
+     * The maximum number of binding sets that will be inserted into each 
Statement
+     * Pattern's result set per Fluo transaction.
+     */
+    private final int spInsertBatchSize;
+
+    /**
+     * Constructs an instance of {@link CreatePcj} that uses
+     * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size.
+     */
+    public CreatePcj() {
+        this(DEFAULT_SP_INSERT_BATCH_SIZE);
+    }
+
+    /**
+     * Constructs an instance of {@link CreatePcj}.
+     *
+     * @param spInsertBatchSize - The maximum number of binding sets that will 
be
+     *   inserted into each Statement Pattern's result set per Fluo 
transaction.
+     */
+    public CreatePcj(final int spInsertBatchSize) {
+        checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + 
spInsertBatchSize + "' must be greater than 0.");
+        this.spInsertBatchSize = spInsertBatchSize;
+    }
+
+    /**
+     * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. 
Historic
+     * triples will be scanned and matched using the rya connection that was
+     * provided. The PCJ will also automatically export to a table in Accumulo
+     * named using the {@code ryaTablePrefix} and the query's ID from the Fluo 
table.
+     * <p>NOTE(review): the Rya SailConnection and the binding set iterations opened here are not closed by this method.
+     * @param fluo - A connection to the Fluo table that will be updated. (not 
null)
+     * @param ryaTablePrefix - The prefix that will be prepended to the 
Accumulo table
+     *   the PCJ's results will be exported to. (not null)
+     * @param rya - A connection to the Rya repository that will be scanned. 
(not null)
+     * @param accumuloConn - A connection to the Accumulo instance the 
incremental
+     *   results will be exported to as a Rya PCJ table. (not null)
+     * @param varOrders - The variable orders the query's results will be 
exported to
+     *   within the export table. If this set is empty, then a default will be
+     *   used instead and added to this set via addAll (NOTE(review): an empty set must therefore be mutable). (not null)
+     * @param sparql - The SPARQL query whose results will be incrementally 
updated by Fluo. (not null)
+     * @throws MalformedQueryException The PCJ could not be initialized 
because the SPARQL query was malformed.
+     * @throws PcjException The PCJ could not be initialized because of a 
problem setting up the export location.
+     * @throws SailException Historic results could not be added to the 
initialized PCJ because of
+     *   a problem with the Rya connection.
+     * @throws QueryEvaluationException Historic results could not be added to 
the initialized PCJ because of
+     *   a problem with the Rya connection.
+     */
+    public void withRyaIntegration(
+            final FluoClient fluo,
+            final String ryaTablePrefix,
+            final RyaSailRepository rya,
+            final Connector accumuloConn,
+            final Set<VariableOrder> varOrders,
+            final String sparql) throws MalformedQueryException, PcjException, 
SailException, QueryEvaluationException {
+        checkNotNull(fluo);
+        checkNotNull(ryaTablePrefix);
+        checkNotNull(rya);
+        checkNotNull(accumuloConn);
+        checkNotNull(varOrders);
+        checkNotNull(sparql);
+
+        // Parse the SPARQL into a POJO.
+        final SPARQLParser parser = new SPARQLParser();
+        final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
+
+        // Keeps track of the IDs that are assigned to each of the query's 
nodes in Fluo.
+        // We use these IDs later when scanning Rya for historic Statement 
Pattern matches
+        // as well as setting up automatic exports.
+        final NodeIds nodeIds = new NodeIds();
+        final String exportTableName;
+        final String queryId;
+
+        // Parse the query's structure for the metadata that will be written 
to fluo.
+        final FluoQuery fluoQuery = new 
SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
+
+        try(TypedTransaction tx = STRING_TYPED_LAYER.wrap( 
fluo.newTransaction() )) {
+            // Write the query's structure to Fluo.
+            new FluoQueryMetadataDAO().write(tx, fluoQuery);
+
+            // Since we are exporting the query's results to a table in 
Accumulo, store that location in the fluo table.
+            queryId = fluoQuery.getQueryMetadata().getNodeId();
+
+            exportTableName = new 
PcjTableNameFactory().makeTableName(ryaTablePrefix, queryId);
+            
tx.mutate().row(queryId).col(FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).set(exportTableName);
+
+            // Flush the changes to Fluo.
+            tx.commit();
+        }
+
+        // Initialize the export destination in Accumulo. If triples are being 
written to Fluo
+        // while this query is being created, then the export observer may 
throw errors for a while
+        // until this step is completed.
+        final VariableOrder queryVarOrder = 
fluoQuery.getQueryMetadata().getVariableOrder();
+        if(varOrders.isEmpty()) {
+            final Set<VariableOrder> shiftVarOrders = new 
ShiftVarOrderFactory().makeVarOrders( queryVarOrder );
+            varOrders.addAll(shiftVarOrders);
+        }
+        PCJ_TABLES.createPcjTable(accumuloConn, exportTableName, varOrders, 
sparql);
+
+        // Get a connection to Rya. It's used to scan for Statement Pattern 
results.
+        final SailConnection ryaConn = rya.getSail().getConnection();
+
+        // Reuse the same set object while performing batch inserts.
+        final Set<BindingSet> batch = new HashSet<>();
+
+        // Iterate through each of the statement patterns and insert their 
historic matches into Fluo.
+        for(final StatementPatternMetadata patternMetadata : 
fluoQuery.getStatementPatternMetadata()) {
+            // Get an iterator over all of the binding sets that match the 
statement pattern.
+            final StatementPattern pattern = 
FluoStringConverter.toStatementPattern( patternMetadata.getStatementPattern() );
+            final CloseableIteration<? extends BindingSet, 
QueryEvaluationException> bindingSets = ryaConn.evaluate(pattern, null, null, 
false);
+
+            // Insert batches of the binding sets into Fluo.
+            while(bindingSets.hasNext()) {
+                if(batch.size() == spInsertBatchSize) {
+                    writeBatch(fluo, patternMetadata, batch);
+                    batch.clear();
+                }
+
+                batch.add( bindingSets.next() );
+            }
+
+            if(!batch.isEmpty()) {
+                writeBatch(fluo, patternMetadata, batch);
+                batch.clear();
+            }
+        }
+    }
+
+    /**
+     * Writes a batch of {@link BindingSet}s that match a statement pattern to 
Fluo.
+     *
+     * @param fluo - Creates transactions to Fluo. (not null)
+     * @param spMetadata - The Statement Pattern the batch matches. (not null)
+     * @param batch - A set of binding sets that are the result of the 
statement pattern. (not null)
+     */
+    private static void writeBatch(final FluoClient fluo, final 
StatementPatternMetadata spMetadata, final Set<BindingSet> batch) {
+        checkNotNull(fluo);
+        checkNotNull(spMetadata);
+        checkNotNull(batch);
+
+        try(TypedTransaction tx = 
STRING_TYPED_LAYER.wrap(fluo.newTransaction())) {
+            // Get the node's variable order.
+            final String spNodeId = spMetadata.getNodeId();
+            final String[] varOrder = spMetadata.getVariableOrder().toArray();
+
+            for(final BindingSet bindingSet : batch) {
+                final String bindingSetStr = 
FluoStringConverter.toBindingSetString(bindingSet, varOrder);
+
+                // Write the binding set entry to Fluo for the statement 
pattern.
+                tx.mutate().row(spNodeId + NODEID_BS_DELIM + bindingSetStr)
+                    .col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET)
+                    .set(bindingSetStr);
+            }
+
+            tx.commit();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
new file mode 100644
index 0000000..88c7930
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.types.TypedSnapshot;
+import mvm.rya.indexing.external.tupleSet.PcjTables;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
+
+/**
+ * Get {@link PcjMetadata} for queries that are managed by the Fluo app.
+ */
+public class GetPcjMetadata {
+
+    private final ListQueryIds listQueryIds = new ListQueryIds();
+
+    /**
+     * Get the {@link PcjMetadata} of all queries that are being maintained by
+     * the Fluo app.
+     *
+     * @param accumulo - The Accumulo instance that will be searched. (not 
null)
+     * @param fluo - The Fluo instance that will be searched. (not null)
+     * @return A map where the query ID is the key and its metadata is the 
value.
+     * @throws NotInFluoException A query Id does not have a PCJ export table
+     *   associated with it in the Fluo table.
+     * @throws NotInAccumuloException A PCJ export table that was found either
+     *   does not exist in Accumulo or it is not a PCJ table.
+     */
+    public Map<String, PcjMetadata> getMetadata(final Connector accumulo, 
final FluoClient fluo) throws NotInFluoException, NotInAccumuloException {
+        checkNotNull(accumulo);
+        checkNotNull(fluo);
+
+        final Map<String, PcjMetadata> metadata = new HashMap<>();
+
+        final Collection<String> queryIds = listQueryIds.listQueryIds(fluo);
+        for(final String queryId : queryIds) {
+            metadata.put(queryId, getMetadata(accumulo, fluo, queryId));
+        }
+
+        return metadata;
+    }
+
+    /**
+     * Get the {@link PcjMetadata} of a query that is being maintained by the
+     * Fluo app.
+     *
+     * @param accumulo - The Accumulo instance that will be searched. (not 
null)
+     * @param fluo - The Fluo instance that will be searched. (not null)
+     * @param queryId - The Query Id whose metadata will be fetched. (not null)
+     * @return The {@link PcjMetadata} of the query.
+     * @throws NotInFluoException The query Id does not have a PCJ export table
+     *   associated with it in the Fluo table.
+     * @throws NotInAccumuloException The PCJ export table that was found 
either
+     *   does not exist in Accumulo or it is not a PCJ table.
+     */
+    public PcjMetadata getMetadata(final Connector accumulo, final FluoClient 
fluo, final String queryId) throws NotInFluoException, NotInAccumuloException {
+        checkNotNull(accumulo);
+        checkNotNull(fluo);
+        checkNotNull(queryId);
+
+        // Lookup the Accumulo export table name in the Fluo table.
+        String pcjTableName = null;
+        try(TypedSnapshot snap = new StringTypeLayer().wrap( 
fluo.newSnapshot() ) ) {
+            final Bytes pcjTableNameBytes = snap.get(Bytes.of(queryId), 
FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME);
+            if(pcjTableNameBytes == null) {
+                throw new NotInFluoException("Could not get the PcjMetadata 
for queryId '" + queryId +
+                        "' because a PCJ export table name was not stored in 
the Fluo table.");
+            }
+            pcjTableName = pcjTableNameBytes.toString();
+        }
+
+        // Fetch the metadata from the Accumulo table.
+        try {
+            return new PcjTables().getPcjMetadata(accumulo, pcjTableName);
+        } catch (final PcjException e) {
+            throw new NotInAccumuloException("Could not get the PcjMetadata 
for queryId '" + queryId +
+                    "' because the metadata was missing from the Accumulo 
table.", e);
+        }
+    }
+
+    /**
+     * Indicates PCJ Metadata could not be fetched for a query ID because the
+     * Accumulo export table name was not stored in the Fluo table.
+     */
+    public static final class NotInFluoException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public NotInFluoException(final String message) {
+            super(message);
+        }
+    }
+
+    /**
+     * Indicates PCJ Metadata could not be fetched for a query ID because the
+     * metadata was missing in Accumulo.
+     */
+    public static final class NotInAccumuloException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public NotInAccumuloException(final String message, final Exception 
cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
new file mode 100644
index 0000000..2db7f3d
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.Immutable;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+
+import com.google.common.collect.ImmutableMap;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.client.SnapshotBase;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Column;
+import io.fluo.api.data.Span;
+import io.fluo.api.iterator.RowIterator;
+
+/**
+ * Gets reports that indicate how many binding sets have been emitted for
+ * the queries that are being managed by the fluo application.
+ */
+@ParametersAreNonnullByDefault
+public class GetQueryReport {
+
+    private final FluoQueryMetadataDAO metadataDao = new 
FluoQueryMetadataDAO();
+
+    /**
+     * Builds a {@link QueryReport} for every Query ID that {@link ListQueryIds}
+     * finds within the fluo application.
+     *
+     * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null)
+     * @return A map whose keys are Query IDs and whose values are the reports
+     *   that were built for those queries.
+     */
+    public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) {
+        checkNotNull(fluo);
+
+        // Build one report for each Query ID that is stored in Fluo.
+        final Map<String, QueryReport> reportsById = new HashMap<>();
+        for(final String id : new ListQueryIds().listQueryIds(fluo)) {
+            reportsById.put(id, getReport(fluo, id));
+        }
+        return reportsById;
+    }
+
+    /**
+     * Get a report that indicates how many binding sets have been emitted for
+     * a query that is being managed by the fluo application.
+     * <p>
+     * The query's metadata and all of the counts are read through a single
+     * {@link Snapshot}.
+     *
+     * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null)
+     * @param queryId - The ID of the query to fetch. (not null)
+     * @return A report holding the query's metadata along with a count for the
+     *   query node and for each of its filter, join, and statement pattern nodes.
+     */
+    public QueryReport getReport(final FluoClient fluo, final String queryId) {
+        checkNotNull(fluo);
+        checkNotNull(queryId);
+
+        final QueryReport.Builder builder = QueryReport.builder();
+
+        try(Snapshot snapshot = fluo.newSnapshot()) {
+            final FluoQuery query = metadataDao.readFluoQuery(snapshot, queryId);
+            builder.setFluoQuery(query);
+
+            // The query node's own results.
+            final BigInteger queryCount = countBindingSets(snapshot, queryId, FluoQueryColumns.QUERY_BINDING_SET);
+            builder.setCount(queryId, queryCount);
+
+            // Results of each filter node.
+            for(final FilterMetadata filter : query.getFilterMetadata()) {
+                final String nodeId = filter.getNodeId();
+                final BigInteger filterCount = countBindingSets(snapshot, nodeId, FluoQueryColumns.FILTER_BINDING_SET);
+                builder.setCount(nodeId, filterCount);
+            }
+
+            // Results of each join node.
+            for(final JoinMetadata join : query.getJoinMetadata()) {
+                final String nodeId = join.getNodeId();
+                final BigInteger joinCount = countBindingSets(snapshot, nodeId, FluoQueryColumns.JOIN_BINDING_SET);
+                builder.setCount(nodeId, joinCount);
+            }
+
+            // Results of each statement pattern node.
+            for(final StatementPatternMetadata pattern : query.getStatementPatternMetadata()) {
+                final String nodeId = pattern.getNodeId();
+                final BigInteger patternCount = countBindingSets(snapshot, nodeId, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET);
+                builder.setCount(nodeId, patternCount);
+            }
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Counts the rows returned by a scan that is limited to row IDs starting
+     * with {@code nodeId} and to the {@code bindingSetColumn} column.
+     *
+     * @param sx - The snapshot the scan is performed against. (not null)
+     * @param nodeId - The Node ID used as the row prefix of the scan. (not null)
+     * @param bindingSetColumn - The column the scan fetches. (not null)
+     * @return The number of rows the scan produced.
+     */
+    private BigInteger countBindingSets(final SnapshotBase sx, final String nodeId, final Column bindingSetColumn) {
+        checkNotNull(sx);
+        checkNotNull(nodeId);
+        checkNotNull(bindingSetColumn);
+
+        // Restrict the scan to the binding set column and the node id prefix.
+        final ScannerConfiguration config = new ScannerConfiguration();
+        config.fetchColumn(bindingSetColumn.getFamily(), bindingSetColumn.getQualifier());
+        config.setSpan( Span.prefix(nodeId) );
+
+        // Walk the scan results, tallying one for every row that is seen.
+        BigInteger total = BigInteger.ZERO;
+        for(final RowIterator it = sx.get(config); it.hasNext(); it.next()) {
+            total = total.add( BigInteger.ONE );
+        }
+
+        return total;
+    }
+
+    /**
+     * Contains all metadata that represents a SPARQL query within the Fluo app
+     * as well as the number of Binding Sets that have been emitted for each of
+     * the query nodes.
+     */
+    @Immutable
+    @ParametersAreNonnullByDefault
+    public static final class QueryReport {
+
+        /**
+         * Metadata about the nodes of the query.
+         */
+        private final FluoQuery fluoQuery;
+
+        /**
+         * The number of binding sets that match each of the nodes.
+         * <p>
+         * The key is the Node ID of a node in {@code fluoQuery}. <br/>
+         * The value is the number of Binding Sets that have been emitted for the node.
+         */
+        private final ImmutableMap<String, BigInteger> counts;
+
+        /**
+         * Constructs an instance of {@link QueryReport}. Use the {@link Builder} instead.
+         *
+         * @param fluoQuery - Metadata about the nodes of the query. (not null)
+         * @param counts - A map from Node ID to the number of binding sets that
+         *   have been emitted for that Node ID in the fluo app. (not null)
+         */
+        private QueryReport(
+                final FluoQuery fluoQuery,
+                final ImmutableMap<String, BigInteger> counts) {
+            this.fluoQuery = checkNotNull(fluoQuery);
+            this.counts = checkNotNull(counts);
+        }
+
+        /**
+         * @return Metadata about the nodes of the query.
+         */
+        public FluoQuery getFluoQuery() {
+            return fluoQuery;
+        }
+
+        /**
+         * Get the number of Binding Sets that have been emitted for a node.
+         *
+         * @param nodeId - The Node ID of the node that emits binding sets. (not null)
+         * @return The number of Binding Sets that have been emitted for the node;
+         *   {@code null} if this report does not hold a count for {@code nodeId}.
+         */
+        public BigInteger getCount(final String nodeId) {
+            checkNotNull(nodeId);
+            return counts.get(nodeId);
+        }
+
+        /**
+         * @return A map from Node ID to the number of binding sets that
+         *   have been emitted for that Node ID in the fluo app.
+         */
+        public ImmutableMap<String, BigInteger> getCounts() {
+            return counts;
+        }
+
+        /**
+         * @return An empty instance of {@link Builder}.
+         */
+        public static Builder builder() {
+            return new Builder();
+        }
+
+        /**
+         * Builds instances of {@link QueryReport}.
+         */
+        @ParametersAreNonnullByDefault
+        public static final class Builder {
+
+            private FluoQuery fluoQuery = null;
+            private final ImmutableMap.Builder<String, BigInteger> counts = ImmutableMap.builder();
+
+            /**
+             * Set the metadata about the nodes of the query.
+             *
+             * @param fluoQuery - The metadata about the nodes of the query. A
+             *   {@code null} value is accepted here, but {@link #build()} will
+             *   fail unless a non-null value has been set by then.
+             * @return This builder so that method invocations may be chained.
+             */
+            public Builder setFluoQuery(@Nullable final FluoQuery fluoQuery) {
+                this.fluoQuery = fluoQuery;
+                return this;
+            }
+
+            /**
+             * Set the number of Binding Sets that have been emitted for a node.
+             * <p>
+             * The counts are accumulated in an {@link ImmutableMap.Builder}, which
+             * rejects null keys and values, so both parameters are required.
+             *
+             * @param nodeId - The ID of the node. (not null)
+             * @param count - The number of binding sets that have been emitted. (not null)
+             * @return This builder so that method invocations may be chained.
+             * @throws NullPointerException Either parameter is null.
+             */
+            public Builder setCount(final String nodeId, final BigInteger count) {
+                // Fail here with an explicit check rather than inside the map builder.
+                checkNotNull(nodeId);
+                checkNotNull(count);
+                counts.put(nodeId, count);
+                return this;
+            }
+
+            /**
+             * @return An instance of {@link QueryReport} built using this builder's values.
+             * @throws NullPointerException No {@link FluoQuery} has been set.
+             */
+            public QueryReport build() {
+                return new QueryReport(fluoQuery, counts.build());
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
new file mode 100644
index 0000000..02e871f
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.types.TypedTransaction;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+
+/**
+ * Inserts batches of triples into Fluo. This will trigger observers that will
+ * update the final results of any PCJs that are being managed by this application.
+ */
+public class InsertTriples {
+    private static final Logger log = Logger.getLogger(InsertTriples.class);
+
+    /**
+     * Wraps Fluo {@code Transaction}s so that we can write String values to them.
+     */
+    private static final StringTypeLayer STRING_TYPED_LAYER = new StringTypeLayer();
+
+    /**
+     * Converts triples into the byte[] used as the row ID in Accumulo.
+     */
+    private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
+
+    /**
+     * Inserts a triple into Fluo.
+     *
+     * @param fluo - A connection to the Fluo table that will be updated. (not null)
+     * @param triple - The triple to insert. (not null)
+     */
+    public void insert(final FluoClient fluo, final RyaStatement triple) {
+        // Reject nulls before a transaction is opened on their behalf.
+        checkNotNull(fluo);
+        checkNotNull(triple);
+        insert(fluo, Collections.singleton(triple));
+    }
+
+    /**
+     * Insert a batch of triples into Fluo using a single transaction.
+     * <p>
+     * A triple that can not be converted into the SPO format is logged and
+     * skipped; the remaining triples of the batch are still committed.
+     *
+     * @param fluo - A connection to the Fluo table that will be updated. (not null)
+     * @param triples - The triples to insert. (not null)
+     */
+    public void insert(final FluoClient fluo, final Collection<RyaStatement> triples) {
+        checkNotNull(fluo);
+        checkNotNull(triples);
+
+        try(TypedTransaction tx = STRING_TYPED_LAYER.wrap(fluo.newTransaction())) {
+            for(final RyaStatement triple : triples) {
+                try {
+                    tx.mutate().row(spoFormat(triple)).col(FluoQueryColumns.TRIPLES).set();
+                } catch (final TripleRowResolverException e) {
+                    // Keep the cause in the log so the conversion failure can be diagnosed.
+                    log.error("Could not convert a Triple into the SPO format: " + triple, e);
+                }
+            }
+
+            tx.commit();
+        }
+    }
+
+    /**
+     * Converts a triple into a byte[] holding the Rya SPO representation of it.
+     *
+     * @param triple - The triple to convert. (not null)
+     * @return The Rya SPO representation of the triple.
+     * @throws TripleRowResolverException The triple could not be converted.
+     */
+    public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+        checkNotNull(triple);
+        final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
+        final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
+        return spoRow.getRow();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
new file mode 100644
index 0000000..a85bf56
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.types.TypedSnapshot;
+
+/**
+ * Lists the Query IDs that are stored in the
+ * {@link FluoQueryColumns#QUERY_ID} column of a Fluo table.
+ */
+public class ListQueryIds {
+
+    /**
+     * Scans the {@link FluoQueryColumns#QUERY_ID} column of the Fluo table and
+     * collects the Query ID stored in each row that has that column.
+     *
+     * @param fluo - The Fluo instance that will be searched. (not null)
+     * @return The Query IDs that were found, sorted using the natural ordering
+     *   of {@link String}.
+     */
+    public List<String> listQueryIds(final FluoClient fluo) {
+        checkNotNull(fluo);
+
+        final List<String> ids = new ArrayList<>();
+
+        try(TypedSnapshot snapshot = new StringTypeLayer().wrap( fluo.newSnapshot() )) {
+            // Only the QUERY_ID column is scanned.
+            final ScannerConfiguration config = new ScannerConfiguration();
+            config.fetchColumn(FluoQueryColumns.QUERY_ID.getFamily(), FluoQueryColumns.QUERY_ID.getQualifier());
+
+            // Each row of the scan holds a Query ID; read it through the snapshot.
+            for(final RowIterator rows = snapshot.get(config); rows.hasNext(); ) {
+                final Entry<Bytes, ColumnIterator> rowEntry = rows.next();
+                final Bytes row = rowEntry.getKey();
+                ids.add( snapshot.get(row, FluoQueryColumns.QUERY_ID).toString() );
+            }
+        }
+
+        // Put the IDs into ascending order before handing them back.
+        Collections.sort(ids);
+        return ids;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
new file mode 100644
index 0000000..b591e07
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project 
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.pcj.fluo.parent</artifactId>
+        <version>3.2.10-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.pcj.fluo.app</artifactId>
+    
+    <name>Apache Rya PCJ Fluo App</name>
+    <description>
+        A Fluo implementation of Rya Precomputed Join Indexing. This module 
produces
+        a jar that may be executed by the 'fluo' command line tool as a YARN 
job.
+    </description>
+
+    <dependencies>
+        <!-- Rya Runtime Dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    
+        <!-- 3rd Party Runtime Dependencies. -->
+        <dependency>
+            <groupId>io.fluo</groupId>
+            <artifactId>fluo-api</artifactId>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Use the pre-built 'jar-with-dependencies' assembly to package the dependent class files into the final jar.
+                 This creates a jar file that can be deployed to Fluo without having to include any dependent jars. -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
new file mode 100644
index 0000000..859cd4b
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.Immutable;
+
+import io.fluo.api.data.Bytes;
+
+/**
+ * The values of an Accumulo Row ID for a row that stores a Binding set for
+ * a specific Node ID of a query.
+ */
+@Immutable
+@ParametersAreNonnullByDefault
+public class BindingSetRow {
+    private final String nodeId;
+    private final String[] bindingStrings;
+
+    /**
+     * Constructs an instance of {@link BindingSetRow}.
+     *
+     * @param nodeId - The Node ID of a query node. (not null)
+     * @param bindingStrings - A Binding Set that is part of the node's results.
+     *   The array is copied, so later changes to it do not affect this object. (not null)
+     */
+    public BindingSetRow(final String nodeId, final String[] bindingStrings) {
+        this.nodeId = checkNotNull(nodeId);
+        // Copy the array so the caller can not change this immutable object's state.
+        this.bindingStrings = checkNotNull(bindingStrings).clone();
+    }
+
+    /**
+     * @return The Node ID of a query node.
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return A copy of the Binding Set that is part of the node's results.
+     *   Each String requires further interpretation.
+     */
+    public String[] getBindingStrings() {
+        // Hand back a copy so the internal array is never exposed.
+        return bindingStrings.clone();
+    }
+
+    /**
+     * Parses the {@link Bytes} of an Accumulo Row ID into a {@link BindingSetRow}.
+     * <p>
+     * The row is split into a Node ID and a Binding Set using
+     * {@code NODEID_BS_DELIM}, and the Binding Set is then split into its
+     * binding strings using {@code DELIM}.
+     *
+     * @param row - The Row ID to parse. (not null).
+     * @return A {@link BindingSetRow} holding the parsed values.
+     * @throws IllegalArgumentException Splitting the row on
+     *   {@code NODEID_BS_DELIM} did not produce exactly two pieces.
+     */
+    public static BindingSetRow make(final Bytes row) {
+        checkNotNull(row);
+
+        // Read the Node ID from the row's bytes.
+        // NOTE(review): String.split treats the delimiter as a regular expression;
+        // this assumes the delimiter constants hold no regex metacharacters - confirm.
+        final String[] rowArray = row.toString().split(NODEID_BS_DELIM);
+        if(rowArray.length != 2) {
+            throw new IllegalArgumentException("A row must contain a single NODEID_BS_DELIM.");
+        }
+        final String nodeId = rowArray[0];
+
+        // Read the row's Binding Set from the bytes.
+        final String[] bindingStrings = rowArray[1].split(DELIM);
+
+        return new BindingSetRow(nodeId, bindingStrings);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java
new file mode 100644
index 0000000..3c9b875
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Optional;
+
+/**
+ * Searches a SPARQL query for {@link Filter}s.
+ */
+@ParametersAreNonnullByDefault
+class FilterFinder {
+
+    /**
+     * Search a SPARQL query for the {@link Filter} that appears at the
+     * {@code indexWithinQuery}'th time within the query.
+     * <p>
+     * The top most filter within the query will be at index 0, the next filter
+     * encountered will be at index 1, ... and the last filter that is encountered
+     * will be at index <i>n</i>.
+     *
+     * @param sparql - The SPARQL query to parse. (not null)
+     * @param indexWithinQuery - The index of the filter to fetch. (must be >= 0)
+     * @return The filter that was found within the query at the specified index;
+     *   otherwise absent.
+     * @throws Exception Thrown when the query could not be parsed or iterated over.
+     */
+    public Optional<Filter> findFilter(final String sparql, final int indexWithinQuery) throws Exception {
+        checkNotNull(sparql);
+        checkArgument(indexWithinQuery >= 0);
+
+        // When a filter is encountered for the requested index, store it in atomic reference and quit searching.
+        final AtomicReference<Filter> filterRef = new AtomicReference<>();
+        final QueryModelVisitorBase<RuntimeException> filterFinder = new QueryModelVisitorBase<RuntimeException>() {
+            private int i = 0;
+            @Override
+            public void meet(final Filter filter) {
+                // The traversal keeps visiting sibling branches after a match. Without
+                // this guard the counter would still equal the requested index, so a
+                // later filter would overwrite the one that was already found.
+                if(filterRef.get() != null) {
+                    return;
+                }
+
+                // Store and stop searching.
+                if(i == indexWithinQuery) {
+                    filterRef.set(filter);
+                    return;
+                }
+
+                // Continue to the next filter.
+                i++;
+                super.meet(filter);
+            }
+        };
+
+        // Parse the query and find the filter.
+        final SPARQLParser parser = new SPARQLParser();
+        final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
+        parsedQuery.getTupleExpr().visit(filterFinder);
+        return Optional.fromNullable(filterRef.get());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
new file mode 100644
index 0000000..5ff5acc
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.evaluation.TripleSource;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl;
+import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.base.Optional;
+
+import info.aduna.iteration.CloseableIteration;
+import io.fluo.api.client.TransactionBase;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Updates the results of a Filter node when its child has added a new Binding
+ * Set to its results.
+ */
+@ParametersAreNonnullByDefault
+public class FilterResultUpdater {
+
+    private final Encoder encoder = new StringEncoder();
+
+    /**
+     * A utility class used to search SPARQL queries for Filters.
+     */
+    private static final FilterFinder filterFinder = new FilterFinder();
+
+    /**
+     * Is used to evaluate the conditions of a {@link Filter}.
+     */
+    private static final EvaluationStrategyImpl evaluator = new 
EvaluationStrategyImpl(
+            new TripleSource() {
+                private final ValueFactory valueFactory = new 
ValueFactoryImpl();
+
+                @Override
+                public ValueFactory getValueFactory() {
+                    return valueFactory;
+                }
+
+                @Override
+                public CloseableIteration<? extends Statement, 
QueryEvaluationException> getStatements(
+                        final Resource arg0,
+                        final URI arg1,
+                        final Value arg2,
+                        final Resource... arg3) throws 
QueryEvaluationException {
+                    throw new UnsupportedOperationException();
+                }
+            });
+
+    /**
+     * Updates the results of a Filter node when one of its child has added a
+     * new Binding Set to its results.
+     *
+     * @param tx - The transaction all Fluo queries will use. (not null)
+     * @param childBindingSet - A binding set that the query's child node has 
emmitted. (not null)
+     * @param filterMetadata - The metadata of the Filter whose results will 
be updated. (not null)
+     * @throws Exception Something caused the update to fail.
+     */
+    public void updateFilterResults(
+            final TransactionBase tx,
+            final BindingSet childBindingSet,
+            final FilterMetadata filterMetadata) throws Exception {
+        checkNotNull(tx);
+        checkNotNull(childBindingSet);
+        checkNotNull(filterMetadata);
+
+        // Parse the original query and find the Filter that represents 
filterId.
+        final String sparql = filterMetadata.getOriginalSparql();
+        final int indexWithinQuery = 
filterMetadata.getFilterIndexWithinSparql();
+        final Optional<Filter> filter = filterFinder.findFilter(sparql, 
indexWithinQuery);
+
+        // Evaluate whether the child BindingSet satisfies the filter's 
condition.
+        final ValueExpr condition = filter.get().getCondition();
+        if (isTrue(condition, childBindingSet)) {
+            // Create the Filter's binding set from the child's.
+            final VariableOrder filterVarOrder = 
filterMetadata.getVariableOrder();
+
+            final MapBindingSet filterBindingSet = new MapBindingSet();
+            for(final String bindingName : filterVarOrder) {
+                final Binding binding = 
childBindingSet.getBinding(bindingName);
+                filterBindingSet.addBinding(binding);
+            }
+            final String filterBindingSetString = 
BindingSetStringConverter.toString(filterBindingSet, filterVarOrder);
+
+            final Bytes row = encoder.encode( filterMetadata.getNodeId() + 
NODEID_BS_DELIM + filterBindingSetString );
+            final Column col = FluoQueryColumns.FILTER_BINDING_SET;
+            final Bytes value = encoder.encode(filterBindingSetString);
+            tx.set(row, col, value);
+        }
+    }
+
+    /**
+     * Evaluate a {@link BindingSet} to see if it is accepted by a filter's 
condition.
+     *
+     * @param condition - The filter condition. (not null)
+     * @param bindings - The binding set to evaluate. (not null)
+     * @return {@code true} if the binding set is accepted by the filter; 
otherwise {@code false}.
+     * @throws QueryEvaluationException The condition couldn't be evaluated.
+     */
+    private static boolean isTrue(final ValueExpr condition, final BindingSet 
bindings) throws QueryEvaluationException {
+        try {
+            final Value value = evaluator.evaluate(condition, bindings);
+            return QueryEvaluationUtil.getEffectiveBooleanValue(value);
+        } catch (final ValueExprEvaluationException e) {
+            // XXX Hack: If filtering a statement that does not have the right 
bindings, return true.
+            //           When would this ever come up? Should we actually 
return true?
+            return true;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
new file mode 100644
index 0000000..61e3d5f
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+
+import java.util.Collection;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.model.Literal;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Joiner;
+
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+
+/**
+ * Contains methods that convert between the Sesame representations of RDF
+ * components and the Strings that are used by the Fluo PCJ application.
+ */
+@ParametersAreNonnullByDefault
+public class FluoStringConverter {
+
+    private static final ValueFactory valueFactory = new ValueFactoryImpl();
+
+    /**
+     * Converts an ordered collection of variables into the Variable Order
+     * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS}
+     * column of the Fluo application.
+     *
+     * @param varOrder - An ordered collection of variables. (not null)
+     * @return The string representation of the variable order.
+     */
+    public static String toVarOrderString(final Collection<String> varOrder) {
+        checkNotNull(varOrder);
+        return Joiner.on(VAR_DELIM).join(varOrder);
+    }
+
+    /**
+     * Converts an ordered array of variables into the Variable Order
+     * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS}
+     * column of the Fluo application.
+     *
+     * @param varOrder - An ordered array of variables. (not null)
+     * @return The string representation of the variable order.
+     */
+    public static String toVarOrderString(final String... varOrder) {
+        // Validate the argument the same way the Collection based overload does.
+        checkNotNull(varOrder);
+        return Joiner.on(VAR_DELIM).join(varOrder);
+    }
+
+    /**
+     * Converts a String into an array holding the Variable Order of a Binding Set.
+     *
+     * @param varOrderString - The string representation of the variable order. (not null)
+     * @return An ordered array holding the variable order of a binding set.
+     */
+    public static String[] toVarOrder(final String varOrderString) {
+        checkNotNull(varOrderString);
+        return varOrderString.split(VAR_DELIM);
+    }
+
+    /**
+     * Converts a {@link BindingSet} to the String representation that the Fluo
+     * application serializes to the Binding Set columns.
+     * <p>
+     * Each value is written as its data followed by {@code TYPE_DELIM} and its
+     * data type; consecutive values are separated by {@code DELIM}.
+     *
+     * @param bindingSet - The binding set values. (not null)
+     * @param varOrder - The order the variables must appear in. Every variable
+     *   must be bound within {@code bindingSet}. (not null)
+     * @return A {@code String} version of {@code bindingSet} suitable for
+     *   serialization to one of the Fluo application's binding set columns.
+     * @throws NullPointerException A variable of {@code varOrder} is not bound
+     *   within {@code bindingSet}.
+     */
+    public static String toBindingSetString(final BindingSet bindingSet, final String[] varOrder) {
+        checkNotNull(bindingSet);
+        checkNotNull(varOrder);
+
+        final StringBuilder bindingSetString = new StringBuilder();
+
+        for(int i = 0; i < varOrder.length; i++) {
+            // Look the variable up, naming it in the failure when it is not bound.
+            final String varName = varOrder[i];
+            final Binding binding = bindingSet.getBinding(varName);
+            checkNotNull(binding, "The binding set does not bind the variable '" + varName + "'.");
+
+            // Add a value to the binding set.
+            final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue());
+            bindingSetString.append( ryaValue.getData() ).append(TYPE_DELIM).append( ryaValue.getDataType() );
+
+            // If there are more values to add, include a delimiter between them.
+            if(i != varOrder.length-1) {
+                bindingSetString.append(DELIM);
+            }
+        }
+
+        return bindingSetString.toString();
+    }
+
+    /**
+     * Converts the String representation of a {@link BindingSet} as is created
+     * by {@link #toBindingSetString(BindingSet, String[])} back into a
+     * BindingSet.
+     *
+     * @param bindingSetString - The binding set values as a String. (not null)
+     * @param varOrder - The order the variables appear in the String version of
+     *   the BindingSet. (not null)
+     * @return A {@link BindingSet} representation of the String.
+     */
+    public static BindingSet toBindingSet(final String bindingSetString, final String[] varOrder) {
+        checkNotNull(bindingSetString);
+        checkNotNull(varOrder);
+
+        final String[] bindingStrings = toBindingStrings(bindingSetString);
+        return toBindingSet(bindingStrings, varOrder);
+    }
+
+    /**
+     * Creates a {@link BindingSet} from an ordered array of Strings that represent
+     * {@link Binding}s and their variable names.
+     *
+     * @param bindingStrings - An ordered array of Strings representing {@link Binding}s. (not null)
+     * @param varOrder - An ordered array of variable names for the binding strings.
+     *   Must be the same length as {@code bindingStrings}. (not null)
+     * @return The parameters converted into a {@link BindingSet}.
+     */
+    public static BindingSet toBindingSet(final String[] bindingStrings, final String[] varOrder) {
+        checkNotNull(varOrder);
+        checkNotNull(bindingStrings);
+        checkArgument(varOrder.length == bindingStrings.length);
+
+        final QueryBindingSet bindingSet = new QueryBindingSet();
+
+        // The i-th binding string holds the value of the i-th variable.
+        for(int i = 0; i < bindingStrings.length; i++) {
+            final String name = varOrder[i];
+            final Value value = FluoStringConverter.toValue(bindingStrings[i]);
+            bindingSet.addBinding(name, value);
+        }
+
+        return bindingSet;
+    }
+
+    /**
+     * Extract the {@link Binding} strings from a {@link BindingSet}'s string form.
+     *
+     * @param bindingSetString - A {@link BindingSet} in its Fluo String form. (not null)
+     * @return The set's {@link Binding}s in Fluo String form. (not null)
+     */
+    public static String[] toBindingStrings(final String bindingSetString) {
+        checkNotNull(bindingSetString);
+        return bindingSetString.split(DELIM);
+    }
+
+    /**
+     * Creates a {@link Value} from a String representation of it.
+     *
+     * @param valueString - The String representation of the value: its data
+     *   followed by {@code TYPE_DELIM} and the URI of its data type. (not null)
+     * @return The {@link Value} representation of the String. It is a {@link URI}
+     *   when the data type is {@link XMLSchema#ANYURI}; otherwise it is a typed
+     *   {@link Literal}.
+     * @throws IllegalArgumentException The String does not hold exactly a data
+     *   part and a type part.
+     */
+    public static Value toValue(final String valueString) {
+        checkNotNull(valueString);
+
+        // Split the String that was stored in Fluo into its Value and Type parts.
+        final String[] valueAndType = valueString.split(TYPE_DELIM);
+        if(valueAndType.length != 2) {
+            throw new IllegalArgumentException("Array must contain data and type info!");
+        }
+
+        final String dataString = valueAndType[0];
+        final String typeString = valueAndType[1];
+
+        // Convert the String Type into a URI that describes the type.
+        final URI typeURI = valueFactory.createURI(typeString);
+
+        // Convert the String Value into a Value, reusing the type URI built above.
+        final Value value = typeURI.equals(XMLSchema.ANYURI) ?
+                valueFactory.createURI(dataString) :
+                valueFactory.createLiteral(dataString, typeURI);
+
+        return value;
+    }
+
+    /**
+     * Converts the String representation of a {@link StatementPattern} back
+     * into the object version.
+     *
+     * @param patternString - The {@link StatementPattern} represented as a String:
+     *   its subject, predicate and object separated by {@code DELIM}. (not null)
+     * @return A {@link StatementPattern} built from the string.
+     * @throws IllegalArgumentException The String does not hold a subject, a
+     *   predicate and an object.
+     */
+    public static StatementPattern toStatementPattern(final String patternString) {
+        checkNotNull(patternString);
+
+        // Reject a malformed pattern here rather than fail on an array index below.
+        final String[] parts = patternString.split(DELIM);
+        checkArgument(parts.length >= 3,
+                "The statement pattern string must hold a subject, predicate and object: %s", patternString);
+
+        final Var subject = toVar(parts[0]);
+        final Var predicate = toVar(parts[1]);
+        final Var object = toVar(parts[2]);
+
+        return new StatementPattern(subject, predicate, object);
+    }
+
+    /**
+     * Converts the String representation of a {@link Var} back into the object version.
+     * <p>
+     * A String that starts with {@code -const-} represents a constant. It holds the
+     * variable's name followed by {@code TYPE_DELIM} and the constant's data type, and
+     * the constant's value is the part of the name that follows the prefix. Any other
+     * String is the name of a named variable.
+     *
+     * @param varString - The {@link Var} represented as a String. (not null)
+     * @return A {@link Var} built from the string.
+     * @throws IllegalArgumentException The String represents a constant, but it does
+     *   not include the constant's data type.
+     */
+    public static Var toVar(final String varString) {
+        checkNotNull(varString);
+
+        if(!varString.startsWith("-const-")) {
+            // The variable is a named variable.
+            return new Var(varString);
+        }
+
+        // The variable is a constant value.
+        final String[] varParts = varString.split(TYPE_DELIM);
+        checkArgument(varParts.length >= 2,
+                "The constant must include its data type: %s", varString);
+
+        final String name = varParts[0];
+        final String valueString = name.substring("-const-".length());
+        final String dataTypeString = varParts[1];
+
+        // A constant is either a URI or a Literal of the indicated data type.
+        final Value value = dataTypeString.equals(URI_TYPE) ?
+                new URIImpl(valueString) :
+                new LiteralImpl(valueString, new URIImpl(dataTypeString));
+
+        final Var constant = new Var(name, value);
+        constant.setAnonymous(true);
+        return constant;
+    }
+
+    /**
+     * Provides a string representation of an SP which contains info about
+     * whether each component (subj, pred, obj) is constant and its data and
+     * data type if it is constant.
+     * <p>
+     * A constant subject or predicate is always tagged with {@code URI_TYPE}; a
+     * constant object is tagged with the data type of its value.
+     *
+     * @param sp - The statement pattern to convert. (not null)
+     * @return A String representation of the statement pattern that may be
+     *   used to do triple matching.
+     */
+    public static String toStatementPatternString(final StatementPattern sp) {
+        checkNotNull(sp);
+
+        final Var subjVar = sp.getSubjectVar();
+        String subj = subjVar.getName();
+        if(subjVar.isConstant()) {
+            subj = subj + TYPE_DELIM + URI_TYPE;
+        }
+
+        final Var predVar = sp.getPredicateVar();
+        String pred = predVar.getName();
+        if(predVar.isConstant()) {
+            pred = pred + TYPE_DELIM + URI_TYPE;
+        }
+
+        final Var objVar = sp.getObjectVar();
+        String obj = objVar.getName();
+        if (objVar.isConstant()) {
+            final RyaType rt = RdfToRyaConversions.convertValue(objVar.getValue());
+            obj =  obj + TYPE_DELIM + rt.getDataType().stringValue();
+        }
+
+        return subj + DELIM + pred + DELIM + obj;
+    }
+}
\ No newline at end of file

Reply via email to