RYA-406. Closes #251.

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/62de7c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/62de7c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/62de7c5d

Branch: refs/heads/master
Commit: 62de7c5d16ac6624228fd2c03f78b09845c1e74b
Parents: 6d2bfcb
Author: Caleb Meier <[email protected]>
Authored: Mon Nov 6 12:23:57 2017 -0800
Committer: Caleb Meier <[email protected]>
Committed: Tue Dec 5 11:47:45 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/RyaClientExample.java         |   3 +-
 .../notification/pruner/FluoBinPruner.java      |   9 +-
 .../rya.manual/src/site/markdown/pcj-updater.md | 186 +++++++++++++++++++
 .../indexing/pcj/fluo/api/DeleteFluoPcj.java    |  59 +++---
 .../indexing/pcj/fluo/api/GetQueryReport.java   |  21 ++-
 .../indexing/pcj/fluo/api/InsertTriples.java    |  26 ++-
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        |  40 ++--
 .../pcj/fluo/app/AbstractNodeUpdater.java       |  45 +++++
 .../pcj/fluo/app/AggregationResultUpdater.java  |   5 +-
 .../indexing/pcj/fluo/app/BindingSetRow.java    |  41 +++-
 .../fluo/app/ConstructQueryResultUpdater.java   |  13 +-
 .../pcj/fluo/app/FilterResultUpdater.java       |   5 +-
 .../rya/indexing/pcj/fluo/app/IncUpdateDAO.java |  11 +-
 .../fluo/app/IncrementalUpdateConstants.java    |  21 ++-
 .../pcj/fluo/app/JoinResultUpdater.java         |  62 +++----
 .../pcj/fluo/app/PeriodicQueryUpdater.java      |  21 +--
 .../pcj/fluo/app/ProjectionResultUpdater.java   |  10 +-
 .../pcj/fluo/app/QueryResultUpdater.java        |   5 +-
 .../batch/AbstractBatchBindingSetUpdater.java   |   6 +-
 .../app/batch/AbstractSpanBatchInformation.java |   8 +-
 .../app/batch/JoinBatchBindingSetUpdater.java   |  16 +-
 .../app/batch/SpanBatchBindingSetUpdater.java   |  60 ++++--
 .../app/batch/SpanBatchDeleteInformation.java   |  78 +++++++-
 .../SpanBatchInformationTypeAdapter.java        |  17 +-
 .../app/export/rya/RyaSubGraphExporter.java     |  13 +-
 .../fluo/app/observers/AggregationObserver.java |   9 +-
 .../observers/ConstructQueryResultObserver.java |  12 +-
 .../pcj/fluo/app/observers/FilterObserver.java  |   3 +-
 .../pcj/fluo/app/observers/JoinObserver.java    |   3 +-
 .../app/observers/PeriodicQueryObserver.java    |   5 +-
 .../fluo/app/observers/ProjectionObserver.java  |   3 +-
 .../fluo/app/observers/QueryResultObserver.java |   8 +-
 .../app/observers/StatementPatternObserver.java |   5 +-
 .../pcj/fluo/app/observers/TripleObserver.java  |  49 ++---
 .../pcj/fluo/app/query/FluoQueryColumns.java    |  27 ++-
 .../fluo/app/query/FluoQueryMetadataCache.java  | 181 +++++++++++-------
 .../fluo/app/query/FluoQueryMetadataDAO.java    |   5 +
 .../fluo/app/query/MetadataCacheSupplier.java   |  70 +++++--
 .../fluo/app/query/StatementPatternIdCache.java |  89 +++++++++
 .../query/StatementPatternIdCacheSupplier.java  |  74 ++++++++
 .../app/query/StatementPatternIdManager.java    |  90 +++++++++
 .../app/util/BindingHashShardingFunction.java   | 178 ++++++++++++++++++
 .../pcj/fluo/app/util/TriplePrefixUtils.java    |  62 +++++++
 .../app/query/FluoQueryMetadataCacheTest.java   |  18 ++
 .../app/query/StatementPatternIdCacheTest.java  |  55 ++++++
 .../util/BindingHashShardingFunctionTest.java   |  66 +++++++
 .../fluo/app/util/TriplePrefixUtilsTest.java    |  37 ++++
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   |  29 +--
 .../indexing/pcj/fluo/integration/BatchIT.java  | 129 +++++++------
 .../pcj/fluo/integration/CreateDeleteIT.java    |  20 +-
 .../integration/CreateDeletePeriodicPCJ.java    |   7 +-
 .../pcj/fluo/integration/FluoLatencyIT.java     | 169 -----------------
 .../indexing/pcj/fluo/integration/QueryIT.java  |  32 ++--
 .../integration/StatementPatternIdCacheIT.java  |  82 ++++++++
 .../src/test/resources/log4j.properties         |  43 +++++
 .../rya/pcj/fluo/test/base/FluoITBase.java      |  41 ++--
 .../pcj/fluo/test/base/KafkaExportITBase.java   |  13 +-
 .../pcj/functions/geo/GeoFunctionsIT.java       |  14 +-
 58 files changed, 1778 insertions(+), 631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/indexingExample/src/main/java/RyaClientExample.java
