http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
new file mode 100644
index 0000000..5cb9869
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
+
+import java.util.Map.Entry;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.types.TypedTransaction;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+
+public class IncUpdateDAO {
+
+    private static final StringTypeLayer stl = new StringTypeLayer();
+    private static final WholeRowTripleResolver tr = new 
WholeRowTripleResolver();
+
+    private static RyaStatement deserializeTriple(final Bytes row) {
+        final byte[] rowArray = row.toArray();
+
+        RyaStatement rs = null;
+        try {
+            rs = tr.deserialize(TABLE_LAYOUT.SPO, new TripleRow(rowArray, new 
byte[0], new byte[0]));
+        } catch (final TripleRowResolverException e) {
+            e.printStackTrace();
+        }
+
+        return rs;
+    }
+
+    private static String getTripleString(final RyaStatement rs) {
+        final String subj = rs.getSubject().getData() + TYPE_DELIM + URI_TYPE;
+        final String pred = rs.getPredicate().getData() + TYPE_DELIM + 
URI_TYPE;
+        final String objData = rs.getObject().getData();
+        final String objDataType = rs.getObject().getDataType().stringValue();
+
+        return subj + DELIM + pred + DELIM + objData + TYPE_DELIM + 
objDataType;
+    }
+
+    public static String getTripleString(final Bytes row) {
+        return getTripleString(deserializeTriple(row));
+    }
+
+    /**
+     * Add a row, creating and closing a transaction.
+     *
+     * @param fluoClient - Creates connections to the Fluo table that will be 
written to. (not null)
+     * @param row - The Row ID.
+     * @param col - The Column.
+     * @param val - The value.
+     */
+    public static void addRow(final FluoClient fluoClient, final String row, 
final Column col, final String val) {
+        checkNotNull(fluoClient);
+        try (TypedTransaction tx = stl.wrap(fluoClient.newTransaction())) {
+            addRow(tx, row, col, val);
+            tx.commit();
+        }
+    }
+
+    /**
+     * Writes an entry to the Fluo table.
+     *
+     * @param tx - The transaction that will be used. (not null)
+     * @param row - The Row ID.
+     * @param col - The Column.
+     * @param val - The value.
+     */
+    public static void addRow(final TypedTransaction tx, final String row, 
final Column col, final String val) {
+        checkNotNull(tx);
+        tx.mutate().row(row).col(col).set(val);
+    }
+
+    /**
+     * Print all statements in the repo for demo and diagnostic purposes.
+     * @param fluoClient
+     * @throws Exception
+     */
+    public static void printTriples(final FluoClient fluoClient) throws 
Exception {
+        try (Snapshot snapshot = fluoClient.newSnapshot()) {
+            final ScannerConfiguration scanConfig = new ScannerConfiguration();
+            scanConfig.fetchColumn(Bytes.of("triples"), Bytes.of("SPO"));
+
+            final RowIterator rowIter = snapshot.get(scanConfig);
+
+            while (rowIter.hasNext()) {
+                final Entry<Bytes, ColumnIterator> row = rowIter.next();
+                System.out.println("Triple: " + row.getKey().toString());
+            }
+        }
+    }
+
+//    /**
+//     * Print all bindings for the given queries. For demo's and diagnostics.
+//     * @param fluoClient
+//     * @param queryNames
+//     * @throws Exception
+//     */
+//    public static void printQueryResults(final FluoClient fluoClient,
+//            final Map<String, String> queryNames) throws Exception {
+//        try (Snapshot snapshot = fluoClient.newSnapshot();
+//                TypedTransaction tx1 = 
stl.wrap(fluoClient.newTransaction())) {
+//
+//            final ScannerConfiguration scanConfig = new 
ScannerConfiguration();
+//            scanConfig.fetchColumn(Bytes.of("query"), 
Bytes.of("bindingSet"));
+//
+//            final RowIterator rowIter = snapshot.get(scanConfig);
+//            String sparqlRow = "";
+//            
System.out.println("*********************************************************");
+//
+//            while (rowIter.hasNext()) {
+//                final Entry<Bytes, ColumnIterator> row = rowIter.next();
+//                final String[] joinInfo = row.getKey().toString()
+//                        .split(NODEID_BS_DELIM);
+//                final String sparql = joinInfo[0];
+//                final String bs = joinInfo[1];
+//                if (!sparqlRow.equals(sparql)) {
+//                    sparqlRow = sparql;
+//                    System.out.println();
+//                    System.out.println();
+//                    System.out.println(queryNames.get(sparqlRow)
+//                            + " has bindings: ");
+//                    System.out.println();
+//                }
+//
+//                final String variables = 
tx1.get().row(sparqlRow).col(NODE_VARS).toString();
+//                final String[] vars = variables.split(";");
+//                final String[] bsVals = bs.split(DELIM);
+//                System.out.print("Bindingset:  ");
+//                for (int i = 0; i < vars.length; i++) {
+//                    System.out.print(vars[i] + " = " + bsVals[i] + "   ");
+//                }
+//                System.out.println();
+//
+//            }
+//
+//            
System.out.println("*********************************************************");
+//        }
+//    }
+
+    /**
+     * Print all rows in the Fluo table for diagnostics.
+     * @param fluoClient
+     * @throws Exception
+     */
+    public static void printAll(final FluoClient fluoClient) throws Exception {
+        final String FORMAT = "%-30s | %-10s | %-10s | %-40s\n";
+        System.out
+        .println("Printing all tables.  Showing unprintable bytes and braces 
as {ff} and {{} and {}} where ff is the value in hexadecimal.");
+        System.out.format(FORMAT, "--Row--", "--Column Family--",
+                "--Column Qual--", "--Value--");
+        // Use try with resource to ensure snapshot is closed.
+        try (Snapshot snapshot = fluoClient.newSnapshot()) {
+            final ScannerConfiguration scanConfig = new ScannerConfiguration();
+            // scanConfig.setSpan(Span.prefix("word:"));
+
+            final RowIterator rowIter = snapshot.get(scanConfig);
+
+            while (rowIter.hasNext()) {
+                final Entry<Bytes, ColumnIterator> row = rowIter.next();
+                final ColumnIterator colIter = row.getValue();
+                while (colIter.hasNext()) {
+                    final Entry<Column, Bytes> column = colIter.next();
+                    // System.out.println(row.getKey() + " " +
+                    // column.getKey().getFamily()+ " " +
+                    // column.getKey().getQualifier() );
+                    System.out.format(FORMAT, to_String(row.getKey()),
+                            to_String(column.getKey().getFamily()),
+                            to_String(column.getKey().getQualifier()),
+                            to_String(column.getValue()));
+                }
+            }
+        }
+    }
+
+    /**
+     * convert a non-utf8 to string and show unprintable bytes as {xx} where x
+     * is hex.
+     *
+     * @param value
+     * @return
+     */
+    static String to_String(final Bytes bytes) {
+        return to_String(bytes.toArray());
+    }
+
+    static String to_String(final byte[] bytes) {
+        final StringBuilder sb = new StringBuilder();
+        for (final byte b : bytes) {
+            if ((b > 0x7e) || (b < 32)) {
+                sb.append("{");
+                sb.append(Integer.toHexString(b & 0xff)); // Lop off the sign 
extended ones.
+                sb.append("}");
+            } else if (b == '{' || b == '}') { // Escape the literal braces.
+                sb.append("{");
+                sb.append((char) b);
+                sb.append("}");
+            } else {
+                sb.append((char) b);
+            }
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
new file mode 100644
index 0000000..84581ef
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+public class IncrementalUpdateConstants {
+
+    // String constants used to create more easily parsed patterns.
+    public static final String DELIM = ":::";
+    public static final String VAR_DELIM = ";";
+    public static final String NODEID_BS_DELIM = "<<:>>";
+    public static final String JOIN_DELIM = "<:>J<:>";
+    public static final String TYPE_DELIM = "<<~>>";
+
+    //to be used in construction of id for each node
+    public static final String SP_PREFIX = "STATEMENT_PATTERN";
+    public static final String JOIN_PREFIX = "JOIN";
+    public static final String FILTER_PREFIX = "FILTER";
+    public static final String QUERY_PREFIX = "QUERY";
+
+    public static final String URI_TYPE = 
"http://www.w3.org/2001/XMLSchema#anyURI";;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
new file mode 100644
index 0000000..a25cb92
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import io.fluo.api.client.TransactionBase;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.data.Span;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Updates the results of a Join node when one of its children has added a
+ * new Binding Set to its results.
+ */
+@ParametersAreNonnullByDefault
+public class JoinResultUpdater {
+    private static final Logger log = 
Logger.getLogger(JoinResultUpdater.class);
+
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+    private final Encoder encoder = new StringEncoder();
+
+    /**
+     * Updates the results of a Join node when one of its children has added a
+     * new Binding Set to its results.
+     *
+     * @param tx - The transaction all Fluo queries will use. (not null)
+     * @param childId - The Node ID of the child whose results received a new 
Binding Set. (not null)
+     * @param childBindingSet - The Binding Set that was just emitted by child 
node. (not null)
+     * @param joinMetadata - The metadatat for the Join that has been 
notified. (not null)
+     */
+    public void updateJoinResults(
+            final TransactionBase tx,
+            final String childId,
+            final BindingSet childBindingSet,
+            final JoinMetadata joinMetadata) {
+        checkNotNull(tx);
+        checkNotNull(childId);
+        checkNotNull(joinMetadata);
+
+        // Read the Join metadata from the Fluo table.
+        final String[] joinVarOrder = 
joinMetadata.getVariableOrder().toArray();
+
+        // Transform the Child binding set and varaible order values to be 
easier to work with.
+        final VariableOrder childVarOrder = getVarOrder(tx, childId);
+        final String[] childVarOrderArray = childVarOrder.toArray();
+        final String childBindingSetString = 
FluoStringConverter.toBindingSetString(childBindingSet, 
childVarOrder.toArray());
+        final String[] childBindingStrings = 
FluoStringConverter.toBindingStrings(childBindingSetString);
+
+        // Transform the Sibling binding set and varaible order values to be 
easier to work with.
+        final String leftChildId = joinMetadata.getLeftChildNodeId();
+        final String rightChildId = joinMetadata.getRightChildNodeId();
+        final String siblingId = leftChildId.equals(childId) ? rightChildId : 
leftChildId;
+
+        final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
+        final String[] siblingVarOrderArray = siblingVarOrder.toArray();
+
+        // Create a map that will be used later in this algorithm to create 
new Join result
+        // Binding Sets. It is initialized with all of the values that are in 
childBindingSet.
+        // The common values and any that are added on by the sibling will be 
overwritten
+        // for each sibling scan result.
+        final List<String> commonVars = getCommonVars(childVarOrder, 
siblingVarOrder);
+        final Map<String, String> joinBindingSet = Maps.newHashMap();
+        for(int i = 0; i < childVarOrderArray.length; i++) {
+            joinBindingSet.put(childVarOrderArray[i], childBindingStrings[i]);
+        }
+
+        // Create the prefix that will be used to scan for binding sets of the 
sibling node.
+        // This prefix includes the sibling Node ID and the common variable 
values from
+        // childBindingSet.
+        String bsPrefix = "";
+        for(int i = 0; i < commonVars.size(); i++) {
+            if(bsPrefix.length() == 0) {
+                bsPrefix = childBindingStrings[i];
+            } else {
+                bsPrefix += DELIM + childBindingStrings[i];
+            }
+        }
+        bsPrefix = siblingId + NODEID_BS_DELIM + bsPrefix;
+
+        // Scan the sibling node's binding sets for those that have the same
+        // common variable values as childBindingSet. These needs to be joined
+        // and inserted into the Join's results. It's possible that none of 
these
+        // results will be new Join results if they have already been created 
in
+        // earlier iterations of this algorithm.
+        final ScannerConfiguration sc1 = new ScannerConfiguration();
+        sc1.setSpan(Span.prefix(bsPrefix));
+        setScanColumnFamily(sc1, siblingId);
+
+        try {
+            final RowIterator ri = tx.get(sc1);
+            while(ri.hasNext()) {
+                final ColumnIterator ci = ri.next().getValue();
+                while(ci.hasNext()){
+                    // Get a sibling binding set.
+                    final String siblingBindingSetString = 
ci.next().getValue().toString();
+                    final String[] siblingBindingStrings = 
FluoStringConverter.toBindingStrings(siblingBindingSetString);
+
+                    // Overwrite the previous sibling's values to create a new 
join binding set.
+                    for (int i = 0; i < siblingBindingStrings.length; i++) {
+                        joinBindingSet.put(siblingVarOrderArray[i], 
siblingBindingStrings[i]);
+                    }
+                    final String joinBindingSetString = 
makeBindingSetString(joinVarOrder, joinBindingSet);
+
+                    // Write the join binding set to Fluo.
+                    final Bytes row = encoder.encode(joinMetadata.getNodeId() 
+ NODEID_BS_DELIM + joinBindingSetString);
+                    final Column col = FluoQueryColumns.JOIN_BINDING_SET;
+                    final Bytes value = encoder.encode(joinBindingSetString);
+                    tx.set(row, col, value);
+                }
+            }
+        } catch (final Exception e) {
+            log.error("Error while scanning sibling binding sets to create new 
join results.", e);
+        }
+    }
+
+    /**
+     * Fetch the {@link VariableOrder} of a query node.
+     *
+     * @param tx - The transaction that will be used to read the variable 
order. (not null)
+     * @param nodeId - The ID of the node to fetch. (not null)
+     * @return The {@link VariableOrder} of the node.
+     */
+    private VariableOrder getVarOrder(final TransactionBase tx, final String 
nodeId) {
+        checkNotNull(tx);
+        checkNotNull(nodeId);
+
+        final NodeType nodeType = NodeType.fromNodeId(nodeId).get();
+        switch(nodeType) {
+            case STATEMENT_PATTERN:
+                return queryDao.readStatementPatternMetadata(tx, 
nodeId).getVariableOrder();
+
+            case FILTER:
+                return queryDao.readFilterMetadata(tx, 
nodeId).getVariableOrder();
+
+            case JOIN:
+                return queryDao.readJoinMetadata(tx, 
nodeId).getVariableOrder();
+
+            default:
+                throw new IllegalArgumentException("Could not figure out the 
variable order for node with ID: " + nodeId);
+        }
+    }
+
+    /**
+     * Assuming that the common variables between two children are already
+     * shifted to the left, find the common variables between them.
+     * <p>
+     * Refer to {@link FluoQueryInitializer} to see why this assumption is 
being made.
+     *
+     * @param vars1 - The first child's variable order. (not null)
+     * @param vars2 - The second child's variable order. (not null)
+     * @return An ordered List of the common variables between the two 
children.
+     */
+    public List<String> getCommonVars(final VariableOrder vars1, final 
VariableOrder vars2) {
+        checkNotNull(vars1);
+        checkNotNull(vars2);
+
+        final List<String> commonVars = new ArrayList<>();
+
+        // Only need to iteratre through the shorted order's length.
+        final Iterator<String> vars1It = vars1.iterator();
+        final Iterator<String> vars2It = vars2.iterator();
+        while(vars1It.hasNext() && vars2It.hasNext()) {
+            final String var1 = vars1It.next();
+            final String var2 = vars2It.next();
+
+            if(var1.equals(var2)) {
+                commonVars.add(var1);
+            } else {
+                // Because the common variables are left shifted, we can break 
once
+                // we encounter a pair that does not match.
+                break;
+            }
+        }
+
+        return commonVars;
+    }
+
+//    /**
+//     * Assuming that the common variables between two children are already
+//     * shifted to the left, find the common variables between them.
+//     * <p>
+//     * Refer to {@link FluoQueryInitializer} to see why this assumption is 
being made.
+//     *
+//     * @param vars1 - The first child's variable order. (not null)
+//     * @param vars2 - The second child's variable order. (not null)
+//     * @return An ordered List of the common variables between the two 
children.
+//     */
+//    private List<String> getCommonVars(final String[] vars1, final String[] 
vars2) {
+//        checkNotNull(vars1);
+//        checkNotNull(vars2);
+//
+//        final List<String> commonVars = new ArrayList<>();
+//
+//        // Only need to iteratre through the shorted order's length.
+//        final int shortestLen = Math.min(vars1.length, vars2.length);
+//        for(int i = 0; i < shortestLen; i++) {
+//            final String var1 = vars1[i];
+//            final String var2 = vars2[i];
+//
+//            if(var1.equals(var2)) {
+//                commonVars.add(var1);
+//            } else {
+//                // Because the common variables are left shifted, we can 
break once
+//                // we encounter a pair that does not match.
+//                break;
+//            }
+//        }
+//
+//        return commonVars;
+//    }
+
+    /**
+     * Update a {@link ScannerConfiguration} to use the sibling node's binding
+     * set column for its scan. The column that will be used is determined by 
the
+     * node's {@link NodeType}.
+     *
+     * @param sc - The scan configuration that will be updated. (not null)
+     * @param siblingId - The Node ID of the sibling. (not null)
+     */
+    private static void setScanColumnFamily(final ScannerConfiguration sc, 
final String siblingId) {
+        checkNotNull(sc);
+        checkNotNull(siblingId);
+
+        // Determine which type of binding set the sibling is.
+        final Optional<NodeType> siblingType = NodeType.fromNodeId(siblingId);
+        if(!siblingType.isPresent()) {
+            throw new IllegalStateException("The child's sibling is not of a 
recognized type.");
+        }
+
+        // Set the column to join with.
+        Column column;
+        switch(siblingType.get()) {
+            case STATEMENT_PATTERN:
+                column = FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET;
+                break;
+            case FILTER:
+                column = FluoQueryColumns.FILTER_BINDING_SET;
+                break;
+            case JOIN:
+                column = FluoQueryColumns.JOIN_BINDING_SET;
+                break;
+            default:
+                throw new IllegalArgumentException("The child node's sibling 
is not of type StatementPattern, Join, or Filter.");
+        }
+        sc.fetchColumn(column.getFamily(), column.getQualifier());
+    }
+
+    /**
+     * Create a Binding Set String from a variable order and a map of bindings.
+     *
+     * @param varOrder - The resulting binding set's variable order. (not null)
+     * @param bindingSetValues - A map holding the variables and their values 
that will be
+     *   included in the resulting binding set.
+     * @return A binding set string build from the map using the prescribed 
variable order.
+     */
+    private static String makeBindingSetString(final String[] varOrder, final 
Map<String, String> bindingSetValues) {
+        checkNotNull(varOrder);
+        checkNotNull(bindingSetValues);
+
+        String bindingSetString = "";
+
+        for (final String joinVar : varOrder) {
+            if (bindingSetString.length() == 0) {
+                bindingSetString = bindingSetValues.get(joinVar);
+            } else {
+                bindingSetString = bindingSetString + DELIM + 
bindingSetValues.get(joinVar);
+            }
+        }
+
+        return bindingSetString;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
new file mode 100644
index 0000000..e9f6243
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
+
+import com.google.common.base.Optional;
+
+/**
+ * Represents the different types of nodes that a Query may have.
+ */
+public enum NodeType {
+    FILTER,
+    JOIN,
+    STATEMENT_PATTERN,
+    QUERY;
+
+    /**
+     * Get the {@link NodeType} of a node based on its Node ID.
+     *
+     * @param nodeId - The Node ID of a node. (not null)
+     * @return The {@link NodeType} of the node if it could be derived from the
+     *   node's ID, otherwise absent.
+     */
+    public static Optional<NodeType> fromNodeId(final String nodeId) {
+        checkNotNull(nodeId);
+
+        NodeType type = null;
+
+        if(nodeId.startsWith(SP_PREFIX)) {
+            type = STATEMENT_PATTERN;
+        } else if(nodeId.startsWith(FILTER_PREFIX)) {
+            type = FILTER;
+        } else if(nodeId.startsWith(JOIN_PREFIX)) {
+            type = JOIN;
+        } else if(nodeId.startsWith(QUERY_PREFIX)) {
+            type = QUERY;
+        }
+
+        return Optional.fromNullable(type);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
new file mode 100644
index 0000000..e4d0155
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import io.fluo.api.client.TransactionBase;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Updates the results of a Query node when one of its children has added a
+ * new Binding Set to its results.
+ */
+@ParametersAreNonnullByDefault
+public class QueryResultUpdater {
+
+    private final Encoder encoder = new StringEncoder();
+
+    /**
+     * Updates the results of a Query node when one of its children has added a
+     * new Binding Set to its results.
+     *
+     * @param tx - The transaction all Fluo queries will use. (not null)
+     * @param childBindingSet - A binding set that the query's child node has 
emmitted. (not null)
+     * @param queryMetadata - The metadata of the Query whose results will be 
updated. (not null)
+     */
+    public void updateQueryResults(
+            final TransactionBase tx,
+            final BindingSet childBindingSet,
+            final QueryMetadata queryMetadata) {
+        checkNotNull(tx);
+        checkNotNull(childBindingSet);
+        checkNotNull(queryMetadata);
+
+        // Create the query's Binding Set from the child node's binding set.
+        final VariableOrder queryVarOrder = queryMetadata.getVariableOrder();
+
+        final MapBindingSet queryBindingSet = new MapBindingSet();
+        for(final String bindingName : queryVarOrder) {
+            final Binding binding = childBindingSet.getBinding(bindingName);
+            queryBindingSet.addBinding(binding);
+        }
+        final String queryBindingSetString = 
BindingSetStringConverter.toString(queryBindingSet, queryVarOrder);
+
+        // Commit it to the Fluo table for the SPARQL query. This isn't 
guaranteed to be a new entry.
+        final Bytes row = encoder.encode(queryMetadata.getNodeId() + 
NODEID_BS_DELIM + queryBindingSetString);
+        final Column col = FluoQueryColumns.QUERY_BINDING_SET;
+        final Bytes value = encoder.encode(queryBindingSetString);
+        tx.set(row, col, value);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java
new file mode 100644
index 0000000..35fcf52
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import io.fluo.api.types.StringEncoder;
+import io.fluo.api.types.TypeLayer;
+
+public class StringTypeLayer extends TypeLayer {
+
+    public StringTypeLayer() {
+        super(new StringEncoder());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
new file mode 100644
index 0000000..fbbae33
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.BindingSet;
+
+import io.fluo.api.types.TypedTransactionBase;
+
+/**
+ * Exports a single Binding Set that is a new result for a SPARQL query to some
+ * other location.
+ */
+@ParametersAreNonnullByDefault
+public interface IncrementalResultExporter {
+
+    /**
+     * Export a Binding Set that is a result of a SPARQL query.
+     *
+     * @param tx - The Fluo transaction this export is a part of. (not null)
+     * @param queryId - The Fluo ID of the SPARQL query the binding set is a 
result of. (not null)
+     * @param bindingSetString - The binding set as it was represented within 
the
+     *   Fluo application. (not null)
+     * @throws ResultExportException The result could not be exported.
+     */
+    public void export(TypedTransactionBase tx, String queryId, BindingSet 
result) throws ResultExportException;
+
+    /**
+     * A result could not be exported.
+     */
+    public static class ResultExportException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link ResultExportException}.
+         *
+         * @param message - Explains why the exception was thrown.
+         */
+        public ResultExportException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link ResultExportException}.
+         *
+         * @param message - Explains why the exception was thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public ResultExportException(final String message, final Throwable 
cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
new file mode 100644
index 0000000..aae42ef
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import com.google.common.base.Optional;
+
+import io.fluo.api.observer.Observer.Context;
+
+/**
+ * Builds instances of {@link IncrementalResultExporter} using the provided
+ * configurations.
+ */
+@ParametersAreNonnullByDefault
+public interface IncrementalResultExporterFactory {
+
+    /**
+     * Builds an instance of {@link IncrementalResultExporter} using the
+     * configurations that are provided.
+     *
+     * @param context - Contains the host application's configuration values
+     *   and any parameters that were provided at initialization. (not null)
+     * @return An exporter if configurations were found in the context; 
otherwise absent.
+     * @throws IncrementalExporterFactoryException A non-configuration related
+     *   problem has occurred and the exporter could not be created as a 
result.
+     * @throws ConfigurationException Thrown if configuration values were
+     *   provided, but an instance of the exporter could not be initialized
+     *   using them. This could be because they were improperly formatted,
+     *   a required field was missing, or some other configuration based 
problem.
+     */
+    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException;
+
+    /**
+     * Indicates a {@link IncrementalResultExporter} could not be created by a
+     * {@link IncrementalResultExporterFactory}.
+     */
+    public static class IncrementalExporterFactoryException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link }.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public IncrementalExporterFactoryException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link }.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public IncrementalExporterFactoryException(final String message, final 
Throwable t) {
+            super(message, t);
+        }
+    }
+
+    /**
+     * The configuration could not be interpreted because required fields were
+     * missing or a value wasn't properly formatted.
+     */
+    public static class ConfigurationException extends 
IncrementalExporterFactoryException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link ConfigurationException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public ConfigurationException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link ConfigurationException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public ConfigurationException(final String message, final Throwable 
cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ParametersBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ParametersBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ParametersBase.java
new file mode 100644
index 0000000..57938c7
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ParametersBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import com.google.common.base.Optional;
+
+/**
+ * Contains common parsing functions that make it easier to interpret 
parameter maps.
+ */
+@ParametersAreNonnullByDefault
+public abstract class ParametersBase {
+
+    protected final Map<String, String> params;
+
+    /**
+     * Constructs an instance of {@link ParametersBase}.
+     *
+     * @param params - The parameters that will be wrapped. (not null)
+     */
+    public ParametersBase(final Map<String, String> params) {
+        this.params = checkNotNull(params);
+    }
+
+    protected static void setBoolean(final Map<String, String> params, final 
String paramName, final boolean value) {
+        checkNotNull(params);
+        checkNotNull(paramName);
+        params.put(paramName, Boolean.toString(value));
+    }
+
+    protected static boolean getBoolean(final Map<String, String> params, 
final String paramName, final boolean defaultValue) {
+        checkNotNull(params);
+        checkNotNull(paramName);
+
+        final Optional<String> paramValue = Optional.fromNullable( 
params.get(paramName) );
+        return paramValue.isPresent() ? Boolean.parseBoolean( paramValue.get() 
) : defaultValue;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
new file mode 100644
index 0000000..525f9c3
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
+
+import com.google.common.base.Optional;
+
+import io.fluo.api.observer.Observer;
+
+/**
+ * Provides read/write functions to the parameters map that is passed into an
+ * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
+ * to Rya PCJ exporting.
+ */
+@ParametersAreNonnullByDefault
+public class RyaExportParameters extends ParametersBase {
+
+    public static final String CONF_EXPORT_TO_RYA = 
"pcj.fluo.export.rya.enabled";
+    public static final String CONF_ACCUMULO_INSTANCE_NAME = 
"pcj.fluo.export.rya.accumuloInstanceName";
+    public static final String CONF_ZOOKEEPER_SERVERS = 
"pcj.fluo.export.rya.zookeeperServers";
+    public static final String CONF_EXPORTER_USERNAME = 
"pcj.fluo.export.rya.exporterUsername";
+    public static final String CONF_EXPORTER_PASSWORD = 
"pcj.fluo.export.rya.exporterPassword";
+
+    /**
+     * Constructs an instance of {@link RyaExportParameters}.
+     *
+     * @param params - The parameters object that will be read/writen to. (not 
null)
+     */
+    public RyaExportParameters(final Map<String, String> params) {
+        super(params);
+    }
+
+    /**
+     * @param isExportToRya - {@code True} if the Fluo application should 
export
+     *   to Rya; otherwise {@code false}.
+     */
+    public void setExportToRya(final boolean isExportToRya) {
+        setBoolean(params, CONF_EXPORT_TO_RYA, isExportToRya);
+    }
+
+    /**
+     * @return {@code True} if the Fluo application should export to Rya; 
otherwise
+     *   {@code false}. Defaults to {@code false} if no value is present.
+     */
+    public boolean isExportToRya() {
+        return getBoolean(params, CONF_EXPORT_TO_RYA, false);
+    }
+
+    /**
+     * @param accumuloInstanceName - The name of the Accumulo instance the 
exporter will connect to.
+     */
+    public void setAccumuloInstanceName(@Nullable final String 
accumuloInstanceName) {
+        params.put(CONF_ACCUMULO_INSTANCE_NAME, accumuloInstanceName);
+    }
+
+    /**
+     * @return The name of the Accumulo instance the exporter will connect to.
+     */
+    public Optional<String> getAccumuloInstanceName() {
+        return Optional.fromNullable( params.get(CONF_ACCUMULO_INSTANCE_NAME) 
);
+    }
+
+    /**
+     * @param zookeeperServers - A semicolon delimited list of Zookeeper
+     *   server hostnames for the zookeepers that provide connections ot the
+     *   target Accumulo instance.
+     */
+    public void setZookeeperServers(@Nullable final String zookeeperServers) {
+        params.put(CONF_ZOOKEEPER_SERVERS, zookeeperServers);
+    }
+
+    /**
+     * @return A semicolon delimited list of Zookeeper  server hostnames for 
the
+     *   zookeepers that provide connections ot the target Accumulo instance.
+     */
+    public Optional<String> getZookeeperServers() {
+        return Optional.fromNullable( params.get(CONF_ZOOKEEPER_SERVERS) );
+    }
+
+    /**
+     * @param exporterUsername - The username that will be used to export PCJ
+     *   results to the destination Accumulo table.
+     */
+    public void setExporterUsername(@Nullable final String exporterUsername) {
+        params.put(CONF_EXPORTER_USERNAME, exporterUsername);
+    }
+
+    /**
+     * @return The username that will be used to export PCJ results to the
+     *   destination Accumulo table.
+     */
+    public Optional<String> getExporterUsername() {
+        return Optional.fromNullable( params.get(CONF_EXPORTER_USERNAME) );
+    }
+
+    /**
+     * @param exporterPassword - The password that will be used to export PCJ
+     *   results to the destination Accummulo table.
+     */
+    public void setExporterPassword(@Nullable final String exporterPassword) {
+        params.put(CONF_EXPORTER_PASSWORD, exporterPassword);
+    }
+
+    /**
+     * @return The password that will be used to export PCJ
+     *   results to the destination Accummulo table.
+     */
+    public Optional<String> getExporterPassword() {
+        return Optional.fromNullable( params.get(CONF_EXPORTER_PASSWORD) );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
new file mode 100644
index 0000000..4d51798
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.openrdf.query.BindingSet;
+
+import io.fluo.api.data.Bytes;
+import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.external.tupleSet.PcjTables;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
+
+/**
+ * Incrementally exports SPARQL query results to Accumulo PCJ tables as they 
are defined by Rya.
+ */
+public class RyaResultExporter implements IncrementalResultExporter {
+
+    private final Connector accumuloConn;
+    private final PcjTables pcjTables;
+
+    /**
+     * Constructs an instance of {@link RyaResultExporter}.
+     *
+     * @param accumuloConn - A connection to the Accumulo instance that hosts 
Rya PCJ tables. (not null)
+     * @param pcjTables - A utility used to interact with Rya's PCJ tables. 
(not null)
+     */
+    public RyaResultExporter(final Connector accumuloConn, final PcjTables 
pcjTables) {
+        this.accumuloConn = checkNotNull(accumuloConn);
+        this.pcjTables = checkNotNull(pcjTables);
+    }
+
+    @Override
+    public void export(
+            final TypedTransactionBase fluoTx,
+            final String queryId,
+            final BindingSet result) throws ResultExportException {
+        checkNotNull(fluoTx);
+        checkNotNull(queryId);
+        checkNotNull(result);
+
+        // Get the name of the table the PCJ results will be written to.
+        final String pcjTableName = fluoTx.get(Bytes.of(queryId), 
FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString();
+
+        // Write the result to the PCJ table.
+        try {
+            pcjTables.addResults(accumuloConn, pcjTableName, 
Collections.singleton(result));
+        } catch (final PcjException e) {
+            throw new ResultExportException("A result could not be exported to 
the PCJ table in Accumulo.", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
new file mode 100644
index 0000000..fd63314
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+
+import com.google.common.base.Optional;
+
+import io.fluo.api.observer.Observer.Context;
+import mvm.rya.indexing.external.tupleSet.PcjTables;
+
+/**
+ * Creates instances of {@link RyaResultExporter}.
+ */
+public class RyaResultExporterFactory implements 
IncrementalResultExporterFactory {
+
+    @Override
+    public Optional<IncrementalResultExporter> build(final Context context) 
throws IncrementalExporterFactoryException, ConfigurationException {
+        checkNotNull(context);
+
+        // Wrap the context's parameters for parsing.
+        final RyaExportParameters params = new RyaExportParameters( 
context.getParameters() );
+
+        if(params.isExportToRya()) {
+            final String accumuloInstance = 
params.getAccumuloInstanceName().get();
+            final String zookeeperServers =  
params.getZookeeperServers().get().replaceAll(";", ",");
+            final Instance inst = new ZooKeeperInstance(accumuloInstance, 
zookeeperServers);
+
+            try {
+                final String exporterUsername = 
params.getExporterUsername().get();
+                final String exporterPassword = 
params.getExporterPassword().get();
+                final Connector accumuloConn = 
inst.getConnector(exporterUsername, new PasswordToken(exporterPassword));
+
+                final IncrementalResultExporter exporter = new 
RyaResultExporter(accumuloConn, new PcjTables());
+                return Optional.of(exporter);
+
+            } catch (final AccumuloException | AccumuloSecurityException e) {
+                throw new IncrementalExporterFactoryException("Could not 
initialize the Accumulo connector using the provided configuration.", e);
+            }
+        } else {
+            return Optional.absent();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
new file mode 100644
index 0000000..d3c5bb5
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.openrdf.query.BindingSet;
+
+import io.fluo.api.client.TransactionBase;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.types.TypedObserver;
+import io.fluo.api.types.TypedTransactionBase;
+
+/**
+ * Notified when the results of a node have been updated to include a new 
Binding
+ * Set. This observer updates its parent if the new Binding Set effects the 
parent's
+ * results.
+ */
+@ParametersAreNonnullByDefault
+public abstract class BindingSetUpdater extends TypedObserver {
+
+    // DAO
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+    // Updaters
+    private final JoinResultUpdater joinUpdater = new JoinResultUpdater();
+    private final FilterResultUpdater filterUpdater = new 
FilterResultUpdater();
+    private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
+
+    @Override
+    public abstract ObservedColumn getObservedColumn();
+
+    /**
+     * Create an {@link Observation} that defines the work that needs to be 
done.
+     *
+     * @param tx - The Fluo transaction being used for the observer 
notification. (not null)
+     * @param parsedRow - The RowID parsed into a Binding Set and Node ID. 
(not null)
+     * @return An {@link Observation} that defines the work that needs to be 
done.
+     */
+    public abstract Observation parseObservation(TransactionBase tx, final 
BindingSetRow parsedRow);
+
+    @Override
+    public final void process(final TypedTransactionBase tx, final Bytes row, 
final Column col) {
+        checkNotNull(tx);
+        checkNotNull(row);
+        checkNotNull(col);
+
+        final Observation observation = parseObservation( tx, 
BindingSetRow.make(row) );
+        final String observedNodeId = observation.getObservedNodeId();
+        final BindingSet observedBindingSet = 
observation.getObservedBindingSet();
+        final String parentNodeId = observation.getParentId();
+
+        // Figure out which node needs to handle the new metadata.
+        final NodeType parentNodeType = 
NodeType.fromNodeId(parentNodeId).get();
+        switch(parentNodeType) {
+            case QUERY:
+                final QueryMetadata parentQuery = 
queryDao.readQueryMetadata(tx, parentNodeId);
+                queryUpdater.updateQueryResults(tx, observedBindingSet, 
parentQuery);
+                break;
+
+            case FILTER:
+                final FilterMetadata parentFilter = 
queryDao.readFilterMetadata(tx, parentNodeId);
+                try {
+                    filterUpdater.updateFilterResults(tx, observedBindingSet, 
parentFilter);
+                } catch (final Exception e) {
+                    throw new RuntimeException("Could not process a Filter 
node.", e);
+                }
+                break;
+
+            case JOIN:
+                final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, 
parentNodeId);
+                joinUpdater.updateJoinResults(tx, observedNodeId, 
observedBindingSet, parentJoin);
+                break;
+
+            default:
+                throw new IllegalArgumentException("The parent node's NodeType 
must be of type Filter, Join, or Query, but was " + parentNodeType);
+        }
+    }
+
+    /**
+     * Defines who just emitted a new Binding Set result, the Binding Set 
itself,
+     * and which node must now handle it.
+     */
+    public static final class Observation {
+
+        private final String observedNodeId;
+        private final BindingSet observedBindingSet;
+        private final String parentNodeId;
+
+        /**
+         * Creates an instance of {@link Observation}.
+         *
+         * @param observedNodeId - The Node ID that just emitted a new Binding 
Set. (not null)
+         * @param observedBindingSet - A Binding Set that was just emitted. 
(not null)
+         * @param parentNodeId - The Node ID of the node that must handle the 
new Binding Set input. (not null)
+         */
+        public Observation(
+                final String observedNodeId,
+                final BindingSet observedBindingSet,
+                final String parentNodeId) {
+            this.observedNodeId = checkNotNull(observedNodeId);
+            this.observedBindingSet = checkNotNull(observedBindingSet);
+            this.parentNodeId = checkNotNull(parentNodeId);
+        }
+
+        /**
+         * @return The Node ID that just emitted a new Binding Set.
+         */
+        public String getObservedNodeId() {
+            return observedNodeId;
+        }
+
+        /**
+         * @return A Binding Set that was just emitted.
+         */
+        public BindingSet getObservedBindingSet() {
+            return observedBindingSet;
+        }
+
+        /**
+         * @return The Node ID of the node that must handle the new Binding 
Set input.
+         */
+        public String getParentId() {
+            return parentNodeId;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
new file mode 100644
index 0000000..c6b6303
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.openrdf.query.BindingSet;
+
+import io.fluo.api.client.TransactionBase;
+
+/**
+ * Notified when the results of a Filter have been updated to include a new
+ * {@link BindingSet}. This observer updates its parent if the new Binding Set
+ * effects the parent's results.
+ */
+public class FilterObserver extends BindingSetUpdater {
+
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(FluoQueryColumns.FILTER_BINDING_SET, 
NotificationType.STRONG);
+    }
+
+    @Override
+    public Observation parseObservation(final TransactionBase tx, final 
BindingSetRow parsedRow) {
+        checkNotNull(tx);
+        checkNotNull(parsedRow);
+
+        // Read the Filter metadata.
+        final String filterNodeId = parsedRow.getNodeId();
+        final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, 
filterNodeId);
+
+        // Read the Binding Set that was just emmitted by the Filter.
+        final String[] filterBindingStrings = parsedRow.getBindingStrings();
+        final String[] filterVarOrder = 
filterMetadata.getVariableOrder().toArray();
+        final BindingSet filterBindingSet = 
FluoStringConverter.toBindingSet(filterBindingStrings, filterVarOrder);
+
+        // Figure out which node needs to handle the new metadata.
+        final String parentNodeId = filterMetadata.getParentNodeId();
+
+        return new Observation(filterNodeId, filterBindingSet, parentNodeId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
new file mode 100644
index 0000000..980c7bc
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.openrdf.query.BindingSet;
+
+import io.fluo.api.client.TransactionBase;
+
+/**
+ * Notified when the results of a Join have been updated to include a new
+ * {@link BindingSet}. This observer updates its parent if the new Binding Set
+ * effects the parent's results.
+ */
+public class JoinObserver extends BindingSetUpdater {
+
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(FluoQueryColumns.JOIN_BINDING_SET, 
NotificationType.STRONG);
+    }
+
+    @Override
+    public Observation parseObservation(final TransactionBase tx, final 
BindingSetRow parsedRow) {
+        checkNotNull(parsedRow);
+
+        // Read the Join metadata.
+        final String joinNodeId = parsedRow.getNodeId();
+        final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, 
joinNodeId);
+
+        // Read the Binding Set that was just emmitted by the Join.
+        final String[] joinBindingStrings = parsedRow.getBindingStrings();
+        final String[] joinVarOrder = 
joinMetadata.getVariableOrder().toArray();
+        final BindingSet joinBindingSet = 
FluoStringConverter.toBindingSet(joinBindingStrings, joinVarOrder);
+
+        // Figure out which node needs to handle the new metadata.
+        final String parentNodeId = joinMetadata.getParentNodeId();
+
+        return new Observation(joinNodeId, joinBindingSet, parentNodeId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
new file mode 100644
index 0000000..8a60039
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.types.TypedObserver;
+import io.fluo.api.types.TypedTransactionBase;
+
+/**
+ * Performs incremental result exporting to the configured destinations.
+ */
+public class QueryResultObserver extends TypedObserver {
+    private static final Logger log = 
Logger.getLogger(QueryResultObserver.class);
+
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+    /**
+     * Builders for each type of result exporter we support.
+     */
+    private static final ImmutableSet<IncrementalResultExporterFactory> 
factories =
+            ImmutableSet.<IncrementalResultExporterFactory>builder()
+                .add(new RyaResultExporterFactory())
+                .build();
+
+    /**
+     * The exporters that are configured.
+     */
+    private ImmutableSet<IncrementalResultExporter> exporters = null;
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(FluoQueryColumns.QUERY_BINDING_SET, 
NotificationType.STRONG);
+    }
+
+    /**
+     * Before running, determine which exporters are configured and set them 
up.
+     */
+    @Override
+    public void init(final Context context) {
+        final ImmutableSet.Builder<IncrementalResultExporter> exportersBuilder 
= ImmutableSet.builder();
+
+        for(final IncrementalResultExporterFactory builder : factories) {
+            try {
+                final Optional<IncrementalResultExporter> exporter = 
builder.build(context);
+                if(exporter.isPresent()) {
+                    exportersBuilder.add(exporter.get());
+                }
+            } catch (final IncrementalExporterFactoryException e) {
+                log.error("Could not initialize a result exporter.", e);
+            }
+        }
+
+        exporters = exportersBuilder.build();
+    }
+
+    @Override
+    public void process(final TypedTransactionBase tx, final Bytes row, final 
Column col) {
+        // Read the SPARQL query and it Binding Set from the row id.
+        final String[] queryAndBindingSet = 
row.toString().split(NODEID_BS_DELIM);
+        final String queryId = queryAndBindingSet[0];
+        final String bindingSetString = queryAndBindingSet[1];
+
+        // Fetch the query's Variable Order from the Fluo table.
+        final QueryMetadata queryMetadata = queryDao.readQueryMetadata(tx, 
queryId);
+        final String[] varOrder = queryMetadata.getVariableOrder().toArray();
+
+        // Export the result using each of the provided exporters.
+        final BindingSet result = 
FluoStringConverter.toBindingSet(bindingSetString, varOrder);
+        for(final IncrementalResultExporter exporter : exporters) {
+            try {
+                exporter.export(tx, queryId, result);
+            } catch (final ResultExportException e) {
+                log.error("Could not export a binding set for query '" + 
queryId + "'. Binding Set: " + bindingSetString);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
new file mode 100644
index 0000000..00f9b13
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.openrdf.query.BindingSet;
+
+import io.fluo.api.client.TransactionBase;
+
+/**
+ * Notified when the results of a Statement Pattern have been updated to 
include
+ * a new {@link BindingSet}. This observer updates its parent if the new
+ * Binding Set effects the parent's results.
+ */
+public class StatementPatternObserver extends BindingSetUpdater {
+
+    // DAO
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new 
ObservedColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, 
NotificationType.STRONG);
+    }
+
+    @Override
+    public Observation parseObservation(final TransactionBase tx, final 
BindingSetRow parsedRow) {
+        checkNotNull(tx);
+
+        // Read the Statement Pattern metadata.
+        final String spNodeId = parsedRow.getNodeId();
+        final StatementPatternMetadata spMetadata = 
queryDao.readStatementPatternMetadata(tx, spNodeId);
+
+        // Read the Binding Set that was just emmitted by the Statement 
Pattern.
+        final String[] spBindingStrings = parsedRow.getBindingStrings();
+        final String[] spVarOrder = spMetadata.getVariableOrder().toArray();
+        final BindingSet spBindingSet = 
FluoStringConverter.toBindingSet(spBindingStrings, spVarOrder);
+
+        // Figure out which node needs to handle the new metadata.
+        final String parentNodeId = spMetadata.getParentNodeId();
+
+        return new Observation(spNodeId, spBindingSet, parentNodeId);
+    }
+}
\ No newline at end of file

Reply via email to