http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
new file mode 100644
index 0000000..844a7a4
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter.toVarOrderString;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.Immutable;
+
+import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Creates the {@link FluoQuery} metadata that is required by the Fluo
+ * application to process a SPARQL query.
+ */
+public class SparqlFluoQueryBuilder {
+
+    /**
+     * Builds the {@link FluoQuery} metadata the Fluo application needs in
+     * order to process a SPARQL query. The metadata is collected by visiting
+     * the nodes of the parsed query's tuple expression.
+     *
+     * @param parsedQuery - The query metadata will be derived from. (not null)
+     * @param nodeIds - The NodeIds object is passed in so that other parts
+     *   of the application may look up which ID is associated with each
+     *   node of the query.
+     * @return A {@link FluoQuery} object loaded with metadata built from the
+     *   {@link ParsedQuery}.
+     */
+    public FluoQuery make(final ParsedQuery parsedQuery, final NodeIds nodeIds) {
+        checkNotNull(parsedQuery);
+
+        final String originalSparql = parsedQuery.getSourceString();
+        final FluoQuery.Builder builder = FluoQuery.builder();
+
+        // The builder is populated as a side effect of the visitor walking the query.
+        final NewQueryVisitor queryWalker = new NewQueryVisitor(originalSparql, builder, nodeIds);
+        parsedQuery.getTupleExpr().visit(queryWalker);
+
+        return builder.build();
+    }
+
+    /**
+     * A data structure that creates and keeps track of Node IDs for the nodes
+     * of a {@link ParsedQuery}. This structure should only be used while creating
+     * a new PCJ in Fluo and disposed of afterwards.
+     */
+    @ParametersAreNonnullByDefault
+    public static final class NodeIds {
+
+        /**
+         * Maps from a parsed SPARQL query's node to the Node ID that has been assigned to it.
+         * NOTE(review): keys are matched with {@code equals()}/{@code hashCode()}, so two
+         * distinct nodes that compare equal would share one ID - confirm this is intended.
+         */
+        private final Map<QueryModelNode, String> nodeIds = new HashMap<>();
+
+        /**
+         * Checks if a SPARQL query's node has had a Node ID assigned to it yet.
+         *
+         * @param node - The node to check. (not null)
+         * @return {@code true} if the {@code node} has had a Node ID assigned to
+         *   it; otherwise {@code false}.
+         */
+        public boolean hasId(final QueryModelNode node) {
+            checkNotNull(node);
+            return nodeIds.containsKey(node);
+        }
+
+        /**
+         * Get the Node ID that has been assigned to a specific node of a SPARQL query.
+         *
+         * @param node - The node whose ID will be fetched. (not null)
+         * @return The Node ID that has been assigned to {@code node} if one
+         *   has been assigned to it; otherwise {@code absent}.
+         */
+        public Optional<String> getId(final QueryModelNode node) {
+            checkNotNull(node);
+            return Optional.fromNullable( nodeIds.get(node) );
+        }
+
+        /**
+         * Get the Node ID that has been assigned to a specific node of a SPARQL
+         * query. If one hasn't been assigned yet, then one will be generated
+         * and assigned to it.
+         *
+         * @param node - The node whose ID will be fetched or generated. (not null)
+         * @return The Node ID that is assigned to {@code node}.
+         * @throws IllegalArgumentException if an ID must be generated and {@code node}
+         *   is not a StatementPattern, Filter, Join, or Projection.
+         */
+        public String getOrMakeId(final QueryModelNode node) {
+            checkNotNull(node);
+
+            // Only non-null IDs are ever stored, so a single lookup is enough to
+            // tell whether an ID has already been assigned.
+            final String existingId = nodeIds.get(node);
+            if(existingId != null) {
+                return existingId;
+            }
+
+            // Otherwise create a new ID and store it for later.
+            final String id = makeId(node);
+            nodeIds.put(node, id);
+            return id;
+        }
+
+        /**
+         * Generates a new Node ID of the form {@code <prefix>_<uuid without dashes>},
+         * where the prefix is chosen from the type of the node.
+         *
+         * @param node - The node an ID will be generated for. (not null)
+         * @return A newly generated Node ID.
+         * @throws IllegalArgumentException if {@code node} is not a StatementPattern,
+         *   Filter, Join, or Projection.
+         */
+        private String makeId(final QueryModelNode node) {
+            checkNotNull(node);
+
+            // Create the prefix of the id. This makes it a little bit more human readable.
+            String prefix;
+            if(node instanceof StatementPattern) {
+                prefix = SP_PREFIX;
+            } else if(node instanceof Filter) {
+                prefix = FILTER_PREFIX;
+            } else if(node instanceof Join) {
+                prefix = JOIN_PREFIX;
+            } else if(node instanceof Projection) {
+                prefix = QUERY_PREFIX;
+            } else {
+                throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Projection} but was " + node.getClass());
+            }
+
+            // Create the unique portion of the id. A literal replace is used because
+            // no regular expression is needed to strip the dashes.
+            final String unique = UUID.randomUUID().toString().replace("-", "");
+
+            // Put them together to create the Node ID.
+            return prefix + "_" + unique;
+        }
+    }
+
+    /**
+     * Visits each node of a {@link ParsedQuery} and adds information about
+     * the node to a {@link FluoQuery.Builder}. This information is used by the
+     * application's observers to incrementally update a PCJ.
+     */
+    private static class NewQueryVisitor extends 
QueryModelVisitorBase<RuntimeException> {
+
+        private final NodeIds nodeIds;
+        private final FluoQuery.Builder fluoQueryBuilder;
+        private final String sparql;
+
+        /**
+         * Stored with each Filter node so that we can figure out how to 
evaluate it within
+         * {@link FilterResultUpdater}. Incremented each time a filter has 
been stored.
+         */
+        private int filterIndexWithinQuery = 0;
+
+        /**
+         * Constructs an instance of {@link NewQueryVisitor}.
+         *
+         * @param sparql - The SPARQL query whose structure will be represented
+         *   within a Fluo application. (not null)
+         * @param fluoQueryBuilder - The builder that will be updated by this
+         *   visitor to include metadata about each of the query nodes. (not null)
+         * @param nodeIds - The NodeIds object is passed in so that other parts
+         *   of the application may look up which ID is associated with each
+         *   node of the query. (not null)
+         */
+        public NewQueryVisitor(final String sparql, final FluoQuery.Builder 
fluoQueryBuilder, final NodeIds nodeIds) {
+            this.sparql = checkNotNull(sparql);
+            this.fluoQueryBuilder = checkNotNull(fluoQueryBuilder);
+            this.nodeIds = checkNotNull(nodeIds);
+        }
+
+        /**
+         * Records the statement pattern string of a Statement Pattern node on
+         * that node's metadata builder, creating the builder if none has been
+         * registered for the node's ID yet.
+         *
+         * @param node - The Statement Pattern node being visited.
+         */
+        @Override
+        public void meet(final StatementPattern node) {
+            // Gather the values that will be stored for this node.
+            final String nodeId = nodeIds.getOrMakeId(node);
+            final String patternString = FluoStringConverter.toStatementPatternString(node);
+
+            // Reuse the builder registered under this ID if there is one; otherwise register a new one.
+            StatementPatternMetadata.Builder builder = fluoQueryBuilder.getStatementPatternBuilder(nodeId).orNull();
+            if(builder == null) {
+                builder = StatementPatternMetadata.builder(nodeId);
+                fluoQueryBuilder.addStatementPatternBuilder(builder);
+            }
+
+            builder.setStatementPattern(patternString);
+        }
+
+        /**
+         * Records the metadata of a Join node: the IDs of its two children and,
+         * for each child, the variable order of its binding sets along with this
+         * join as its parent. The walk then continues into the children.
+         *
+         * @param node - The Join node being visited.
+         * @throws IllegalArgumentException if either argument of the join is null.
+         */
+        @Override
+        public void meet(final Join node) {
+            // Extract the metadata that will be stored from the node.
+            final String joinNodeId = nodeIds.getOrMakeId(node);
+
+            // The arguments are held as TupleExprs so that no downcast is needed
+            // when their variables are inspected below.
+            final TupleExpr left = node.getLeftArg();
+            final TupleExpr right = node.getRightArg();
+
+            if(left == null || right == null) {
+                throw new IllegalArgumentException("Join args cannot be null.");
+            }
+
+            final String leftChildNodeId = nodeIds.getOrMakeId(left);
+            final String rightChildNodeId = nodeIds.getOrMakeId(right);
+
+            // Get or create a builder for this node populated with the known metadata.
+            JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(joinNodeId).orNull();
+            if(joinBuilder == null) {
+                joinBuilder = JoinMetadata.builder(joinNodeId);
+                fluoQueryBuilder.addJoinMetadata(joinBuilder);
+            }
+            joinBuilder.setLeftChildNodeId(leftChildNodeId);
+            joinBuilder.setRightChildNodeId(rightChildNodeId);
+
+            // Figure out the variable order for each child node's binding set and
+            // store it. Also store that each child node's parent is this join.
+            final Set<String> leftVars = getVars(left);
+            final Set<String> rightVars = getVars(right);
+            final JoinVarOrders varOrders = getJoinArgVarOrders(leftVars, rightVars);
+
+            // Create or update the left child's variable order and parent node id.
+            final VariableOrder leftVarOrder = new VariableOrder( varOrders.getLeftVarOrder() );
+            setChildMetadata(leftChildNodeId, leftVarOrder, joinNodeId);
+
+            // Create or update the right child's variable order and parent node id.
+            final VariableOrder rightVarOrder = new VariableOrder( varOrders.getRightVarOrder() );
+            setChildMetadata(rightChildNodeId, rightVarOrder, joinNodeId);
+
+            // Walk to the next node.
+            super.meet(node);
+        }
+
+        /**
+         * Records the metadata of a Filter node: the original SPARQL, the index
+         * of this filter within the query, and the ID of its child. The child's
+         * variable order and parent are recorded as well, then the walk continues
+         * into the child.
+         *
+         * @param node - The Filter node being visited.
+         * @throws IllegalArgumentException if the filter's argument is null.
+         */
+        @Override
+        public void meet(final Filter node) {
+            // Get or create a builder for this node populated with the known metadata.
+            final String filterId = nodeIds.getOrMakeId(node);
+
+            FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(filterId).orNull();
+            if(filterBuilder == null) {
+                filterBuilder = FilterMetadata.builder(filterId);
+                fluoQueryBuilder.addFilterMetadata(filterBuilder);
+            }
+
+            // The index is handed out in the order the filters are visited.
+            filterBuilder.setOriginalSparql(sparql);
+            filterBuilder.setFilterIndexWithinSparql(filterIndexWithinQuery++);
+
+            // The argument is held as a TupleExpr so that no downcast is needed
+            // when its variables are inspected below.
+            final TupleExpr child = node.getArg();
+            if(child == null) {
+                throw new IllegalArgumentException("Filter arg cannot be null.");
+            }
+
+            final String childNodeId = nodeIds.getOrMakeId(child);
+            filterBuilder.setChildNodeId(childNodeId);
+
+            // Update the child node's metadata.
+            final Set<String> childVars = getVars(child);
+            final VariableOrder childVarOrder = new VariableOrder( toVarOrderString(childVars) );
+            setChildMetadata(childNodeId, childVarOrder, filterId);
+
+            // Walk to the next node.
+            super.meet(node);
+        }
+
+        /**
+         * Records the metadata of the Projection node, which is stored as the
+         * query's own metadata: the SPARQL, the variable order of the results,
+         * and the ID of its child. The child's variable order and parent are
+         * recorded as well, then the walk continues into the child.
+         *
+         * @param node - The Projection node being visited.
+         * @throws IllegalArgumentException if the projection's argument is null.
+         */
+        @Override
+        public void meet(final Projection node) {
+            // Create a builder for this node populated with the metadata.
+            final String queryId = nodeIds.getOrMakeId(node);
+            final VariableOrder queryVarOrder = new VariableOrder( toVarOrderString( node.getAssuredBindingNames() ) );
+
+            final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
+            fluoQueryBuilder.setQueryMetadata(queryBuilder);
+
+            queryBuilder.setSparql(sparql);
+            queryBuilder.setVariableOrder(queryVarOrder);
+
+            // The argument is held as a TupleExpr so that no downcast is needed
+            // when its variables are inspected below.
+            final TupleExpr child = node.getArg();
+            if(child == null) {
+                // This is the Projection's argument, so the message names the Projection.
+                throw new IllegalArgumentException("Projection arg cannot be null.");
+            }
+
+            final String childNodeId = nodeIds.getOrMakeId(child);
+            queryBuilder.setChildNodeId(childNodeId);
+
+            // Update the child node's metadata.
+            final Set<String> childVars = getVars(child);
+            final VariableOrder childVarOrder = new VariableOrder( toVarOrderString(childVars) );
+
+            setChildMetadata(childNodeId, childVarOrder, queryId);
+
+            // Walk to the next node.
+            super.meet(node);
+        }
+
+        /**
+         * Stores, on a child node's metadata builder, the variable order of the
+         * child's binding sets and the ID of its parent node. This information
+         * is only known while the parent node is being handled, so the child's
+         * builder is created here when one has not been registered yet.
+         *
+         * @param childNodeId - The node ID of the child node.
+         * @param childVarOrder - The variable order of the child node's binding sets.
+         * @param parentNodeId - The node ID that consumes the child's binding sets.
+         * @throws IllegalArgumentException if the child is a QUERY node or a node
+         *   of an unsupported type.
+         */
+        private void setChildMetadata(final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) {
+            checkNotNull(childNodeId);
+            checkNotNull(childVarOrder);
+            checkNotNull(parentNodeId);
+
+            // The kind of builder that must be updated is derived from the child's node ID.
+            final NodeType childType = NodeType.fromNodeId(childNodeId).get();
+            switch(childType) {
+                case STATEMENT_PATTERN: {
+                    StatementPatternMetadata.Builder builder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
+                    if(builder == null) {
+                        builder = StatementPatternMetadata.builder(childNodeId);
+                        fluoQueryBuilder.addStatementPatternBuilder(builder);
+                    }
+
+                    builder.setVarOrder(childVarOrder);
+                    builder.setParentNodeId(parentNodeId);
+                    break;
+                }
+
+                case JOIN: {
+                    JoinMetadata.Builder builder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
+                    if(builder == null) {
+                        builder = JoinMetadata.builder(childNodeId);
+                        fluoQueryBuilder.addJoinMetadata(builder);
+                    }
+
+                    builder.setVariableOrder(childVarOrder);
+                    builder.setParentNodeId(parentNodeId);
+                    break;
+                }
+
+                case FILTER: {
+                    FilterMetadata.Builder builder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
+                    if(builder == null) {
+                        builder = FilterMetadata.builder(childNodeId);
+                        fluoQueryBuilder.addFilterMetadata(builder);
+                    }
+
+                    builder.setVarOrder(childVarOrder);
+                    builder.setParentNodeId(parentNodeId);
+                    break;
+                }
+
+                case QUERY:
+                    throw new IllegalArgumentException("QUERY nodes do not have children.");
+
+                default:
+                    throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+            }
+        }
+
+        /**
+         * Collects the assured binding names of a {@link TupleExpr}, leaving out
+         * every name that starts with {@code -const-} (the constants).
+         *
+         * @param node - The node to inspect for variables. (not null)
+         * @return The non-constant variables that were part of the node.
+         */
+        private Set<String> getVars(final TupleExpr node) {
+            checkNotNull(node);
+
+            final Set<String> nonConstantNames = Sets.newHashSet();
+            for(final String bindingName : node.getAssuredBindingNames()) {
+                // Constants are not variables, so they are skipped.
+                if(bindingName.startsWith("-const-")) {
+                    continue;
+                }
+                nonConstantNames.add(bindingName);
+            }
+            return nonConstantNames;
+        }
+
+        /**
+         * Holds the Variable Order of the binding sets for the children of a join node.
+         * Both orders are held as Strings and neither may be null.
+         */
+        @Immutable
+        @ParametersAreNonnullByDefault
+        private static final class JoinVarOrders {
+            private final String leftVarOrder;
+            private final String rightVarOrder;
+
+            /**
+             * Constructs an instance of {@link JoinVarOrders}.
+             *
+             * @param leftVarOrder - The left child's Variable Order. (not null)
+             * @param rightVarOrder - The right child's Variable Order. (not null)
+             */
+            public JoinVarOrders(final String leftVarOrder, final String 
rightVarOrder) {
+                this.leftVarOrder = checkNotNull(leftVarOrder);
+                this.rightVarOrder = checkNotNull(rightVarOrder);
+            }
+
+            /**
+             * @return The left child's Variable Order.
+             */
+            public String getLeftVarOrder() {
+                return leftVarOrder;
+            }
+
+            /**
+             * @return The right child's Variable Order.
+             */
+            public String getRightVarOrder() {
+                return rightVarOrder;
+            }
+        }
+
+        /**
+         * Computes the variable orders of a join's two children so that the
+         * variables they have in common come first, and in the same order, in
+         * both. This keeps the Accumulo scans that perform the join efficient.
+         *
+         * @param leftVars - The left child's variables. (not null)
+         * @param rightVars - The right child's variables. (not null)
+         * @return An object holding the left and right children's variable orders.
+         */
+        private JoinVarOrders getJoinArgVarOrders(final Set<String> leftVars, final Set<String> rightVars) {
+            checkNotNull(leftVars);
+            checkNotNull(rightVars);
+
+            // Copy the shared variables into a list so that both children see
+            // them in exactly the same iteration order.
+            final List<String> sharedVars = new ArrayList<>( Sets.intersection(leftVars, rightVars) );
+
+            // Each child's order starts with the shared variables.
+            final List<String> orderedLeft = leftShiftCommonVars(sharedVars, leftVars);
+            final List<String> orderedRight = leftShiftCommonVars(sharedVars, rightVars);
+
+            final String leftOrderString = toVarOrderString(orderedLeft);
+            final String rightOrderString = toVarOrderString(orderedRight);
+            return new JoinVarOrders(leftOrderString, rightOrderString);
+        }
+
+        /**
+         * Orders a collection of variables so that the common ones come first,
+         * in the order they were provided. The remaining variables follow in
+         * the iteration order of {@code allVars}, each appearing only once.
+         *
+         * @param commonVars - An ordered list of variables that must appear on the left. (not null)
+         * @param allVars - The variables that need to be ordered. (not null)
+         * @return A list of variables ordered as described above.
+         */
+        private List<String> leftShiftCommonVars(final List<String> commonVars, final Collection<String> allVars) {
+            checkNotNull(commonVars);
+            checkNotNull(allVars);
+
+            // Start from a copy of the common variables, then append whatever is missing.
+            final List<String> ordered = new ArrayList<>(commonVars);
+            for(final String candidate : allVars) {
+                if(ordered.contains(candidate)) {
+                    continue;
+                }
+                ordered.add(candidate);
+            }
+            return ordered;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
new file mode 100644
index 0000000..b76bb29
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.Immutable;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+
+import com.google.common.base.Objects;
+
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * The metadata the Fluo application stores for a Statement Pattern node: the
+ * statement pattern itself and the ID of the node's parent, in addition to
+ * the values held by {@link CommonNodeMetadata}.
+ */
+@Immutable
+@ParametersAreNonnullByDefault
+public class StatementPatternMetadata extends CommonNodeMetadata {
+
+    private final String statementPattern;
+    private final String parentNodeId;
+
+    /**
+     * Constructs an instance of {@link StatementPatternMetadata}.
+     *
+     * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+     * @param varOrder - The variable order of binding sets that are emitted by this node. (not null)
+     * @param statementPattern - The statement pattern new statements are matched against. (not null)
+     * @param parentNodeId - The node id of this node's parent. (not null)
+     */
+    public StatementPatternMetadata(
+            final String nodeId,
+            final VariableOrder varOrder,
+            final String statementPattern,
+            final String parentNodeId) {
+        super(nodeId, varOrder);
+        this.statementPattern = checkNotNull(statementPattern);
+        this.parentNodeId = checkNotNull(parentNodeId);
+    }
+
+    /**
+     * @return The statement pattern new statements are matched against.
+     */
+    public String getStatementPattern() {
+        return statementPattern;
+    }
+
+    /**
+     * @return The node id of this node's parent.
+     */
+    public String getParentNodeId() {
+        return parentNodeId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(
+                super.getNodeId(),
+                super.getVariableOrder(),
+                statementPattern,
+                parentNodeId);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if(this == o) {
+            return true;
+        }
+
+        // Anything that is not Statement Pattern metadata can not be equal to this.
+        if(!(o instanceof StatementPatternMetadata)) {
+            return false;
+        }
+
+        // The values held by the parent class must match before this class's own fields are compared.
+        if(!super.equals(o)) {
+            return false;
+        }
+
+        final StatementPatternMetadata that = (StatementPatternMetadata) o;
+        return new EqualsBuilder()
+                .append(statementPattern, that.statementPattern)
+                .append(parentNodeId, that.parentNodeId)
+                .isEquals();
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("Statement Pattern Metadata {\n")
+                .append("    Node ID: ").append(super.getNodeId()).append("\n")
+                .append("    Variable Order: ").append(super.getVariableOrder()).append("\n")
+                .append("    Parent Node ID: ").append(parentNodeId).append("\n")
+                .append("    Statement Pattern: ").append(statementPattern).append("\n")
+                .append("}")
+                .toString();
+    }
+
+    /**
+     * Creates a new {@link Builder} for this class.
+     *
+     * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+     * @return A new {@link Builder} for this class.
+     */
+    public static Builder builder(final String nodeId) {
+        return new Builder(nodeId);
+    }
+
+    /**
+     * Builds instances of {@link StatementPatternMetadata}. The node ID is fixed
+     * when the builder is created; the remaining values may be set in any
+     * order before {@link #build()} is invoked.
+     */
+    @ParametersAreNonnullByDefault
+    public static final class Builder {
+
+        private final String nodeId;
+        private VariableOrder varOrder;
+        private String statementPattern;
+        private String parentNodeId;
+
+        /**
+         * Constructs an instance of {@link Builder}.
+         *
+         * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+         */
+        public Builder(final String nodeId) {
+            this.nodeId = checkNotNull(nodeId);
+        }
+
+        /**
+         * @return The ID the Fluo app uses to reference this node.
+         */
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        /**
+         * Sets the variable order of binding sets that are emitted by this node.
+         *
+         * @param varOrder - The variable order of binding sets that are emitted by this node.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
+            this.varOrder = varOrder;
+            return this;
+        }
+
+        /**
+         * Sets the statement pattern new statements are matched against.
+         *
+         * @param statementPattern - The statement pattern new statements are matched against.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setStatementPattern(@Nullable final String statementPattern) {
+            this.statementPattern = statementPattern;
+            return this;
+        }
+
+        /**
+         * Sets the node id of this node's parent.
+         *
+         * @param parentNodeId - The node id of this node's parent.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setParentNodeId(@Nullable final String parentNodeId) {
+            this.parentNodeId = parentNodeId;
+            return this;
+        }
+
+        /**
+         * @return Constructs an instance of {@link StatementPatternMetadata} using the values that are in this builder.
+         */
+        public StatementPatternMetadata build() {
+            return new StatementPatternMetadata(
+                    nodeId,
+                    varOrder,
+                    statementPattern,
+                    parentNodeId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
new file mode 100644
index 0000000..8b38923
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.algebra.Compare;
+import org.openrdf.query.algebra.Compare.CompareOp;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.base.Optional;
+
+/**
+ * Tests the methods of {@link FilterFinder}.
+ */
+public class FilterFinderTest {
+
+    /**
+     * The query that is searched by every test. It contains exactly two filters.
+     */
+    private static final String SPARQL =
+            "SELECT ?person ?age " +
+            "{" +
+              "FILTER(?age < 30) . " +
+              "FILTER(?person = <http://Alice>)" +
+              "?person <http://hasAge> ?age" +
+            "}";
+
+    /**
+     * Both filters of the query can be found by their index.
+     */
+    @Test
+    public void manyFilters() throws Exception {
+        // Create the expected result.
+        // NOTE(review): index 0 is expected to be the ?person filter even though it
+        // comes second in the query text - presumably the index follows the parsed
+        // query tree rather than the text order; confirm against FilterFinder.
+        final ValueExpr[] expected = new ValueExpr[2];
+        expected[0] = new Compare(new Var("person"), new ValueConstant( new URIImpl("http://Alice") ));
+        expected[1] = new Compare(new Var("age"), new ValueConstant( new LiteralImpl("30", XMLSchema.INTEGER) ), CompareOp.LT);
+
+        // Run the test.
+        final FilterFinder finder = new FilterFinder();
+        final ValueExpr[] conditions = new ValueExpr[2];
+        conditions[0] = finder.findFilter(SPARQL, 0).get().getCondition();
+        conditions[1] = finder.findFilter(SPARQL, 1).get().getCondition();
+        assertTrue( Arrays.equals(expected, conditions) );
+    }
+
+    /**
+     * Asking for an index the query has no filter for yields an absent result.
+     */
+    @Test
+    public void noFilterAtIndex() throws Exception {
+        // Run the test. The query only has two filters, so nothing is at index 4.
+        final FilterFinder finder = new FilterFinder();
+        final Optional<Filter> filter = finder.findFilter(SPARQL, 4);
+        assertFalse( filter.isPresent() );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
new file mode 100644
index 0000000..a672bd2
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.beust.jcommander.internal.Lists;
+
+/**
+ * Tests the methods of {@link FluoStringConverter}.
+ */
+public class FluoStringConverterTest {
+
+    @Test
+    public void varOrderToString() {
+        // Setup the variable order that will be converted.
+        final Collection<String> varOrder = Lists.newArrayList("x", "y", "z");
+
+        // Convert it to a String.
+        final String varOrderString = FluoStringConverter.toVarOrderString(varOrder);
+
+        // Ensure it converted to the expected result.
+        final String expected = "x;y;z";
+        assertEquals(expected, varOrderString);
+    }
+
+    @Test
+    public void stringToVarOrder() {
+        // Setup the String that will be converted.
+        final String varOrderString = "x;y;z";
+
+        // Convert it to an array in variable order.
+        final String[] varOrder = FluoStringConverter.toVarOrder(varOrderString);
+
+        // Ensure it converted to the expected result.
+        final String[] expected = {"x", "y", "z"};
+        assertTrue( Arrays.equals(expected, varOrder) );
+    }
+
+    @Test
+    public void bindingSetToString() {
+        // Setup the binding set that will be converted.
+        final MapBindingSet originalBindingSet = new MapBindingSet();
+        originalBindingSet.addBinding("x", new URIImpl("http://a"));
+        originalBindingSet.addBinding("y", new URIImpl("http://b"));
+        originalBindingSet.addBinding("z", new URIImpl("http://c"));
+
+        // Convert it to a String. The values are written in the order given by varOrder.
+        final String[] varOrder = new String[] {"y", "z", "x" };
+        final String bindingSetString = FluoStringConverter.toBindingSetString(originalBindingSet, varOrder);
+
+        // Ensure it converted to the expected result.
+        final String expected = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+                "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+                "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
+
+        assertEquals(expected, bindingSetString);
+    }
+
+    @Test
+    public void stringToBindingSet() {
+        // Setup the String that will be converted.
+        final String bindingSetString = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+                "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+                "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
+
+        // Convert it to a BindingSet.
+        final String[] varOrder = new String[] {"y", "z", "x" };
+        final BindingSet bindingSet = FluoStringConverter.toBindingSet(bindingSetString, varOrder);
+
+        // Ensure it converted to the expected result.
+        final MapBindingSet expected = new MapBindingSet();
+        expected.addBinding("x", new URIImpl("http://a"));
+        expected.addBinding("y", new URIImpl("http://b"));
+        expected.addBinding("z", new URIImpl("http://c"));
+
+        assertEquals(expected, bindingSet);
+    }
+
+    @Test
+    public void statementPatternToString() throws MalformedQueryException {
+        // Setup a StatementPattern that represents "?x <http://worksAt> <http://Chipotle>."
+        final Var subject = new Var("x");
+        final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt"));
+        predicate.setAnonymous(true);
+        predicate.setConstant(true);
+        final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
+        object.setAnonymous(true);
+        object.setConstant(true);
+        final StatementPattern pattern = new StatementPattern(subject, predicate, object);
+
+        // Convert the pattern to a String.
+        final String spString = FluoStringConverter.toStatementPatternString(pattern);
+
+        // Ensure it converted to the expected result.
+        final String expected = "x:::" +
+                "-const-http://worksAt<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+                "-const-http://Chipotle<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
+
+        // Expected value goes first so a failure message labels the two values correctly.
+        assertEquals(expected, spString);
+    }
+
+    @Test
+    public void stringToStatementPattern() {
+        // Setup the String representation of a statement pattern.
+        final String patternString = "x:::" +
+                "-const-http://worksAt<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+                "-const-http://Chipotle<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
+
+        // Convert it to a StatementPattern.
+        final StatementPattern statementPattern = FluoStringConverter.toStatementPattern(patternString);
+
+        // Ensure it converted to the expected result.
+        final Var subject = new Var("x");
+        final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt"));
+        predicate.setAnonymous(true);
+        predicate.setConstant(true);
+        final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
+        object.setAnonymous(true);
+        object.setConstant(true);
+        final StatementPattern expected = new StatementPattern(subject, predicate, object);
+
+        assertEquals(expected, statementPattern);
+    }
+
+    @Test
+    public void toVar_uri() {
+        // Setup the string representation of the variable.
+        final String varString = "-const-http://Chipotle<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
+
+        // Convert it to a Var object.
+        final Var var = FluoStringConverter.toVar(varString);
+
+        // Ensure it converted to the expected result.
+        final Var expected = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
+        expected.setAnonymous(true);
+        expected.setConstant(true);
+
+        assertEquals(expected, var);
+    }
+
+    @Test
+    public void toVar_int() throws MalformedQueryException {
+        // Setup the string representation of the variable.
+        final String varString = "-const-5<<~>>http://www.w3.org/2001/XMLSchema#integer";
+
+        // Convert it to a Var object.
+        final Var result = FluoStringConverter.toVar(varString);
+
+        // Ensure it converted to the expected result.
+        final Var expected = new Var("-const-5", new LiteralImpl("5", XMLSchema.INTEGER));
+        expected.setAnonymous(true);
+        expected.setConstant(true);
+
+        assertEquals(expected, result);
+    }
+
+    @Test
+    public void toVar_string() {
+        // Setup the string representation of the variable.
+        final String varString = "-const-Chipotle<<~>>http://www.w3.org/2001/XMLSchema#string";
+
+        // Convert it to a Var object.
+        final Var result = FluoStringConverter.toVar(varString);
+
+        // Ensure it converted to the expected result.
+        final Var expected = new Var("-const-Chipotle", new LiteralImpl("Chipotle", XMLSchema.STRING));
+        expected.setAnonymous(true);
+        expected.setConstant(true);
+
+        assertEquals(expected, result);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NodeTypeTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NodeTypeTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NodeTypeTest.java
new file mode 100644
index 0000000..f86426c
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NodeTypeTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.UUID;
+
+import org.junit.Test;
+
+import com.google.common.base.Optional;
+
+/**
+ * Tests the methods of {@link NodeType}.
+ * <p>
+ * The valid-ID tests build a node ID as {@code <prefix>_<random UUID>} and check
+ * which {@link NodeType} is resolved from it.
+ */
+public class NodeTypeTest {
+
+    @Test
+    public void fromNodeId_StatementPattern() {
+        final Optional<NodeType> type = NodeType.fromNodeId(IncrementalUpdateConstants.SP_PREFIX + "_" + UUID.randomUUID());
+        assertEquals(NodeType.STATEMENT_PATTERN, type.get());
+    }
+
+    @Test
+    public void fromNodeId_Join() {
+        final Optional<NodeType> type = NodeType.fromNodeId(IncrementalUpdateConstants.JOIN_PREFIX + "_" + UUID.randomUUID());
+        assertEquals(NodeType.JOIN, type.get());
+    }
+
+    @Test
+    public void fromNodeId_Filter() {
+        final Optional<NodeType> type = NodeType.fromNodeId(IncrementalUpdateConstants.FILTER_PREFIX + "_" + UUID.randomUUID());
+        assertEquals(NodeType.FILTER, type.get());
+    }
+
+    @Test
+    public void fromNodeId_Query() {
+        final Optional<NodeType> type = NodeType.fromNodeId(IncrementalUpdateConstants.QUERY_PREFIX + "_" + UUID.randomUUID());
+        assertEquals(NodeType.QUERY, type.get());
+    }
+
+    @Test
+    public void fromNodeId_invalid() {
+        // An ID that was not built from any of the prefix constants must not resolve to a type.
+        final Optional<NodeType> type = NodeType.fromNodeId("Invalid ID String.");
+        assertFalse( type.isPresent() );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
new file mode 100644
index 0000000..9ac5139
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+/**
+ * Tests the methods of {@link RyaExportParameters}.
+ */
+public class RyaExportParametersTest {
+
+    @Test
+    public void writeParams() {
+        final Map<String, String> params = new HashMap<>();
+
+        // Load some values into the params using the wrapper. The wrapper is expected
+        // to write straight through to the map it was constructed with.
+        final RyaExportParameters ryaParams = new RyaExportParameters(params);
+        ryaParams.setExportToRya(true);
+        ryaParams.setAccumuloInstanceName("demoAccumulo");
+        ryaParams.setZookeeperServers("zoo1;zoo2");
+        ryaParams.setExporterUsername("fluo");
+        ryaParams.setExporterPassword("3xp0rt3r");
+
+        // Ensure the params map has the expected values.
+        final Map<String, String> expectedParams = new HashMap<>();
+        expectedParams.put(RyaExportParameters.CONF_EXPORT_TO_RYA, "true");
+        expectedParams.put(RyaExportParameters.CONF_ACCUMULO_INSTANCE_NAME, "demoAccumulo");
+        expectedParams.put(RyaExportParameters.CONF_ZOOKEEPER_SERVERS, "zoo1;zoo2");
+        expectedParams.put(RyaExportParameters.CONF_EXPORTER_USERNAME, "fluo");
+        expectedParams.put(RyaExportParameters.CONF_EXPORTER_PASSWORD, "3xp0rt3r");
+
+        assertEquals(expectedParams, params);
+    }
+
+    @Test
+    public void notConfigured() {
+        final Map<String, String> params = new HashMap<>();
+
+        // Ensure an unconfigured parameters map will say rya export is disabled.
+        final RyaExportParameters ryaParams = new RyaExportParameters(params);
+        assertFalse(ryaParams.isExportToRya());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/conf/log4j2.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/conf/log4j2.xml 
b/extras/rya.pcj.fluo/pcj.fluo.client/conf/log4j2.xml
new file mode 100644
index 0000000..3c465b6
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/conf/log4j2.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <!-- Trace logging for the client commands. The names must match the fully qualified class names. -->
+    <Logger name="org.apache.rya.indexing.pcj.fluo.client.command.ListQueriesCommand" level="trace" />
+    <Logger name="org.apache.rya.indexing.pcj.fluo.client.command.NewQueryCommand" level="trace" />
+    <Logger name="org.apache.rya.indexing.pcj.fluo.client.command.LoadTriplesCommand" level="trace"/>
+    <!-- NOTE(review): the command classes live under 'fluo.client'; confirm whether FluoLoader's
+         package also needs the 'client' segment. -->
+    <Logger name="org.apache.rya.indexing.pcj.fluo.util.FluoLoader" level="trace"/>
+
+    <Root level="warn">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/conf/tool.properties
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/conf/tool.properties 
b/extras/rya.pcj.fluo/pcj.fluo.client/conf/tool.properties
new file mode 100644
index 0000000..98a7d3d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/conf/tool.properties
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#---------------------------------------------------------------------------------
+# This client assumes both Rya and Fluo are running on the same Accumulo instance
+# managed by the same Zookeeper servers. It uses the same credentials for all 
+# connections it makes to Accumulo and the apps residing on top of it.
+#---------------------------------------------------------------------------------
+
+# --------------------
+# Accumulo Properties.
+# --------------------
+rya.pcj.admin.client.accumulo.zooServers=
+rya.pcj.admin.client.accumulo.instanceName=
+rya.pcj.admin.client.accumulo.username=
+rya.pcj.admin.client.accumulo.password=
+
+# ----------------
+# Fluo Properties.
+# ----------------
+# The name of the app Fluo is running. This name should match the name of
+# the Accumulo table the app stores its state in.
+rya.pcj.admin.client.fluo.appName=
+
+# ----------------
+# Rya Properties.
+# ----------------
+# A prefix to the names of the RYA instance's tables in Accumulo. This prefix
+# determines which RYA tables will be searched when making historic matches as
+# well as when creating new PCJ export tables.
+rya.pcj.admin.client.rya.tablePrefix=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml
new file mode 100644
index 0000000..8c2f5b3
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.pcj.fluo.parent</artifactId>
+        <version>3.2.10-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.pcj.fluo.client</artifactId>
+    
+    <name>Apache Rya PCJ Fluo Client</name>
+    <description>A command line tool that lets a user maintain the state of the Rya PCJ Fluo app.</description>
+    
+    <dependencies>
+        <!-- Rya Runtime Dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.api</artifactId>
+        </dependency>
+        
+        <!-- 3rd Party Runtime Dependencies. -->
+        <dependency>
+            <groupId>com.beust</groupId>
+            <artifactId>jcommander</artifactId>
+        </dependency>
+        
+        <dependency> 
+            <groupId>io.fluo</groupId>
+            <artifactId>fluo-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryrender</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-ntriples</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-trig</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-turtle</artifactId>
+            <version>${openrdf.sesame.version}</version>
+        </dependency>
+        
+        <!-- Log4j 2 bridge, api, and core. -->        
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>2.5</version>
+        </dependency> 
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <!-- Use the pre-built 'jar-with-dependencies' assembly to package the dependent class files into the final jar.
+                 This creates a jar file that can be deployed to Fluo without having to include any dependent jars. -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathLayoutType>custom</classpathLayoutType>
+                            <customClasspathLayout>WEB-INF/lib/$${artifact.groupIdPath}/$${artifact.artifactId}-$${artifact.version}$${dashClassifier?}.$${artifact.extension}</customClasspathLayout>
+
+                            <!-- Must be the fully qualified name of the class that declares main(). -->
+                            <mainClass>org.apache.rya.indexing.pcj.fluo.client.PcjAdminClient</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
new file mode 100644
index 0000000..59177f6
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import 
org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand.ArgumentsException;
+import 
org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand.ExecutionException;
+import 
org.apache.rya.indexing.pcj.fluo.client.command.CountUnprocessedStatementsCommand;
+import org.apache.rya.indexing.pcj.fluo.client.command.ListQueriesCommand;
+import org.apache.rya.indexing.pcj.fluo.client.command.LoadTriplesCommand;
+import org.apache.rya.indexing.pcj.fluo.client.command.NewQueryCommand;
+import org.apache.rya.indexing.pcj.fluo.client.command.QueryReportCommand;
+import org.openrdf.repository.RepositoryException;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.config.FluoConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.rdftriplestore.RdfCloudTripleStore;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+
+/**
+ * An application that helps Rya PCJ administrators interact with the cluster.
+ */
+@ParametersAreNonnullByDefault
+public class PcjAdminClient {
+
+    private static final Logger log = LogManager.getLogger(PcjAdminClient.class);
+
+    // A relative path, so it is resolved against the directory the client is launched from.
+    private static final Path PROPERTIES_FILE = Paths.get("conf/tool.properties");
+
+    /**
+     * Maps from command strings to the object that performs the command.
+     */
+    private static final ImmutableMap<String, PcjAdminClientCommand> commands;
+    static {
+        // The commands this client supports. Each one is instantiated reflectively below,
+        // so it needs an accessible no-argument constructor.
+        final Set<Class<? extends PcjAdminClientCommand>> commandClasses = new HashSet<>();
+        commandClasses.add(NewQueryCommand.class);
+        commandClasses.add(LoadTriplesCommand.class);
+        commandClasses.add(ListQueriesCommand.class);
+        commandClasses.add(QueryReportCommand.class);
+        commandClasses.add(CountUnprocessedStatementsCommand.class);
+
+        final ImmutableMap.Builder<String, PcjAdminClientCommand> builder = ImmutableMap.builder();
+        for(final Class<? extends PcjAdminClientCommand> commandClass : commandClasses) {
+            try {
+                final PcjAdminClientCommand command = commandClass.newInstance();
+                builder.put(command.getCommand(), command);
+            } catch (InstantiationException | IllegalAccessException e) {
+                // The failure is reported and the command is left out of the map; the
+                // remaining commands are still registered.
+                System.err.println("Could not run the application because a PcjCommand is missing its empty constructor.");
+                e.printStackTrace();
+            }
+        }
+        commands = builder.build();
+    }
+
+    /**
+     * Describes how this application may be used on the command line.
+     */
+    private static final String usage = makeUsage(commands);
+
+    public static void main(final String[] args) {
+        log.trace("Starting up the PCJ Admin Client.");
+
+        // If no command provided or the command isn't recognized, then print 
the usage.
+        if(args.length == 0 || !commands.containsKey(args[0])) {
+            System.out.println(usage);
+            System.exit(-1);
+        }
+
+        // Load the properties file.
+        final Properties props = new Properties();
+        try (InputStream pin = Files.newInputStream(PROPERTIES_FILE)) {
+            props.load( pin );
+        } catch (final IOException e) {
+            throw new RuntimeException("Could not load properties file: " + 
PROPERTIES_FILE, e);
+        }
+
+        // Fetch the command that will be executed.
+        final String command = args[0];
+        final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+        final PcjAdminClientCommand pcjCommand = commands.get(command);
+
+        RyaSailRepository rya = null;
+        FluoClient fluo = null;
+        try {
+            // Connect to Accumulo, Rya, and Fluo.
+            final PcjAdminClientProperties clientProps = new 
PcjAdminClientProperties(props);
+            final Connector accumulo = createAccumuloConnector(clientProps);
+            rya = makeRyaRepository(clientProps, accumulo);
+            fluo = createFluoClient( clientProps );
+
+            // Execute the command.
+            pcjCommand.execute(accumulo, clientProps.getRyaTablePrefix(), rya, 
fluo, commandArgs);
+
+        } catch (final AccumuloException | AccumuloSecurityException e) {
+            System.err.println("Could not connect to the Accumulo instance 
that hosts the export PCJ tables.");
+            e.printStackTrace();
+            System.exit(-1);
+        } catch(final RepositoryException e) {
+            System.err.println("Could not connect to the Rya instance that 
hosts the historic RDF statements.");
+            e.printStackTrace();
+            System.exit(-1);
+        } catch (final ArgumentsException e) {
+            System.err.println( pcjCommand.getUsage() );
+            System.exit(-1);
+        } catch (final ExecutionException e) {
+            System.err.println("Could not execute the command.");
+            e.printStackTrace();
+            System.exit(-1);
+        } finally {
+            log.trace("Shutting down the PCJ Admin Client.");
+
+            if(rya != null) {
+                try {
+                    rya.shutDown();
+                } catch (final RepositoryException e) {
+                    System.err.println("Problem while shutting down the Rya 
connection.");
+                    e.printStackTrace();
+                }
+            }
+
+            if(fluo != null) {
+                fluo.close();
+            }
+        }
+    }
+
+    private static String makeUsage(final ImmutableMap<String, 
PcjAdminClientCommand> commands) {
+        final StringBuilder usage = new StringBuilder();
+        usage.append("Usage: 
").append(PcjAdminClient.class.getSimpleName()).append(" <command> (<argument> 
... )\n");
+        usage.append("\n");
+        usage.append("Possible Commands:\n");
+
+        // Sort and find the max width of the commands.
+        final List<String> sortedCommandNames = Lists.newArrayList( 
commands.keySet() );
+        Collections.sort(sortedCommandNames);
+
+        int maxCommandLength = 0;
+        for(final String commandName : sortedCommandNames) {
+            maxCommandLength = commandName.length() > maxCommandLength ? 
commandName.length() : maxCommandLength;
+        }
+
+        // Add each command to the usage.
+        final String commandFormat = "    %-" + (maxCommandLength) + "s - 
%s\n";
+        for(final String commandName : sortedCommandNames) {
+            final String commandDescription = 
commands.get(commandName).getDescription();
+            usage.append( String.format(commandFormat, commandName, 
commandDescription) );
+        }
+
+        return usage.toString();
+    }
+
+    private static Connector createAccumuloConnector(final 
PcjAdminClientProperties clientProps) throws AccumuloException, 
AccumuloSecurityException {
+        checkNotNull(clientProps);
+
+        // Connect to the Zookeepers.
+        final String instanceName = clientProps.getAccumuloInstance();
+        final String zooServers = clientProps.getAccumuloZookeepers();
+        final Instance inst = new ZooKeeperInstance(instanceName, zooServers);
+
+        // Create a connector to the Accumulo that hosts the PCJ export tables.
+        return inst.getConnector(clientProps.getAccumuloUsername(), new 
PasswordToken(clientProps.getAccumuloPassword()));
+    }
+
+    private static RyaSailRepository makeRyaRepository(final 
PcjAdminClientProperties clientProps, final Connector accumulo) throws 
RepositoryException {
+        checkNotNull(clientProps);
+        checkNotNull(accumulo);
+
+        // Setup Rya configuration values.
+        final AccumuloRdfConfiguration ryaConf = new 
AccumuloRdfConfiguration();
+        ryaConf.setTablePrefix( clientProps.getRyaTablePrefix() );
+
+        // Connect to the Rya repo.
+        final AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO();
+        accumuloRyaDao.setConnector(accumulo);
+        accumuloRyaDao.setConf(ryaConf);
+
+        final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
+        ryaStore.setRyaDAO(accumuloRyaDao);
+
+        final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
+        ryaRepo.initialize();
+        return ryaRepo;
+    }
+
+    private static FluoClient createFluoClient(final PcjAdminClientProperties 
clientProps) {
+        checkNotNull(clientProps);
+        final FluoConfiguration fluoConfig = new FluoConfiguration();
+
+        // Fluo configuration values.
+        fluoConfig.setApplicationName( clientProps.getFluoAppName() );
+        fluoConfig.setInstanceZookeepers( clientProps.getAccumuloZookeepers() 
+  "/fluo" );
+
+        // Accumulo Connection Stuff.
+        fluoConfig.setAccumuloZookeepers( clientProps.getAccumuloZookeepers() 
);
+        fluoConfig.setAccumuloInstance( clientProps.getAccumuloInstance() );
+        fluoConfig.setAccumuloUser( clientProps.getAccumuloUsername() );
+        fluoConfig.setAccumuloPassword( clientProps.getAccumuloPassword() );
+
+        // Connect the client.
+        return FluoFactory.newClient(fluoConfig);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
new file mode 100644
index 0000000..e1185d4
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.client;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.Connector;
+
+import io.fluo.api.client.FluoClient;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+
+/**
+ * A command that may be executed by the {@link PcjAdminClient}.
+ */
+@ParametersAreNonnullByDefault
+public interface PcjAdminClientCommand {
+
+    /**
+     * @return What a user would type into the command line to indicate
+     *   they want to execute this command.
+     */
+    public String getCommand();
+
+    /**
+     * @return Briefly describes what the command does.
+     */
+    public String getDescription();
+
+    /**
+     * @return Describes what arguments may be provided to the command.
+     */
+    public String getUsage();
+
+    /**
+     * Execute the command using the command line arguments.
+     *
+     * @param accumulo - A connection to the Accumulo cluster PCJs are 
exported to. (not null)
+     * @param ryaTablePrefix - The prefix applied to the Accumulo tables 
associated with the
+     *   instance of Rya the command interacts with. (not null)
+     * @param rya - A connection to the Rya instance used to search for 
historic PCJ matches. (not null)
+     * @param fluo - A connection to the Fluo app that is updating the PCJs. 
(not null)
+     * @param args - Command line arguments that configure how the command 
will execute. (not null)
+     */
+    public void execute(
+            final Connector accumulo,
+            final String ryaTablePrefix,
+            final RyaSailRepository rya,
+            final FluoClient fluo,
+            final String[] args) throws ArgumentsException, ExecutionException;
+
+    /**
+     * A {@link PcjAdminClientCommand} could not be executed because of a 
problem with
+     * the arguments that were provided to it.
+     */
+    public static final class ArgumentsException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public ArgumentsException(final String message) {
+            super(message);
+        }
+
+        public ArgumentsException(final String message, final Throwable cause) 
{
+            super(message, cause);
+        }
+    }
+
+    /**
+     * A {@link PcjAdminClientCommand} could not be executed.
+     */
+    public static final class ExecutionException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public ExecutionException(final String message) {
+            super(message);
+        }
+
+        public ExecutionException(final String message, final Throwable cause) 
{
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientProperties.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientProperties.java
 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientProperties.java
new file mode 100644
index 0000000..794881f
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientProperties.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Properties;
+
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+
+/**
+ * Interprets a {@link Properties} object so that it is easier to access
+ * configuration values used by {@link PcjAdminClient}.
+ */
+@ParametersAreNonnullByDefault
+public class PcjAdminClientProperties {
+
+    // Properties that configure how Fluo will connect to Accumulo.
+    public static final String ACCUMULO_ZOOKEEPERS = 
"rya.pcj.admin.client.accumulo.zooServers";
+    public static final String ACCUMULO_INSTANCE = 
"rya.pcj.admin.client.accumulo.instanceName";
+    public static final String ACCUMULO_USERNAME = 
"rya.pcj.admin.client.accumulo.username";
+    public static final String ACCUMULO_PASSWORD = 
"rya.pcj.admin.client.accumulo.password";
+
+    // Properties that configure how the client will interact with the Fluo 
app.
+    public static final String FLUO_APP_NAME = 
"rya.pcj.admin.client.fluo.appName";
+
+    // Properties that configure how the client will interact with Rya.
+    public static final String RYA_TABLE_PREFIX = 
"rya.pcj.admin.client.rya.tablePrefix";
+
+    private final Properties props;
+
+    /**
+     * Constructs an instance of {@link PcjAdminClientProperties}.
+     *
+     * @param props - The properties this class will interpret. (not null)
+     */
+    public PcjAdminClientProperties(final Properties props) {
+        this.props = checkNotNull(props);
+    }
+
+    /**
+     * @return A comma delimited list of the Zookeeper servers that manage the
+     *   Accumulo instance.
+     */
+    public @Nullable String getAccumuloZookeepers() {
+        return props.getProperty(ACCUMULO_ZOOKEEPERS);
+    }
+
+    /**
+     * @return The name of the Accumulo instance that is used by Rya and Fluo.
+     */
+    public @Nullable String getAccumuloInstance() {
+        return props.getProperty(ACCUMULO_INSTANCE);
+    }
+
+    /**
+     * @return The username the application will use to interact with 
Accumulo.
+     */
+    public @Nullable String getAccumuloUsername() {
+        return props.getProperty(ACCUMULO_USERNAME);
+    }
+
+    /**
+     * @return The password the application will use to interact with Accumulo.
+     */
+    public @Nullable String getAccumuloPassword() {
+        return props.getProperty(ACCUMULO_PASSWORD);
+    }
+
+    /**
+     * @return The name of the Fluo app that is incrementally maintaining PCJ 
results.
+     */
+    public @Nullable String getFluoAppName() {
+        return props.getProperty(FLUO_APP_NAME);
+    }
+
+    /**
+     * @return The prefix that is applied to the PCJ export table names and the
+     *   Rya RDF tables for the Rya instance the Fluo app is exporting to.
+     */
+    public @Nullable String getRyaTablePrefix() {
+        return props.getProperty(RYA_TABLE_PREFIX);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/CountUnprocessedStatementsCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/CountUnprocessedStatementsCommand.java
 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/CountUnprocessedStatementsCommand.java
new file mode 100644
index 0000000..7fefb28
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/CountUnprocessedStatementsCommand.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.client.command;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.math.BigInteger;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.api.CountStatements;
+import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
+
+import io.fluo.api.client.FluoClient;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+
+/**
+ * A command that prints the number of RDF Statements that are loaded into the
+ * Fluo application and have not been processed yet.
+ */
+public class CountUnprocessedStatementsCommand implements 
PcjAdminClientCommand {
+
+    private static final Logger log = 
Logger.getLogger(CountUnprocessedStatementsCommand.class);
+
+    @Override
+    public String getCommand() {
+        return "count-unprocessed-statements";
+    }
+
+    @Override
+    public String getDescription() {
+        return "Lists the number of Statements that have been inserted into 
the Fluo application that have not been processed yet.";
+    }
+
+    @Override
+    public String getUsage() {
+        return "There are no parameters associated with this command.";
+    }
+
+    @Override
+    public void execute(final Connector accumulo, final String ryaTablePrefix, 
final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws 
ArgumentsException, ExecutionException {
+        checkNotNull(accumulo);
+        checkNotNull(ryaTablePrefix);
+        checkNotNull(rya);
+        checkNotNull(fluo);
+        checkNotNull(args);
+
+        log.trace("Executing the Count Unprocessed Triples Command...");
+
+        final BigInteger count = new CountStatements().countStatements(fluo);
+        System.out.println("There are " + count.toString() + " unprocessed 
Statements stored in the Fluo app." );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java
 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java
new file mode 100644
index 0000000..1764853
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.client.command;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata;
+import 
org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
+import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
+import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
+import org.apache.rya.indexing.pcj.fluo.client.util.PcjMetadataRenderer;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import io.fluo.api.client.FluoClient;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+
+/**
+ * A command that lists information about the queries that are being managed 
by the Fluo app.
+ */
+@ParametersAreNonnullByDefault
+public class ListQueriesCommand implements PcjAdminClientCommand {
+    private static final Logger log = 
LogManager.getLogger(ListQueriesCommand.class);
+
+    /**
+     * Command line parameters that are used by this command to configure 
itself.
+     */
+    private static final class Parameters {
+        @Parameter(names = "--queryId", required = false, description = "Make 
this command only fetch the metadata for the specififed Query ID.")
+        private String queryId;
+    }
+
+    @Override
+    public String getCommand() {
+        return "list-queries";
+    }
+
+    @Override
+    public String getDescription() {
+        return "View metadata about the queries that are loaded in the Fluo 
app";
+    }
+
+    @Override
+    public String getUsage() {
+        final JCommander parser = new JCommander(new Parameters());
+
+        final StringBuilder usage = new StringBuilder();
+        parser.usage(usage);
+        return usage.toString();
+    }
+
+    @Override
+    public void execute(final Connector accumulo, final String ryaTablePrefix, 
final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws 
ArgumentsException, ExecutionException {
+        checkNotNull(accumulo);
+        checkNotNull(fluo);
+        checkNotNull(args);
+
+        log.trace("Executing the List Queries Command...");
+
+        // Parse the command line arguments.
+        final Parameters params = new Parameters();
+        try {
+            new JCommander(params, args);
+        } catch(final ParameterException e) {
+            throw new ArgumentsException("Could not list the queries because 
of invalid command line parameters.", e);
+        }
+
+        // Fetch the PCJ metadata that will be included in the report.
+        final GetPcjMetadata getPcjMetadata = new GetPcjMetadata();
+        final Map<String, PcjMetadata> metadata = new HashMap<String, 
PcjMetadata>();
+        try {
+            if(params.queryId != null) {
+                log.trace("Fetch the PCJ Metadata from Accumulo for Query ID 
'" + params.queryId + "'.");
+                metadata.put(params.queryId, 
getPcjMetadata.getMetadata(accumulo, fluo, params.queryId));
+            } else {
+                log.trace("Fetch the PCJ Metadata from Accumulo for all 
queries that are being updated by Fluo.");
+                metadata.putAll( getPcjMetadata.getMetadata(accumulo, fluo) );
+            }
+        } catch (NotInFluoException | NotInAccumuloException e) {
+            throw new ExecutionException("Could not fetch some of the metadata 
required to build the report.", e);
+        }
+
+        // Write the metadata to the console.
+        log.trace("Rendering the queries report...");
+        if(metadata.isEmpty()) {
+            System.out.println("No queries are being tracked by Fluo.");
+        } else {
+            final PcjMetadataRenderer renderer = new PcjMetadataRenderer();
+            try {
+                final String report = renderer.render(metadata);
+                System.out.println("The number of Queries that are being 
tracked by Fluo: " + metadata.size());
+                System.out.println(report);
+            } catch (final Exception e) {
+                throw new ExecutionException("Unable to render the query 
metadata report for output.", e);
+            }
+        }
+
+        log.trace("Finished executing the List Queries Command.");
+    }
+}
\ No newline at end of file


Reply via email to