----------------------------------------------------------------------
diff --git a/extras/indexingExample/src/main/java/RyaClientExample.java 
b/extras/indexingExample/src/main/java/RyaClientExample.java
index 3f42f1d..278f214 100644
--- a/extras/indexingExample/src/main/java/RyaClientExample.java
+++ b/extras/indexingExample/src/main/java/RyaClientExample.java
@@ -44,6 +44,7 @@ import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.accumulo.AccumuloIndexingConfiguration;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
@@ -75,7 +76,6 @@ public class RyaClientExample {
     private static final Logger log = Logger.getLogger(RyaClientExample.class);
 
     public static void main(final String[] args) throws Exception {
-        setupLogging();
 
         final String accumuloUsername = "root";
         final String accumuloPassword = "password";
@@ -241,6 +241,7 @@ public class RyaClientExample {
         // Setup the observers that will be used by the Fluo PCJ Application.
         final List<ObserverSpecification> observers = new ArrayList<>();
         observers.add(new 
ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(BatchObserver.class.getName()));
         observers.add(new 
ObserverSpecification(StatementPatternObserver.class.getName()));
         observers.add(new ObserverSpecification(JoinObserver.class.getName()));
         observers.add(new 
ObserverSpecification(FilterObserver.class.getName()));

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
index 0562180..ea08af5 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
@@ -23,12 +23,14 @@ import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
 import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.NodeBin;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.BindingSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import com.google.common.base.Optional;
 public class FluoBinPruner implements BinPruner {
 
     private static final Logger log = 
LoggerFactory.getLogger(FluoBinPruner.class);
+    private static final ValueFactory vf = new ValueFactoryImpl();
     private final FluoClient client;
 
     public FluoBinPruner(final FluoClient client) {
@@ -67,9 +70,9 @@ public class FluoBinPruner implements BinPruner {
                 throw new RuntimeException();
             }
             final Column batchInfoColumn = type.get().getResultColumn();
-            final String batchInfoSpanPrefix = id + 
IncrementalUpdateConstants.NODEID_BS_DELIM + bin;
+            final Bytes batchInfoSpanPrefix = 
BindingHashShardingFunction.getShardedScanPrefix(id, vf.createLiteral(bin));
             final SpanBatchDeleteInformation batchInfo = 
SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn)
-                    
.setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build();
+                    .setSpan(Span.prefix(batchInfoSpanPrefix)).build();
             BatchInformationDAO.addBatch(tx, id, batchInfo);
             tx.commit();
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.manual/src/site/markdown/pcj-updater.md
----------------------------------------------------------------------
diff --git a/extras/rya.manual/src/site/markdown/pcj-updater.md 
b/extras/rya.manual/src/site/markdown/pcj-updater.md
index 11cb560..157bb13 100644
--- a/extras/rya.manual/src/site/markdown/pcj-updater.md
+++ b/extras/rya.manual/src/site/markdown/pcj-updater.md
@@ -508,6 +508,192 @@ will fail.  If that occurs, look up the YARN 
Application-Id in the YARN UI,
 or with the command `yarn application -list` and then kill it with a command
 similar to: `yarn application -kill application_1503402439867_0009`.
 
+## Performance Optimizations
+There are a number of optimizations that can boost the performance of the Rya 
PCJ Updater.  The main
+bottleneck that will prevent an instance of the PCJ Updater from scaling is 
the load that the application
+places on the Accumulo Tablet Servers.  In an effort to mitigate this, there 
are a number of things that can be done to 
+lighten the scan load on each Tablet Server and cut down on the number of 
scans overall.
+
+#### Metadata Caching
+The PCJ Updater uses metadata associated with each query node to route and 
process intermediate query results.  
+New queries can be dynamically added to and deleted from the Rya PCJ Updater, 
but for the most part this data
+is static and can be cached.  So the PCJ Updater aggressively caches whatever 
metadata it can to avoid lookups.  In addition,
+each time the Updater processes new statements, it must match the new triples 
with StatementPatterns registered with the 
+PCJ Updater.  The ids of these StatementPatterns are also cached to avoid 
costly scans for new StatementPatterns.  All 
+metadata caches are active and utilized by default.
+
+#### Load Balancing and Sharding
+Another important optimization that can drastically boost performance is 
sharding.  Sharding ensures that
+the processing and scanning load is equally distributed among all Tablet 
Servers.  By default, the PCJ Updater
+shards its rows.  However, to take advantage of this optimization, the user 
must pre-split the Fluo table after
+initializing the application.  To do this, add the following properties to the 
fluo.properties file for the application
+before initializing:
+
+```
+# RowHasher Split Properties
+# -------------------
+fluo.app.recipes.optimizations.SP=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+fluo.app.recipes.optimizations.J=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+fluo.app.recipes.optimizations.A=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+fluo.app.recipes.optimizations.PR=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+fluo.app.recipes.optimizations.Q=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+
+fluo.app.recipes.rowHasher.SP.numTablets=8
+fluo.app.recipes.rowHasher.J.numTablets=8
+fluo.app.recipes.rowHasher.A.numTablets=8
+fluo.app.recipes.rowHasher.PR.numTablets=8
+fluo.app.recipes.rowHasher.Q.numTablets=8
+
+```
+
+The above properties are used by the Fluo RowHasher recipe, which splits the 
Fluo
+table along each specified prefix.  The properties above indicate that the 
Fluo table
+will be split across 8 tablets along each of the prefixes SP, J, A, PR, and Q. 
 Each of these
+prefixes corresponds to prefixes of distinct result ranges stored in the Rya 
PCJ Updater. 
+The choice of 8 tablets is arbitrary and should be based on available 
resources. After
+adding the above properties and initializing the application, execute the 
following command:
+
+```
+./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.OptimizeTable
+
+```
+
+This command generates and applies the splits indicated by the above properties.  
See
+the Fluo [row hash prefix 
recipe](https://fluo.apache.org/docs/fluo-recipes/1.1.0-incubating/row-hasher/) 
+for more details.
+
+#### Compaction Strategies for Garbage Collecting
+The PCJ Updater application retains processed notifications and triples that
+are marked for deletion until a minor or major compaction runs and triggers the
+Fluo Garbage Collection iterator.  Because new Triples are processed by the 
TripleObserver
+and then immediately marked as deleted, it's quite possible that a large 
number of "deleted" triples 
+could build up before they are actually removed from the table.  Similarly,
+old notifications that have already been processed and marked as deleted can 
pile up as well.  As these entries
+build up in the Fluo table, Tablet Servers have to scan over these entries 
when the
+Fluo NotificationIterator is run, creating extra work for the Tablet Servers.  
To count the number
+of "DELETED" notifications and triples that are in the table, run the 
following commands:
+
+``` 
+ #counts number of old notifications
+ ./bin/fluo scan <app_name> --raw -c ntfy | grep -c 'DELETE'
+ #counts number of deleted triples
+ ./bin/fluo scan <app_name> --raw -c triples | grep -c 'DELETE'
+```
+
+It's good practice to monitor how these quantities grow after starting the 
application to get 
+a sense of whether or not Accumulo compactions are being executed frequently 
enough.  One possible
+optimization to increase the compaction rate is to adjust the compaction ratio 
through the Accumulo shell.
+After initializing the PCJ Updater application, execute the following command 
in the Accumulo shell:
+
+```
+config -t <app_table_name> -s table.compaction.major.ratio=1.0 
+```
+
+This sets the major compaction ratio of the Fluo table to 1.0; the lower 
the ratio, the more frequently
+major compactions will occur.  Another approach is to compact on a specified 
Range.  Fluo supports 
+periodically compacting on specified ranges.  To do this, add any of the 
following hex ranges to the
+fluo.properties file before initializing the PCJ Updater.
+
+```
+#Rya PCJ Updater Compaction ranges
+#Compact all triples
+fluo.app.recipes.transientRange.triples=543C3C3A3E3E:543C3C3A3E3EFF
+#Compact all statement pattern results
+fluo.app.recipes.transientRange.statementPattern=53503C3C3A3E3E:53503C3C3A3E3EFF
+#Compact all join results
+fluo.app.recipes.transientRange.join=4A3C3C3A3E3E:4A3C3C3A3E3EFF
+#Compact all aggregation results
+fluo.app.recipes.transientRange.aggregation=413C3C3A3E3E:413C3C3A3E3EFF
+#Compact all projection results
+fluo.app.recipes.transientRange.projection=50523C3C3A3E3E:50523C3C3A3E3EFF
+#Compact full table
+fluo.app.recipes.transientRange.fullTable=:FF
+```
+To apply the above transient range properties, execute the following Fluo 
command:
+
+```
+./bin/fluo exec <app_name> 
org.apache.fluo.recipes.accumulo.cmds.CompactTransient <compaction_period> 
<multiplier>
+```
+
+The above command executes a compaction over each transient range that is 
included in the fluo.properties file.
+These compactions will run indefinitely and execute every time the compaction 
period (in ms) elapses.  The multiplier
+is an optional parameter that indicates how much the periodic compaction 
script will throttle compactions if they
+begin taking too long.  See the Fluo [transient data 
recipe](https://fluo.apache.org/docs/fluo-recipes/1.1.0-incubating/transient/) 
+for more information.
+
+If running compactions proves to be extremely costly and begins to affect 
application performance, it's best to 
+only do a range compaction on the triples.  This will clean up old, deleted 
triples and any notifications related to triples.
+If the Tablet Servers can handle the additional load, try adding additional 
ranges to clean up
+old data.  Note that when the full table range is added, there is no need to 
include any of the other
+ranges.  
+
+#### Cheatsheet for Applying the Optimizations
+To apply the above optimizations, 
+
+1. Stop the Rya PCJ Updater app if it is running by
+executing the following command
+```
+./bin/fluo stop <app_name>
+```
+ 2. Once the application is stopped, add any optimization related properties 
to the fluo.properties file.
+ These properties include the row sharding properties and the range compaction 
properties outlined
+ above.  Note that it is best to add all of the row shard properties to ensure 
an even data distribution
+ among all of the Tablet Servers, and start off by adding only the triple 
transient range property for Range compactions.  
+ 
+ 3. Initialize the Rya PCJ Updater application by executing the following 
command in Fluo
+ ```
+ ./bin/fluo init <app_name>
+ ```
+ 4. Apply the row shard splits by executing 
+ ```
+ ./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.OptimizeTable
+```
+5. Check the PCJ Updater table in the Accumulo UI to ensure that the table has 
the
+correct number of tablets
+
+6. Start the application by executing the following command
+```
+./bin/fluo start <app_name>
+```
+7.  Apply the transient range compactions by executing the following command
+```
+./bin/fluo exec <app_name> 
org.apache.fluo.recipes.accumulo.cmds.CompactTransient <compaction_period> 
<multiplier>
+```
+
+#### Monitoring Performance
+Once the application is running, there are a number of ways to assess its 
performance.  The 
+primary method is to track the number of outstanding notifications that are 
queued and waiting
+to be processed.  This can be done by executing the Fluo wait command
+
+```
+./bin/fluo wait <app_name>
+```
+
+When this script executes, it polls Fluo every ten seconds to determine how 
many unprocessed notifications
+are queued up.  If this number grows over time then the application needs more 
workers to handle the ingest load.  
+
+Before deploying more workers, first verify that the Tablet Servers can handle 
an increased scan load by checking
+the Accumulo UI to see if the Tablet Servers have any queued scans for the PCJ 
Updater table.  If there are a large
+number of queued scans for the table, adding more workers might not be 
possible.  It may be necessary to lower the
+ingest rate.  If the Tablet Servers are keeping up, deploy additional workers 
by stopping the application, updating
+the fluo.properties to use more workers, and re-initializing and starting the 
application.
+
+Another way to assess the performance of the application is to monitor the scan 
rate through the Accumulo UI.  If the scan
+rate is staying approximately constant over time (or increasing very slowly), 
then the application is performing as expected.
+In general, the scan rate increases because
+ 
+1. The number of intermediate results increases over time
+2. The number of "deleted" notifications that need to be flushed increases over 
time
+3. The number of "deleted" triples that need to be flushed increases over time
+
+The first item is unavoidable if there is no age off policy for the PCJ 
Updater application (which is currently the case).  
+However, 2 and 3 are avoidable by applying the range compaction optimizations 
discussed above.  So it's important to monitor the
+scan rate (and the number of old triples and notifications as discussed above) 
to assess the health of the application.  
+
+Finally, an additional item to monitor is which iterators are running in the 
Fluo table.  This can be done by regularly running the listscans
+command in the Accumulo shell.  This gives a sense of how the Tablet Servers 
are being used.  It helps determine whether they are
+spending most of their time finding new notifications (internal Fluo work), or 
issuing scans that are specific to Rya PCJ Updater Observers.
+ 
 
 [Apache Fluo]: https://fluo.apache.org/
 [Apache Fluo 1.0.0-incubating Documentation]: 
https://fluo.apache.org/docs/fluo/1.0.0-incubating/

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
index 0d97b2f..e0123be 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
@@ -21,19 +21,22 @@ package org.apache.rya.indexing.pcj.fluo.api;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdManager;
 import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.openrdf.query.BindingSet;
@@ -79,7 +82,8 @@ public class DeleteFluoPcj {
      *            Index. (not null)
      * @param pcjId - The PCJ ID for the query that will removed from the Fluo
      *            application. (not null)
-     * @throws UnsupportedQueryException 
+     * @throws UnsupportedQueryException - thrown when Fluo app is unable to 
read FluoQuery associated
+     * with given pcjId.
      */
     public void deletePcj(final FluoClient client, final String pcjId) throws 
UnsupportedQueryException {
         requireNonNull(client);
@@ -104,7 +108,8 @@ public class DeleteFluoPcj {
      * @param tx - Transaction of a given Fluo table. (not null)
      * @param pcjId - Id of query. (not null)
      * @return list of Node IDs associated with the query {@code pcjId}.
-     * @throws UnsupportedQueryException 
+     * @throws UnsupportedQueryException - thrown when Fluo app is unable to 
read FluoQuery associated
+     * with given pcjId.
      */
     private List<String> getNodeIds(Transaction tx, String pcjId) throws 
UnsupportedQueryException {
         requireNonNull(tx);
@@ -131,10 +136,18 @@ public class DeleteFluoPcj {
         requireNonNull(pcjId);
 
         try (final Transaction typeTx = tx) {
+            Set<String> spNodeIds = new HashSet<>();
+            //remove metadata associated with each nodeId and store statement 
pattern nodeIds
             for (final String nodeId : nodeIds) {
                 final NodeType type = NodeType.fromNodeId(nodeId).get();
+                if(type == NodeType.STATEMENT_PATTERN) {
+                    spNodeIds.add(nodeId);
+                }
                 deleteMetadataColumns(typeTx, nodeId, 
type.getMetaDataColumns());
             }
+            //Use stored statement pattern nodeIds to update list of stored 
statement pattern nodeIds
+            //in Fluo table
+            StatementPatternIdManager.removeStatementPatternIds(typeTx, 
spNodeIds);
             typeTx.commit();
         }
     }
@@ -169,37 +182,11 @@ public class DeleteFluoPcj {
 
         final NodeType type = NodeType.fromNodeId(nodeId).get();
         Transaction tx = client.newTransaction();
-        while (deleteDataBatch(tx, getIterator(tx, nodeId, 
type.getResultColumn()), type.getResultColumn())) {
-            tx = client.newTransaction();
-        }
-    }
-
-    private CellScanner getIterator(final Transaction tx, final String nodeId, 
final Column column) {
-        requireNonNull(tx);
-        requireNonNull(nodeId);
-        requireNonNull(column);
-
-        return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build();
-    }
-
-    private boolean deleteDataBatch(final Transaction tx, final CellScanner 
scanner, final Column column) {
-        requireNonNull(tx);
-        requireNonNull(scanner);
-        requireNonNull(column);
-
-        try (Transaction ntx = tx) {
-            int count = 0;
-            final Iterator<RowColumnValue> iter = scanner.iterator();
-            while (iter.hasNext() && count < batchSize) {
-                final Bytes row = iter.next().getRow();
-                count++;
-                tx.delete(row, column);
-            }
-
-            final boolean hasNext = iter.hasNext();
-            tx.commit();
-            return hasNext;
-        }
+        Bytes prefixBytes = Bytes.of(type.getNodeTypePrefix());
+        SpanBatchDeleteInformation batch = 
SpanBatchDeleteInformation.builder().setColumn(type.getResultColumn())
+                
.setSpan(Span.prefix(prefixBytes)).setBatchSize(batchSize).setNodeId(Optional.of(nodeId)).build();
+        BatchInformationDAO.addBatch(tx, nodeId, batch);
+        tx.commit();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
index ddbaaaf..4de4069 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
@@ -30,8 +30,11 @@ import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
@@ -63,7 +66,7 @@ public class GetQueryReport {
      * @param fluo - The connection to Fluo that will be used to fetch the 
metadata. (not null)
      * @return A map from Query ID to QueryReport that holds a report for all 
of
      *   the queries that are being managed within the fluo app.
-     * @throws UnsupportedQueryException 
+     * @throws UnsupportedQueryException
      */
     public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) 
throws UnsupportedQueryException {
         checkNotNull(fluo);
@@ -86,7 +89,7 @@ public class GetQueryReport {
      * @param fluo - The connection to Fluo that will be used to fetch the 
metadata. (not null)
      * @param queryId - The ID of the query to fetch. (not null)
      * @return A report that was built for the query.
-     * @throws UnsupportedQueryException 
+     * @throws UnsupportedQueryException
      */
     public QueryReport getReport(final FluoClient fluo, final String queryId) 
throws UnsupportedQueryException {
         checkNotNull(fluo);
@@ -132,14 +135,20 @@ public class GetQueryReport {
         checkNotNull(nodeId);
         checkNotNull(bindingSetColumn);
 
+        NodeType type = NodeType.fromNodeId(nodeId).get();
+        Bytes prefixBytes = Bytes.of(type.getNodeTypePrefix());
+
         // Limit the scan to the binding set column and node id.
-        final RowScanner rows = 
sx.scanner().over(Span.prefix(nodeId)).fetch(bindingSetColumn).byRow().build();
+        final RowScanner rows = 
sx.scanner().over(Span.prefix(prefixBytes)).fetch(bindingSetColumn).byRow().build();
 
         BigInteger count = BigInteger.valueOf(0L);
         for (ColumnScanner columns : rows) {
-                count = count.add( BigInteger.ONE );
-               }
-        
+            String row = BindingSetRow.makeFromShardedRow(prefixBytes, 
columns.getRow()).getNodeId();
+            if (row.equals(nodeId)) {
+                count = count.add(BigInteger.ONE);
+            }
+        }
+
         return count;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
index 1e86836..b784fbf 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
@@ -19,24 +19,24 @@
 package org.apache.rya.indexing.pcj.fluo.api;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.addTriplePrefixAndConvertToBytes;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-
-import com.google.common.base.Optional;
-
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
 import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.resolver.triple.TripleRow;
 import org.apache.rya.api.resolver.triple.TripleRowResolverException;
 import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import com.google.common.base.Optional;
 
 /**
  * Insert a batch of Triples into. This will trigger observers that will update
@@ -79,7 +79,7 @@ public class InsertTriples {
         try(Transaction tx = fluo.newTransaction()) {
             for(final RyaStatement triple : triples) {
                 try {
-                    tx.set(Bytes.of(spoFormat(triple)), 
FluoQueryColumns.TRIPLES, Bytes.of(visibility.or("")));
+                    tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, 
Bytes.of(visibility.or("")));
                 } catch (final TripleRowResolverException e) {
                     log.error("Could not convert a Triple into the SPO format: 
" + triple);
                 }
@@ -88,7 +88,7 @@ public class InsertTriples {
             tx.commit();
         }
     }
-    
+
     /**
      * Inserts a triple into Fluo.
      *
@@ -101,9 +101,9 @@ public class InsertTriples {
 
         insert(fluo, Collections.singleton(triple));
     }
-    
+
     /**
-     * Insert a batch of RyaStatements into Fluo.  
+     * Insert a batch of RyaStatements into Fluo.
      *
      * @param fluo - A connection to the Fluo table that will be updated. (not 
null)
      * @param triples - The triples to insert. (not null)
@@ -116,7 +116,7 @@ public class InsertTriples {
             for(final RyaStatement triple : triples) {
                 Optional<byte[]> visibility = 
Optional.fromNullable(triple.getColumnVisibility());
                 try {
-                    tx.set(Bytes.of(spoFormat(triple)), 
FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+                    tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, 
Bytes.of(visibility.or(new byte[0])));
                 } catch (final TripleRowResolverException e) {
                     log.error("Could not convert a Triple into the SPO format: 
" + triple);
                 }
@@ -125,8 +125,6 @@ public class InsertTriples {
             tx.commit();
         }
     }
-    
-    
 
     /**
      * Converts a triple into a byte[] holding the Rya SPO representation of 
it.
@@ -135,10 +133,10 @@ public class InsertTriples {
      * @return The Rya SPO representation of the triple.
      * @throws TripleRowResolverException The triple could not be converted.
      */
-    public static byte[] spoFormat(final RyaStatement triple) throws 
TripleRowResolverException {
+    public static Bytes spoFormat(final RyaStatement triple) throws 
TripleRowResolverException {
         checkNotNull(triple);
         final Map<TABLE_LAYOUT, TripleRow> serialized = 
TRIPLE_RESOLVER.serialize(triple);
         final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
-        return spoRow.getRow();
+        return addTriplePrefixAndConvertToBytes(spoRow.getRow());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index fd624eb..4839c04 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -1,16 +1,22 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
-    license agreements. See the NOTICE file distributed with this work for 
additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    you under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <parent>
         <groupId>org.apache.rya</groupId>
@@ -60,6 +66,14 @@
             <artifactId>fluo-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-recipes-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-recipes-accumulo</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.openrdf.sesame</groupId>
             <artifactId>sesame-queryrender</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java
new file mode 100644
index 0000000..62fc48c
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * This class provides the common functionality for generating sharded row
+ * keys for any class that extends it.
+ *
+ */
+public abstract class AbstractNodeUpdater {
+
+    /**
+     * Generates a sharded row key from the provided arguments for inserting 
new results
+     * into the Fluo table.  The row key is of the form node prefix + shardId 
+ nodeId + BindingSet values.
+     * The shardId is formed from a hash of the first Binding value as 
indicated by the VariableOrder.
+     * @param nodeId - query nodeId corresponding to new result
+     * @param varOrder - varOrder used to order Binding values for new row key
+     * @param bs - BindingSet that values will be taken from to form key
+     * @return - sharded row key formed from the provided arguments
+     */
+    public Bytes makeRowKey(String nodeId, VariableOrder varOrder, 
VisibilityBindingSet bs) {
+        return BindingHashShardingFunction.addShard(nodeId, varOrder, bs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
index 2e41cb1..3acf3e9 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
@@ -42,7 +42,6 @@ import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.openrdf.model.Literal;
@@ -65,7 +64,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * Updates the results of an Aggregate node when its child has added a new 
Binding Set to its results.
  */
 @DefaultAnnotation(NonNull.class)
-public class AggregationResultUpdater {
+public class AggregationResultUpdater extends AbstractNodeUpdater {
     private static final Logger log = 
Logger.getLogger(AggregationResultUpdater.class);
 
     private static final AggregationStateSerDe AGG_STATE_SERDE = new 
ObjectSerializationAggregationStateSerDe();
@@ -104,7 +103,7 @@ public class AggregationResultUpdater {
         // The Row ID for the Aggregation State that needs to be updated is 
defined by the Group By variables.
         final String aggregationNodeId = aggregationMetadata.getNodeId();
         final VariableOrder groupByVars = 
aggregationMetadata.getGroupByVariableOrder();
-        final Bytes rowId = RowKeyUtil.makeRowKey(aggregationNodeId, 
groupByVars, childBindingSet);
+        final Bytes rowId = makeRowKey(aggregationNodeId, groupByVars, 
childBindingSet);
 
         // Load the old state from the bytes if one was found; otherwise 
initialize the state.
         final Optional<Bytes> stateBytes = Optional.ofNullable( tx.get(rowId, 
FluoQueryColumns.AGGREGATION_BINDING_SET) );

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
index 2e45ea6..9b7558f 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
@@ -21,15 +21,17 @@ package org.apache.rya.indexing.pcj.fluo.app;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
+import java.util.Objects;
+
 import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import net.jcip.annotations.Immutable;
 
 /**
- * The values of an Accumulo Row ID for a row that stores a Binding set for
- * a specific Node ID of a query.
+ * The values of an Accumulo Row ID for a row that stores a Binding set for a 
specific Node ID of a query.
  */
 @Immutable
 @DefaultAnnotation(NonNull.class)
@@ -77,4 +79,39 @@ public class BindingSetRow {
         final String bindingSetString = rowArray.length == 2 ? rowArray[1] : 
"";
         return new BindingSetRow(nodeId, bindingSetString);
     }
+
+    /**
+     * Creates a BindingSetRow from a sharded row entry, where the row is 
sharded according to
+     * {@link BindingHashShardingFunction}.
+     *
+     * @param prefixBytes - prefix of the node type that the row corresponds 
to (see prefixes in
+     *            {@link IncrementalUpdateConstants}).
+     * @param row - row that BindingSetRow is created from
+     * @return - BindingSetRow object
+     */
+    public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes 
row) {
+        return make(BindingHashShardingFunction.removeHash(prefixBytes, row));
+    }
+
+    @Override
+    public String toString() {
+        return "NodeId: " + nodeId + "\n" + "BindingSet String: " + 
bindingSetString;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if(this == other) { return true;}
+
+        if (other instanceof BindingSetRow) {
+            BindingSetRow row = (BindingSetRow) other;
+            return Objects.equals(nodeId, row.nodeId) && 
Objects.equals(bindingSetString, row.bindingSetString);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nodeId, bindingSetString);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
index 6642780..b50e862 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
@@ -28,22 +28,21 @@ import org.apache.rya.api.domain.RyaSubGraph;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
 /**
  * This class creates results for the ConstructQuery.  This class applies the 
{@link ConstructGraph}
  * associated with the Construct Query to generate a collection of {@link 
RyaStatement}s.  These statements
- * are then used to form a {@link RyaSubGraph} that is serialized and stored 
as a value in the Column 
+ * are then used to form a {@link RyaSubGraph} that is serialized and stored 
as a value in the Column
  * {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}.
  *
  */
-public class ConstructQueryResultUpdater {
+public class ConstructQueryResultUpdater extends AbstractNodeUpdater {
 
     private static final Logger log = 
Logger.getLogger(ConstructQueryResultUpdater.class);
     private static final RyaSubGraphKafkaSerDe serializer = new 
RyaSubGraphKafkaSerDe();
-    
+
     /**
      * Updates the Construct Query results by applying the {@link 
ConnstructGraph} to
      * create a {@link RyaSubGraph} and then writing the subgraph to {@link 
FluoQueryColumns#CONSTRUCT_STATEMENTS}.
@@ -52,15 +51,15 @@ public class ConstructQueryResultUpdater {
      * @param metadata - metadata that the ConstructProjection is extracted 
from
      */
     public void updateConstructQueryResults(TransactionBase tx, 
VisibilityBindingSet bs, ConstructQueryMetadata metadata) {
-        
+
         String nodeId = metadata.getNodeId();
         VariableOrder varOrder = metadata.getVariableOrder();
         Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS;
         ConstructGraph graph = metadata.getConstructGraph();
         String parentId = metadata.getParentNodeId();
-        
+
         // Create the Row Key for the emitted binding set. It does not contain 
visibilities.
-        final Bytes resultRow = RowKeyUtil.makeRowKey(nodeId, varOrder, bs);
+        final Bytes resultRow = makeRowKey(nodeId, varOrder, bs);
 
         // If this is a new binding set, then emit it.
         if(tx.get(resultRow, column) == null || 
varOrder.getVariableOrders().size() < bs.size()) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 2cc9f77..c829156 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -57,7 +56,7 @@ import info.aduna.iteration.CloseableIteration;
  * Set to its results.
  */
 @DefaultAnnotation(NonNull.class)
-public class FilterResultUpdater {
+public class FilterResultUpdater extends AbstractNodeUpdater {
 
     private static final Logger log = 
LoggerFactory.getLogger(FilterResultUpdater.class);
 
@@ -114,7 +113,7 @@ public class FilterResultUpdater {
 
             // Create the Row Key for the emitted binding set. It does not 
contain visibilities.
             final VariableOrder filterVarOrder = 
filterMetadata.getVariableOrder();
-            final Bytes resultRow = 
RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, 
childBindingSet);
+            final Bytes resultRow = makeRowKey(filterMetadata.getNodeId(), 
filterVarOrder, childBindingSet);
 
             // Serialize and emit BindingSet
             final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
index 602fd9d..e0483ab 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
+import static 
org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.removeTriplePrefixAndConvertToByteArray;
 
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
@@ -41,8 +42,15 @@ public class IncUpdateDAO {
 
     private static final WholeRowTripleResolver tr = new 
WholeRowTripleResolver();
 
+    /**
+     * Deserializes a triple stored in the Fluo table.
+     * @param row - serialized triple
+     * @return - triple deserialized as a RyaStatement
+     */
     public static RyaStatement deserializeTriple(final Bytes row) {
-        final byte[] rowArray = row.toArray();
+
+        checkNotNull(row);
+        final byte[] rowArray = removeTriplePrefixAndConvertToByteArray(row);
 
         RyaStatement rs = null;
         try {
@@ -55,6 +63,7 @@ public class IncUpdateDAO {
     }
 
     public static String getTripleString(final RyaStatement rs) {
+        checkNotNull(rs);
         final String subj = rs.getSubject().getData() + TYPE_DELIM + URI_TYPE;
         final String pred = rs.getPredicate().getData() + TYPE_DELIM + 
URI_TYPE;
         final String objData = rs.getObject().getData();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index 5405837..63b9715 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -30,15 +30,18 @@ public class IncrementalUpdateConstants {
     public static final String TYPE_DELIM = "<<~>>";
 
     //to be used in construction of id for each node
-    public static final String SP_PREFIX = "STATEMENT_PATTERN";
-    public static final String JOIN_PREFIX = "JOIN";
-    public static final String FILTER_PREFIX = "FILTER";
-    public static final String AGGREGATION_PREFIX = "AGGREGATION";
-    public static final String QUERY_PREFIX = "QUERY";
-    public static final String PROJECTION_PREFIX = "PROJECTION";
-    public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
-    public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
-    
+    public static final String TRIPLE_PREFIX = "T";
+    public static final String SP_PREFIX = "SP";
+    public static final String JOIN_PREFIX = "J";
+    public static final String FILTER_PREFIX = "F";
+    public static final String AGGREGATION_PREFIX = "A";
+    public static final String QUERY_PREFIX = "Q";
+    public static final String PROJECTION_PREFIX = "PR";
+    public static final String CONSTRUCT_PREFIX = "C";
+    public static final String PERIODIC_QUERY_PREFIX = "PE";
+
+    public static final String STATEMENT_PATTERN_ID = "SP_ID";
+
     //binding name reserved for periodic bin id for periodic query results
     public static final String PERIODIC_BIN_ID = 
PeriodicQueryResultStorage.PeriodicBinId;
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index fb3ee0c..bcce77d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -19,8 +19,6 @@
 package org.apache.rya.indexing.pcj.fluo.app;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
-import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -43,13 +41,12 @@ import 
org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
 import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.MapBindingSet;
@@ -65,14 +62,11 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * new Binding Set to its results.
  */
 @DefaultAnnotation(NonNull.class)
-public class JoinResultUpdater {
+public class JoinResultUpdater extends AbstractNodeUpdater {
 
     private static final Logger log = 
Logger.getLogger(JoinResultUpdater.class);
-
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
-    private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER 
= new VisibilityBindingSetStringConverter();
-
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+    private final FluoQueryMetadataCache queryDao = 
MetadataCacheSupplier.getOrCreateCache();
 
     /**
      * Updates the results of a Join node when one of its children has added a
@@ -130,7 +124,7 @@ public class JoinResultUpdater {
         Span siblingSpan = getSpan(tx, childNodeId, childBindingSet, 
siblingId);
         Column siblingColumn = getScanColumnFamily(siblingId);
         Optional<RowColumn> rowColumn = fillSiblingBatch(tx, siblingSpan, 
siblingColumn, siblingBindingSets, joinMetadata.getJoinBatchSize());
-        
+
         // Iterates over the resulting BindingSets from the join.
         final Iterator<VisibilityBindingSet> newJoinResults;
         if(emittingSide == Side.LEFT) {
@@ -145,8 +139,8 @@ public class JoinResultUpdater {
             final VisibilityBindingSet newJoinResult = newJoinResults.next();
 
             // Create the Row Key for the emitted binding set. It does not 
contain visibilities.
-            final Bytes resultRow = 
RowKeyUtil.makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult);
-            
+            final Bytes resultRow = makeRowKey(joinMetadata.getNodeId(), 
joinVarOrder, newJoinResult);
+
             // Only insert the join Binding Set if it is new or BindingSet 
contains values not used in resultRow.
             if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null || 
joinVarOrder.getVariableOrders().size() < newJoinResult.size()) {
                 // Create the Node Value. It does contain visibilities.
@@ -159,7 +153,7 @@ public class JoinResultUpdater {
                 tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, 
nodeValueBytes);
             }
         }
-        
+
         // if batch limit met, there are additional entries to process
         // update the span and register updated batch job
         if (rowColumn.isPresent()) {
@@ -183,18 +177,18 @@ public class JoinResultUpdater {
     public static enum Side {
         LEFT, RIGHT;
     }
-    
-    
+
+
     /**
      * Fetches batch to be processed by scanning over the Span specified by the
      * {@link JoinBatchInformation}. The number of results is less than or 
equal
      * to the batch size specified by the JoinBatchInformation.
-     * 
+     *
      * @param tx - Fluo transaction in which batch operation is performed
      * @param siblingSpan - span of sibling to retrieve elements to join with
      * @param bsSet- set that batch results are added to
      * @return Set - containing results of sibling scan.
-     * @throws Exception 
+     * @throws Exception
      */
     private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, Span 
siblingSpan, Column siblingColumn, Set<VisibilityBindingSet> bsSet, int 
batchSize) throws Exception {
 
@@ -222,7 +216,7 @@ public class JoinResultUpdater {
             return Optional.absent();
         }
     }
-    
+
     /**
      * Creates a Span for the sibling node to retrieve BindingSets to join with
      * @param tx
@@ -231,29 +225,19 @@ public class JoinResultUpdater {
      * @param siblingId - Id of the sibling node whose BindingSets will be 
retrieved and joined with the update
      * @return Span to retrieve sibling node's BindingSets to form join results
      */
-    private Span getSpan(TransactionBase tx, final String childId, final 
BindingSet childBindingSet, final String siblingId) {
+    private Span getSpan(TransactionBase tx, final String childId, final 
VisibilityBindingSet childBindingSet, final String siblingId) {
         // Get the common variable orders. These are used to build the prefix.
         final VariableOrder childVarOrder = getVarOrder(tx, childId);
         final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
         final List<String> commonVars = getCommonVars(childVarOrder, 
siblingVarOrder);
 
-        // Get the Binding strings
-        final String childBindingSetString = 
VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder);
-        final String[] childBindingArray = 
childBindingSetString.split("\u0001");
-        final String[] childBindingStrings = 
FluoStringConverter.toBindingStrings(childBindingArray[0]);
-
-        // Create the prefix that will be used to scan for binding sets of the 
sibling node.
-        // This prefix includes the sibling Node ID and the common variable 
values from
-        // childBindingSet.
-        String siblingScanPrefix = "";
-        for(int i = 0; i < commonVars.size(); i++) {
-            if(siblingScanPrefix.length() == 0) {
-                siblingScanPrefix = childBindingStrings[i];
-            } else {
-                siblingScanPrefix += DELIM + childBindingStrings[i];
-            }
+        Bytes siblingScanPrefix = null;
+        if(!commonVars.isEmpty()) {
+            siblingScanPrefix = makeRowKey(siblingId, new 
VariableOrder(commonVars), childBindingSet);
+        } else {
+            siblingScanPrefix = makeRowKey(siblingId, siblingVarOrder, 
childBindingSet);
         }
-        siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix;
+
         return Span.prefix(siblingScanPrefix);
     }
 
@@ -277,13 +261,13 @@ public class JoinResultUpdater {
                 return removeBinIdFromVarOrder(queryDao.readFilterMetadata(tx, 
nodeId).getVariableOrder());
             case JOIN:
                 return removeBinIdFromVarOrder(queryDao.readJoinMetadata(tx, 
nodeId).getVariableOrder());
-            case PROJECTION: 
+            case PROJECTION:
                 return 
removeBinIdFromVarOrder(queryDao.readProjectionMetadata(tx, 
nodeId).getVariableOrder());
             default:
                 throw new IllegalArgumentException("Could not figure out the 
variable order for node with ID: " + nodeId);
         }
     }
-    
+
     private VariableOrder removeBinIdFromVarOrder(VariableOrder varOrder) {
         List<String> varOrderList = varOrder.getVariableOrders();
         
if(varOrderList.get(0).equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
@@ -311,7 +295,7 @@ public class JoinResultUpdater {
 
         final List<String> commonVars = new ArrayList<>();
 
-        // Only need to iteratre through the shorted order's length.
+        // Only need to iterate through the shorted order's length.
         final Iterator<String> vars1It = vars1.iterator();
         final Iterator<String> vars2It = vars2.iterator();
         while(vars1It.hasNext() && vars2It.hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
index cb331cf..9b9b3ae 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
@@ -27,7 +27,6 @@ import org.apache.fluo.api.data.Column;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import org.openrdf.model.Literal;
@@ -39,10 +38,10 @@ import org.openrdf.query.algebra.evaluation.QueryBindingSet;
 /**
  * This class adds the appropriate BinId Binding to each BindingSet that it 
processes.  The BinIds
  * are used to determine which period a BindingSet (with a temporal Binding) 
falls into so that
- * a user can receive periodic updates for a registered query. 
+ * a user can receive periodic updates for a registered query.
  *
  */
-public class PeriodicQueryUpdater {
+public class PeriodicQueryUpdater extends AbstractNodeUpdater {
 
     private static final Logger log = 
Logger.getLogger(PeriodicQueryUpdater.class);
     private static final ValueFactory vf = new ValueFactoryImpl();
@@ -65,9 +64,9 @@ public class PeriodicQueryUpdater {
             binnedBs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(id));
             VisibilityBindingSet visibilityBindingSet = new 
VisibilityBindingSet(binnedBs, bs.getVisibility());
             Bytes periodicBsBytes = BS_SERDE.serialize(visibilityBindingSet);
-            
-            //create row 
-            final Bytes resultRow = RowKeyUtil.makeRowKey(metadata.getNodeId(), metadata.getVariableOrder(), binnedBs);
+
+            //create row
+            final Bytes resultRow = makeRowKey(metadata.getNodeId(), metadata.getVariableOrder(), visibilityBindingSet);
             Column col = FluoQueryColumns.PERIODIC_QUERY_BINDING_SET;
             tx.set(resultRow, col, periodicBsBytes);
         }
@@ -75,8 +74,8 @@ public class PeriodicQueryUpdater {
 
     /**
      * This method returns the end times of all period windows containing the 
time contained in
-     * the BindingSet.  
-     * 
+     * the BindingSet.
+     *
      * @param metadata
      * @return Set of period bin end times
      */
@@ -97,7 +96,7 @@ public class PeriodicQueryUpdater {
     private long getRightBinEndPoint(long eventDateTime, long periodDuration) {
         return (eventDateTime / periodDuration + 1) * periodDuration;
     }
-    
+
     private long getLeftBinEndPoint(long eventTime, long periodDuration) {
         return (eventTime / periodDuration) * periodDuration;
     }
@@ -116,7 +115,7 @@ public class PeriodicQueryUpdater {
         long rightEventBin = getRightBinEndPoint(eventDateTime, 
periodDuration);
         //get the bin left of the current moment for comparison
         long currentBin = getLeftBinEndPoint(System.currentTimeMillis(), 
periodDuration);
-        
+
         if(currentBin >= rightEventBin) {
             long numBins = (windowDuration -(currentBin - 
rightEventBin))/periodDuration;
             for(int i = 0; i < numBins; i++) {
@@ -132,6 +131,6 @@ public class PeriodicQueryUpdater {
 
         return binIds;
     }
-    
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
index f9d8257..e928e3d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
@@ -26,7 +26,6 @@ import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -40,7 +39,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * new Binding Set to its results.
  */
 @DefaultAnnotation(NonNull.class)
-public class ProjectionResultUpdater {
+public class ProjectionResultUpdater extends AbstractNodeUpdater {
     private static final Logger log = 
Logger.getLogger(QueryResultUpdater.class);
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
@@ -65,7 +64,7 @@ public class ProjectionResultUpdater {
         log.trace(
                 "Transaction ID: " + tx.getStartTimestamp() + "\n" +
                 "Node ID: " + projectionMetadata.getNodeId() + "\n" +
-                "Parent Node ID: " + projectionMetadata.getParentNodeId() + 
"\n" +        
+                "Parent Node ID: " + projectionMetadata.getParentNodeId() + 
"\n" +
                 "Child Node ID: " + projectionMetadata.getChildNodeId() + "\n" 
+
                 "Child Binding Set:\n" + childBindingSet + "\n");
 
@@ -73,12 +72,13 @@ public class ProjectionResultUpdater {
         final VariableOrder queryVarOrder = 
projectionMetadata.getVariableOrder();
         final VariableOrder projectionVarOrder = 
projectionMetadata.getProjectedVars();
         final BindingSet queryBindingSet = 
BindingSetUtil.keepBindings(projectionVarOrder, childBindingSet);
+        VisibilityBindingSet projectedBs = new 
VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility());
 
         // Create the Row Key for the result. If the child node groups 
results, then the key must only contain the Group By variables.
-        Bytes resultRow  = RowKeyUtil.makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, queryBindingSet);
+        Bytes resultRow  = makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, projectedBs);
 
         // Create the Binding Set that goes in the Node Value. It does contain 
visibilities.
-        final Bytes nodeValueBytes = BS_SERDE.serialize(new 
VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()));
+        final Bytes nodeValueBytes = BS_SERDE.serialize(projectedBs);
 
         log.trace(
                 "Transaction ID: " + tx.getStartTimestamp() + "\n" +

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index 37d7256..6c383e2 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -38,7 +37,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * new Binding Set to its results.
  */
 @DefaultAnnotation(NonNull.class)
-public class QueryResultUpdater {
+public class QueryResultUpdater extends AbstractNodeUpdater {
     private static final Logger log = 
Logger.getLogger(QueryResultUpdater.class);
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
@@ -70,7 +69,7 @@ public class QueryResultUpdater {
         final VariableOrder queryVarOrder = queryMetadata.getVariableOrder();
 
         // Create the Row Key for the result. If the child node groups 
results, then the key must only contain the Group By variables.
-        final Bytes resultRow = 
RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, 
childBindingSet);
+        final Bytes resultRow = makeRowKey(queryMetadata.getNodeId(), 
queryVarOrder, childBindingSet);
 
         // Create the Binding Set that goes in the Node Value. It does contain 
visibilities.
         final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
index 9584a10..c077310 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
@@ -23,6 +23,8 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
 
 /**
  * This class provides common functionality for implementations of {@link 
BatchBindingSetUpdater}.
@@ -30,6 +32,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
  */
 public abstract class AbstractBatchBindingSetUpdater implements 
BatchBindingSetUpdater {
 
+    protected static final FluoQueryMetadataCache CACHE = 
MetadataCacheSupplier.getOrCreateCache();
+
     /**
      * Updates the Span to create a new {@link BatchInformation} object to be 
fed to the
      * {@link BatchObserver}.  This message is called in the event that the 
BatchBindingSetUpdater
@@ -41,7 +45,7 @@ public abstract class AbstractBatchBindingSetUpdater 
implements BatchBindingSetU
     public static Span getNewSpan(RowColumn newStart, Span oldSpan) {
         return new Span(newStart, oldSpan.isStartInclusive(), 
oldSpan.getEnd(), oldSpan.isEndInclusive());
     }
-    
+
     /**
      * Cleans up old batch job.  This method is meant to be called by any 
overriding method
      * to clean up old batch tasks.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
index 2da3e39..e15fa8f 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
@@ -36,7 +36,7 @@ public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation
      * Create AbstractBatchInformation
      * @param batchSize - size of batch to be processed
      * @param task - type of task processed (Add, Delete, Udpate)
-     * @param column - Cpolumn that Span notification is applied
+     * @param column - Column that Span notification is applied
      * @param span - span used to indicate where processing should begin
      */
     public AbstractSpanBatchInformation(int batchSize, Task task, Column 
column, Span span) {
@@ -62,7 +62,7 @@ public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation
     public void setSpan(Span span) {
         this.span = span;
     }
-    
+
     @Override
     public String toString() {
         return new StringBuilder()
@@ -74,7 +74,7 @@ public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation
                 .append("}")
                 .toString();
     }
-    
+
     @Override
     public boolean equals(Object other) {
         if (this == other) {
@@ -94,6 +94,6 @@ public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation
     public int hashCode() {
         return Objects.hash(super.getBatchSize(), span, super.getColumn(), 
super.getTask());
     }
-    
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
index a266341..a883be1 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
@@ -37,9 +37,8 @@ import 
org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -53,7 +52,6 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
 
     private static final Logger log = 
Logger.getLogger(JoinBatchBindingSetUpdater.class);
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
-    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
 
     /**
      * Processes {@link JoinBatchInformation}. Updates the BindingSets
@@ -65,7 +63,7 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
      * entries that need to be updated exceeds the batch size, the row of the
      * first unprocessed BindingSets is used to create a new JoinBatch job to
      * process the remaining BindingSets.
-     * @throws Exception 
+     * @throws Exception
      */
     @Override
     public void processBatch(TransactionBase tx, Bytes row, BatchInformation 
batch) throws Exception {
@@ -100,15 +98,15 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
             newJoinResults = joinAlgorithm.newRightResult(bsSet.iterator(), 
bs);
         }
 
-        // Insert the new join binding sets to the Fluo table.
-        final JoinMetadata joinMetadata = dao.readJoinMetadata(tx, nodeId);
+        // Read join metadata, create new join BindingSets and insert them 
into the Fluo table.
+        final JoinMetadata joinMetadata = CACHE.readJoinMetadata(tx, nodeId);
         final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
         while (newJoinResults.hasNext()) {
             final VisibilityBindingSet newJoinResult = newJoinResults.next();
             //create BindingSet value
             Bytes bsBytes = BS_SERDE.serialize(newJoinResult);
             //make rowId
-            Bytes rowKey = RowKeyUtil.makeRowKey(nodeId, joinVarOrder, newJoinResult);
+            Bytes rowKey = BindingHashShardingFunction.addShard(nodeId, joinVarOrder, newJoinResult);
             final Column col = FluoQueryColumns.JOIN_BINDING_SET;
             processTask(tx, task, rowKey, col, bsBytes);
         }
@@ -144,12 +142,12 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
      * Fetches batch to be processed by scanning over the Span specified by the
      * {@link JoinBatchInformation}. The number of results is less than or 
equal
      * to the batch size specified by the JoinBatchInformation.
-     * 
+     *
      * @param tx - Fluo transaction in which batch operation is performed
      * @param batch - batch order to be processed
      * @param bsSet- set that batch results are added to
      * @return Set - containing results of sibling scan.
-     * @throws Exception 
+     * @throws Exception
      */
     private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, 
JoinBatchInformation batch, Set<VisibilityBindingSet> bsSet) throws Exception {
 

Reply via email to