http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
new file mode 100644
index 0000000..1eafc00
--- /dev/null
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo.integration;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.rya.accumulo.AccumuloITBase;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.PeriodicQueryTableNameFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase {
+
+    private PeriodicQueryResultStorage periodicStorage;
+    private static final String RYA = "rya_";
+    private static final PeriodicQueryTableNameFactory nameFactory = new 
PeriodicQueryTableNameFactory();
+    private static final ValueFactory vf = new ValueFactoryImpl();
+    
+    @Before
+    public void init() throws AccumuloException, AccumuloSecurityException {
+        
super.getConnector().securityOperations().changeUserAuthorizations("root", new 
Authorizations("U"));
+        periodicStorage = new 
AccumuloPeriodicQueryResultStorage(super.getConnector(), RYA);
+    }
+    
+    
+    @Test
+    public void testCreateAndMeta() throws PeriodicQueryStorageException {
+        
+        String sparql = "select ?x where { ?x <urn:pred> ?y.}";
+        VariableOrder varOrder = new VariableOrder("periodicBinId", "x");
+        PeriodicQueryStorageMetadata expectedMeta = new 
PeriodicQueryStorageMetadata(sparql, varOrder);
+        
+        String id = periodicStorage.createPeriodicQuery(sparql);
+        Assert.assertEquals(expectedMeta, 
periodicStorage.getPeriodicQueryMetadata(id));
+        Assert.assertEquals(Arrays.asList(nameFactory.makeTableName(RYA, id)), 
periodicStorage.listPeriodicTables());
+        periodicStorage.deletePeriodicQuery(id);
+    }
+    
+    
+    @Test
+    public void testAddListDelete() throws Exception {
+        
+        String sparql = "select ?x where { ?x <urn:pred> ?y.}";
+        String id = periodicStorage.createPeriodicQuery(sparql);
+        
+        Set<BindingSet> expected = new HashSet<>();
+        Set<VisibilityBindingSet> storageSet = new HashSet<>();
+        
+        //add result matching user's visibility
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("periodicBinId", vf.createLiteral(1L));
+        bs.addBinding("x",vf.createURI("uri:uri123"));
+        expected.add(bs);
+        storageSet.add(new VisibilityBindingSet(bs,"U"));
+        
+        //add result with different visibility that is not expected
+        bs = new QueryBindingSet();
+        bs.addBinding("periodicBinId", vf.createLiteral(1L));
+        bs.addBinding("x",vf.createURI("uri:uri456"));
+        storageSet.add(new VisibilityBindingSet(bs,"V"));
+        
+        periodicStorage.addPeriodicQueryResults(id, storageSet);
+        
+        Set<BindingSet> actual = new HashSet<>();
+        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(id, Optional.of(1L))) {
+            iter.forEachRemaining(x -> actual.add(x));
+        }
+        
+        Assert.assertEquals(expected, actual);
+        
+        periodicStorage.deletePeriodicQueryResults(id, 1L);
+        
+        Set<BindingSet> actual2 = new HashSet<>();
+        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(id, Optional.of(1L))) {
+            iter.forEachRemaining(x -> actual2.add(x));
+        }
+        
+        Assert.assertEquals(new HashSet<>(), actual2);
+        periodicStorage.deletePeriodicQuery(id);
+        
+    }
+    
+    @Test
+    public void multiBinTest() throws PeriodicQueryStorageException, Exception 
{
+        
+        String sparql = "prefix function: <http://org.apache.rya/function#> " 
//n
+                + "prefix time: <http://www.w3.org/2006/time#> " //n
+                + "select ?id (count(?obs) as ?total) where {" //n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
+                + "?obs <uri:hasTime> ?time. " //n
+                + "?obs <uri:hasId> ?id } group by ?id"; //n
+        
+        
+        final ValueFactory vf = new ValueFactoryImpl();
+        long currentTime = System.currentTimeMillis();
+        String queryId = UUID.randomUUID().toString().replace("-", "");
+        
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expected1 = new HashSet<>();
+        final Set<BindingSet> expected2 = new HashSet<>();
+        final Set<BindingSet> expected3 = new HashSet<>();
+        final Set<BindingSet> expected4 = new HashSet<>();
+        final Set<VisibilityBindingSet> storageResults = new HashSet<>();
+
+        long period = 1800000;
+        long binId = (currentTime/period)*period;
+        
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expected1.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expected1.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expected1.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expected1.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expected2.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expected2.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expected2.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        expected3.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        expected3.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
+        expected4.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+        
+        
+        String id = periodicStorage.createPeriodicQuery(queryId, sparql);
+        periodicStorage.addPeriodicQueryResults(queryId, storageResults);
+        
+        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(queryId, Optional.of(binId))) {
+            Set<BindingSet> actual1 = new HashSet<>();
+            while(iter.hasNext()) {
+                actual1.add(iter.next());
+            }
+            Assert.assertEquals(expected1, actual1);
+        }
+        
+        periodicStorage.deletePeriodicQueryResults(queryId, binId);
+        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(queryId, Optional.of(binId))) {
+            Set<BindingSet> actual1 = new HashSet<>();
+            while(iter.hasNext()) {
+                actual1.add(iter.next());
+            }
+            Assert.assertEquals(Collections.emptySet(), actual1);
+        }
+        
+        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(queryId, Optional.of(binId + period))) {
+            Set<BindingSet> actual2 = new HashSet<>();
+            while(iter.hasNext()) {
+                actual2.add(iter.next());
+            }
+            Assert.assertEquals(expected2, actual2);
+        }
+        
+        periodicStorage.deletePeriodicQueryResults(queryId, binId + period);
+        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(queryId, Optional.of(binId + period))) {
+            Set<BindingSet> actual2 = new HashSet<>();
+            while(iter.hasNext()) {
+                actual2.add(iter.next());
+            }
+            Assert.assertEquals(Collections.emptySet(), actual2);
+        }
+        
+        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(queryId, Optional.of(binId + 2*period))) {
+            Set<BindingSet> actual3 = new HashSet<>();
+            while(iter.hasNext()) {
+                actual3.add(iter.next());
+            }
+            Assert.assertEquals(expected3, actual3);
+        }
+        
+        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(queryId, Optional.of(binId + 3*period))) {
+            Set<BindingSet> actual4 = new HashSet<>();
+            while(iter.hasNext()) {
+                actual4.add(iter.next());
+            }
+            Assert.assertEquals(expected4, actual4);
+        }
+        periodicStorage.deletePeriodicQuery(id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/README.md
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/README.md b/extras/rya.pcj.fluo/README.md
index 70361c1..1207705 100644
--- a/extras/rya.pcj.fluo/README.md
+++ b/extras/rya.pcj.fluo/README.md
@@ -19,7 +19,15 @@ Rya Incrementally Updating Precomputed Joins
 ============================================
 This project is an implementation of the Rya Precomputed Join (PCJ) indexing 
 feature that runs on top of [Fluo][1] so that it may incrementally update the
-results of a query as new semantic triples are added to storage.  
+results of a query as new semantic triples are added to storage.  At a high 
level, the Rya Fluo application 
+works by registering the individual RDF4J QueryNodes with the Fluo table in 
the form of metadata.  For example, 
+if a join occurs in a given query, then that join is given a unique id when 
the query is registered with the Rya 
+Fluo application, along with metadata indicating its parent node, its left and 
right child nodes, along with 
+other information necessary for the application to process the join.  In this 
way, the entire RDF4J query tree is recreated
+within Fluo.  For each node type supported by the Rya Fluo application, there 
is also an associated Fluo Observer 
+that processes BindingSet notifications for that node (this occurs when a new 
result percolates up the query tree and 
+arrives at that node in the form of a BindingSet).  These Observers 
incrementally evaluate the queries registered with the
+Fluo application by performing the processing required for their associated 
node as soon as a result for that node is available.  
 
 This project contains the following modules:
   * **rya.pcj.fluo.app** - A Fluo application that incrementally updates the 
results
@@ -38,5 +46,20 @@ This project contains the following modules:
   * **integration** - Contains integration tests that use a MiniAccumuloCluster
     and MiniFluo to ensure the Rya PCJ Fluo App work within an emulation of the
     production environment.
+    
+    
+Currently the Rya Fluo Application supports RDF4J queries that contain Joins, 
Filters, Projections, StatementPatterns, and Aggregations.
+To support the evaluation of additional RDF4J query nodes in the Fluo 
application, here are the steps that need to be followed:
+
+  1. Create the appropriate Metadata Object by extending CommonNodeMetadata 
(e.g. StatementPatternMetadata, JoinMetadata, etc.)
+  2. Add metadata Columns to FluoQueryColumns
+  3. Create NodeType from the metadata Columns
+  4. Add the node prefix to IncrementalUpdateConstants
+  5. Integrate metadata with FluoQueryMetadataDAO
+  6. Create Updater and integrate with BindingSetUpdater
+  7. Create Observer (e.g. StatementPatternObserver, JoinObserver, etc.)
+  8. Integrate with SparqlFluoQueryBuilder
+  
+All of the classes mentioned above can be found in the rya.pcj.fluo.app 
project.
 
 [1]: http://fluo.io/

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index a17f02f..767d9d2 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -44,9 +45,11 @@ import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.persist.query.BatchRyaQuery;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
@@ -62,6 +65,8 @@ import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 
+import com.google.common.base.Preconditions;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
@@ -152,13 +157,51 @@ public class CreatePcj {
 
     
 
+
     /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This 
method requires that a
-     * PCJ table already exist for the query corresponding to the pcjId.  
Results will be exported
-     * to this table.
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ. This 
method
+     * creates the FluoQuery (metadata) inside of Fluo so that results can be 
incrementally generated
+     * inside of Fluo.  This method assumes that the user will export the 
results to Kafka or
+     * some other external resource.  The export id is equivalent to the 
queryId that is returned,
+     * which is in contrast to the other createPcj methods in this class which 
accept an external pcjId
+     * that is used to identify the Accumulo table or Kafka topic for 
exporting results.
+     *
+     * @param sparql - sparql query String to be registered with Fluo
+     * @param fluo - A connection to the Fluo application that updates the PCJ 
index. (not null)
+     * @return queryId - The id of the root of the query metadata tree in Fluo
+     * @throws MalformedQueryException The SPARQL query stored for the {@code 
pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be 
read from {@code pcjStorage}.
+     */
+    public String createPcj(String sparql, FluoClient fluo) throws 
MalformedQueryException {
+        Preconditions.checkNotNull(sparql);
+        Preconditions.checkNotNull(fluo);
+        
+        FluoQuery fluoQuery = makeFluoQuery(sparql);
+        String queryId = null;
+        if(fluoQuery.getQueryMetadata().isPresent()) {
+            queryId = fluoQuery.getQueryMetadata().get().getNodeId();
+            queryId = 
queryId.split(IncrementalUpdateConstants.QUERY_PREFIX)[1];
+        } else {
+            queryId = fluoQuery.getConstructQueryMetadata().get().getNodeId();
+            queryId = 
queryId.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1];
+        }
+        
+        String[] idArray = queryId.split("_");
+        String id = idArray[idArray.length - 1];
+        
+        writeFluoQuery(fluo, fluoQuery, id);
+        return id;
+    }
+    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This 
method provides
+     * no guarantees that a PCJ with the given pcjId exists outside of Fluo. 
This method merely
+     * creates the FluoQuery (metadata) inside of Fluo so that results can be 
incrementally generated
+     * inside of Fluo.  This method assumes that the user will export the 
results to Kafka or
+     * some other external resource.
      *
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. 
(not null)
-     * @param pcjStorage - Provides access to the PCJ index. (not null)
+     * @param sparql - sparql query String to be registered with Fluo
      * @param fluo - A connection to the Fluo application that updates the PCJ 
index. (not null)
      * @return The metadata that was written to the Fluo application for the 
PCJ.
      * @throws MalformedQueryException The SPARQL query stored for the {@code 
pcjId} is malformed.
@@ -166,40 +209,113 @@ public class CreatePcj {
      */
     public FluoQuery createPcj(
             final String pcjId,
-            final PrecomputedJoinStorage pcjStorage,
+            final String sparql,
             final FluoClient fluo) throws MalformedQueryException, 
PcjException {
         requireNonNull(pcjId);
-        requireNonNull(pcjStorage);
+        requireNonNull(sparql);
         requireNonNull(fluo);
 
+        FluoQuery fluoQuery = makeFluoQuery(sparql);
+        writeFluoQuery(fluo, fluoQuery, pcjId);
+
+        return fluoQuery;
+    }
+    
+    private FluoQuery makeFluoQuery(String sparql) throws 
MalformedQueryException {
+        
         // Keeps track of the IDs that are assigned to each of the query's 
nodes in Fluo.
         // We use these IDs later when scanning Rya for historic Statement 
Pattern matches
         // as well as setting up automatic exports.
         final NodeIds nodeIds = new NodeIds();
 
         // Parse the query's structure for the metadata that will be written 
to fluo.
-        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
-        final String sparql = pcjMetadata.getSparql();
         final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, 
null);
-        final FluoQuery fluoQuery = new 
SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
-
+        return new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
+    }
+    
+    private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String 
pcjId) {
         try (Transaction tx = fluo.newTransaction()) {
             // Write the query's structure to Fluo.
             new FluoQueryMetadataDAO().write(tx, fluoQuery);
-            
-            if (fluoQuery.getQueryMetadata().isPresent()) {
-                // If the query is not a construct query, 
-                // the results of the query are eventually exported to an 
instance of Rya, so store the Rya ID for the PCJ.
-                final String queryId = 
fluoQuery.getQueryMetadata().get().getNodeId();
+
+            // The results of the query are eventually exported to an instance
+            // of Rya, so store the Rya ID for the PCJ.
+            QueryMetadata metadata = fluoQuery.getQueryMetadata().orNull();
+            if (metadata != null) {
+                String queryId = metadata.getNodeId();
                 tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
                 tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
-            } 
+            }
+
             // Flush the changes to Fluo.
             tx.commit();
         }
+    }
 
-        return fluoQuery;
+    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  The 
method takes in an
+     * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with 
the given pcjId exists.
+     *
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. 
(not null)
+     * @param pcjStorage - Provides access to the PCJ index. (not null)
+     * @param fluo - A connection to the Fluo application that updates the PCJ 
index. (not null)
+     * @return The metadata that was written to the Fluo application for the 
PCJ.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code 
pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be 
read from {@code pcjStorage}.
+     */
+    public FluoQuery createPcj(
+            final String pcjId,
+            final PrecomputedJoinStorage pcjStorage,
+            final FluoClient fluo) throws MalformedQueryException, 
PcjException {
+        requireNonNull(pcjId);
+        requireNonNull(pcjStorage);
+        requireNonNull(fluo);
+
+        // Parse the query's structure for the metadata that will be written 
to fluo.
+        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+        final String sparql = pcjMetadata.getSparql();
+        return createPcj(pcjId, sparql, fluo);
+    }
+    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+     * <p>
+     * This call scans Rya for Statement Pattern matches and inserts them into
+     * the Fluo application. This method does not verify that a PcjTable with 
the
+     * given pcjId actually exists. It is assumed that results for any 
query registered
+     * using this method will be exported to Kafka or some other external 
service.
+     *
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. 
(not null)
+     * @param sparql - sparql query that will be registered with Fluo. (not null)
+     * @param fluo - A connection to the Fluo application that updates the PCJ 
index. (not null)
+     * @param accumulo - Accumulo Connector for the given Rya instance. (not null)
+     * @param ryaInstance - Name of the Rya instance whose historic statements are scanned. (not null)
+     * @return The Fluo application's Query ID of the query that was created.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code 
pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be 
read from {@code pcjStorage}.
+     * @throws RyaDAOException Historic PCJ results could not be loaded 
because of a problem with {@code rya}.
+     */
+    public String withRyaIntegration(
+            final String pcjId,
+            final String sparql,
+            final FluoClient fluo,
+            final Connector accumulo,
+            final String ryaInstance ) throws MalformedQueryException, 
PcjException, RyaDAOException {
+        requireNonNull(pcjId);
+        requireNonNull(sparql);
+        requireNonNull(fluo);
+        requireNonNull(accumulo);
+        requireNonNull(ryaInstance);
+
+        
+        // Write the SPARQL query's structure to the Fluo Application.
+        final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo);
+        //import results already ingested into Rya that match query
+        importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
+        // return queryId to the caller for later monitoring from the export.
+        return fluoQuery.getQueryMetadata().get().getNodeId();
     }
+    
 
     /**
      * Tells the Fluo PCJ Updater application to maintain a new PCJ.
@@ -231,31 +347,39 @@ public class CreatePcj {
         requireNonNull(fluo);
         requireNonNull(accumulo);
         requireNonNull(ryaInstance);
-
-        // Write the SPARQL query's structure to the Fluo Application.
-        final FluoQuery fluoQuery = createPcj(pcjId, pcjStorage, fluo);
-
+        
+        // Parse the query's structure for the metadata that will be written 
to fluo.
+        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+        final String sparql = pcjMetadata.getSparql();
+        
+        return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance);
+    }
+    
+    private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery 
fluoQuery, Connector accumulo, String ryaInstance)
+            throws RyaDAOException {
         // Reuse the same set object while performing batch inserts.
         final Set<RyaStatement> queryBatch = new HashSet<>();
 
-        // Iterate through each of the statement patterns and insert their 
historic matches into Fluo.
+        // Iterate through each of the statement patterns and insert their
+        // historic matches into Fluo.
         for (final StatementPatternMetadata patternMetadata : 
fluoQuery.getStatementPatternMetadata()) {
-            // Get an iterator over all of the binding sets that match the 
statement pattern.
+            // Get an iterator over all of the binding sets that match the
+            // statement pattern.
             final StatementPattern pattern = 
FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern());
             queryBatch.add(spToRyaStatement(pattern));
         }
 
-        //Create AccumuloRyaQueryEngine to query for historic results
+        // Create AccumuloRyaQueryEngine to query for historic results
         final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
         conf.setTablePrefix(ryaInstance);
         conf.setAuths(getAuths(accumulo));
 
-        try(final AccumuloRyaQueryEngine queryEngine = new 
AccumuloRyaQueryEngine(accumulo, conf);
+        try (final AccumuloRyaQueryEngine queryEngine = new 
AccumuloRyaQueryEngine(accumulo, conf);
                 CloseableIterable<RyaStatement> queryIterable = 
queryEngine.query(new BatchRyaQuery(queryBatch))) {
             final Set<RyaStatement> triplesBatch = new HashSet<>();
 
             // Insert batches of the binding sets into Fluo.
-            for(final RyaStatement ryaStatement : queryIterable) {
+            for (final RyaStatement ryaStatement : queryIterable) {
                 if (triplesBatch.size() == spInsertBatchSize) {
                     writeBatch(fluo, triplesBatch);
                     triplesBatch.clear();
@@ -271,14 +395,6 @@ public class CreatePcj {
         } catch (final IOException e) {
             log.warn("Ignoring IOException thrown while closing the 
AccumuloRyaQueryEngine used by CreatePCJ.", e);
         }
-        
-        //return queryId to the caller for later monitoring from the export
-        if(fluoQuery.getConstructQueryMetadata().isPresent()) {
-            return fluoQuery.getConstructQueryMetadata().get().getNodeId();
-        } 
-        
-        return fluoQuery.getQueryMetadata().get().getNodeId();
-        
     }
 
     private static void writeBatch(final FluoClient fluo, final 
Set<RyaStatement> batch) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
index 87eb9cc..3052c1d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
@@ -39,6 +39,7 @@ import 
org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.openrdf.query.BindingSet;
 
@@ -50,12 +51,12 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * <p>
  * This is a two phase process.
  * <ol>
- *   <li>Delete metadata about each node of the query using a single Fluo
- *       transaction. This prevents new {@link BindingSet}s from being created 
when
- *       new triples are inserted.</li>
- *   <li>Delete BindingSets associated with each node of the query. This is 
done
- *       in a batch fashion to guard against large delete transactions that 
don't fit
- *       into memory.</li>
+ * <li>Delete metadata about each node of the query using a single Fluo
+ * transaction. This prevents new {@link BindingSet}s from being created when
+ * new triples are inserted.</li>
+ * <li>Delete BindingSets associated with each node of the query. This is done
+ * in a batch fashion to guard against large delete transactions that don't fit
+ * into memory.</li>
  * </ol>
  */
 @DefaultAnnotation(NonNull.class)
@@ -79,8 +80,10 @@ public class DeletePcj {
      * Precomputed Join Index from the Fluo application that is incrementally
      * updating it.
      *
-     * @param client - Connects to the Fluo application that is updating the 
PCJ Index. (not null)
-     * @param pcjId - The PCJ ID for the query that will removed from the Fluo 
application. (not null)
+     * @param client - Connects to the Fluo application that is updating the 
PCJ
+     *            Index. (not null)
+     * @param pcjId - The PCJ ID for the query that will removed from the Fluo
+     *            application. (not null)
      */
     public void deletePcj(final FluoClient client, final String pcjId) {
         requireNonNull(client);
@@ -167,6 +170,12 @@ public class DeletePcj {
                 nodeIds.add(aggChild);
                 getChildNodeIds(tx, aggChild, nodeIds);
                 break;
+            case PERIODIC_QUERY:
+                final PeriodicQueryMetadata periodicMeta = 
dao.readPeriodicQueryMetadata(tx, nodeId);
+                final String periodicChild = periodicMeta.getChildNodeId();
+                nodeIds.add(periodicChild);
+                getChildNodeIds(tx, periodicChild, nodeIds);
+                break;
             case STATEMENT_PATTERN:
                 break;
         }
@@ -215,10 +224,9 @@ public class DeletePcj {
         }
     }
 
-
     /**
-     * Deletes high level query meta for converting from queryId to pcjId and 
vice
-     * versa, as well as converting from sparql to queryId.
+     * Deletes high level query meta for converting from queryId to pcjId and
+     * vice versa, as well as converting from sparql to queryId.
      *
      * @param tx - Transaction the deletes will be performed with. (not null)
      * @param pcjId - The PCJ whose metadata will be deleted. (not null)
@@ -234,7 +242,6 @@ public class DeletePcj {
         tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
     }
 
-
     /**
      * Deletes all results (BindingSets or Statements) associated with the 
specified nodeId.
      *
@@ -265,18 +272,18 @@ public class DeletePcj {
         requireNonNull(scanner);
         requireNonNull(column);
 
-        try(Transaction ntx = tx) {
-          int count = 0;
-          final Iterator<RowColumnValue> iter = scanner.iterator();
-          while (iter.hasNext() && count < batchSize) {
-            final Bytes row = iter.next().getRow();
-            count++;
-            tx.delete(row, column);
-          }
+        try (Transaction ntx = tx) {
+            int count = 0;
+            final Iterator<RowColumnValue> iter = scanner.iterator();
+            while (iter.hasNext() && count < batchSize) {
+                final Bytes row = iter.next().getRow();
+                count++;
+                tx.delete(row, column);
+            }
 
-          final boolean hasNext = iter.hasNext();
-          tx.commit();
-          return hasNext;
+            final boolean hasNext = iter.hasNext();
+            tx.commit();
+            return hasNext;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index 38fff95..b151c0e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -1,35 +1,28 @@
 <?xml version="1.0" encoding="utf-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+       license agreements. See the NOTICE file distributed with this work for 
additional 
+       information regarding copyright ownership. The ASF licenses this file 
to 
+       you under the Apache License, Version 2.0 (the "License"); you may not 
use 
+       this file except in compliance with the License. You may obtain a copy 
of 
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required 
+       by applicable law or agreed to in writing, software distributed under 
the 
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+       OF ANY KIND, either express or implied. See the License for the 
specific 
+       language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
 
-  http://www.apache.org/licenses/LICENSE-2.0
+       <parent>
+               <groupId>org.apache.rya</groupId>
+               <artifactId>rya.pcj.fluo.parent</artifactId>
+               <version>3.2.11-incubating-SNAPSHOT</version>
+       </parent>
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+       <artifactId>rya.pcj.fluo.app</artifactId>
 
-    <parent>
-        <groupId>org.apache.rya</groupId>
-        <artifactId>rya.pcj.fluo.parent</artifactId>
-        <version>3.2.11-incubating-SNAPSHOT</version>
-    </parent>
-    
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>rya.pcj.fluo.app</artifactId>
-    
-    <name>Apache Rya PCJ Fluo App</name>
-    <description>
+       <name>Apache Rya PCJ Fluo App</name>
+       <description>
         A Fluo implementation of Rya Precomputed Join Indexing. This module 
produces
         a jar that may be executed by the 'fluo' command line tool as a YARN 
job.
     </description>
@@ -72,6 +65,10 @@ under the License.
             <groupId>org.apache.fluo</groupId>
             <artifactId>fluo-recipes-accumulo</artifactId>
         </dependency>
+        <dependency>
+                       <groupId>org.openrdf.sesame</groupId>
+                       <artifactId>sesame-queryrender</artifactId>
+        </dependency>
         
         <dependency>
           <groupId>org.apache.kafka</groupId>
@@ -123,27 +120,29 @@ under the License.
         </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <!-- Use the pre-build 'jar-with-dependencies' assembly to package 
the dependent class files into the final jar. 
-                 This creates a jar file that can be deployed to Fluo without 
having to include any dependent jars. -->
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
+
+       <build>
+               <plugins>
+                       <!-- Use the pre-build 'jar-with-dependencies' assembly 
to package the 
+                               dependent class files into the final jar. This 
creates a jar file that can 
+                               be deployed to Fluo without having to include 
any dependent jars. -->
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <configuration>
+                                       <descriptorRefs>
+                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
+                                       </descriptorRefs>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>make-assembly</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java
deleted file mode 100644
index ae976ee..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-import org.openrdf.query.algebra.Filter;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import com.google.common.base.Optional;
-
-/**
- * Searches a SPARQL query for {@link Filter}s.
- */
-@DefaultAnnotation(NonNull.class)
-class FilterFinder {
-
-    /**
-     * Search a SPARQL query for the {@link Filter} that appears at the
-     * {@code indexWithinQuery}'th time within the query.
-     * <p>
-     * The top most filter within the query will be at index 0, the next filter
-     * encountered will be at index 1, ... and the last index that is 
encountered
-     * will be at index <i>n</i>.
-     *
-     * @param sparql - The SPARQL query that to parse. (not null)
-     * @param indexWithinQuery - The index of the filter to fetch. (not null)
-     * @return The filter that was found within the query at the specified 
index;
-     *   otherwise absent.
-     * @throws Exception Thrown when the query could not be parsed or iterated 
over.
-     */
-    public Optional<Filter> findFilter(final String sparql, final int 
indexWithinQuery) throws Exception {
-        checkNotNull(sparql);
-        checkArgument(indexWithinQuery >= 0);
-
-        // When a filter is encountered for the requested index, store it in 
atomic reference and quit searching.
-        final AtomicReference<Filter> filterRef = new AtomicReference<>();
-        final QueryModelVisitorBase<RuntimeException> filterFinder = new 
QueryModelVisitorBase<RuntimeException>() {
-            private int i = 0;
-            @Override
-            public void meet(final Filter filter) {
-                // Store and stop searching.
-                if(i == indexWithinQuery) {
-                    filterRef.set(filter);
-                    return;
-                }
-
-                // Continue to the next filter.
-                i++;
-                super.meet(filter);
-            }
-        };
-
-        // Parse the query and find the filter.
-        final SPARQLParser parser = new SPARQLParser();
-        final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
-        parsedQuery.getTupleExpr().visit(filterFinder);
-        return Optional.fromNullable(filterRef.get());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 42ec686..1c99051 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -26,9 +26,11 @@ import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
@@ -62,11 +64,6 @@ public class FilterResultUpdater {
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
     /**
-     * A utility class used to search SPARQL queries for Filters.
-     */
-    private static final FilterFinder filterFinder = new FilterFinder();
-
-    /**
      * Is used to evaluate the conditions of a {@link Filter}.
      */
     private static final EvaluationStrategyImpl evaluator = new 
EvaluationStrategyImpl(
@@ -111,12 +108,11 @@ public class FilterResultUpdater {
                 "Binding Set:\n" + childBindingSet + "\n");
 
         // Parse the original query and find the Filter that represents 
filterId.
-        final String sparql = filterMetadata.getOriginalSparql();
-        final int indexWithinQuery = 
filterMetadata.getFilterIndexWithinSparql();
-        final Optional<Filter> filter = filterFinder.findFilter(sparql, 
indexWithinQuery);
+        final String sparql = filterMetadata.getFilterSparql();
+        Filter filter = FilterSerializer.deserialize(sparql);
 
         // Evaluate whether the child BindingSet satisfies the filter's 
condition.
-        final ValueExpr condition = filter.get().getCondition();
+        final ValueExpr condition = filter.getCondition();
         if (isTrue(condition, childBindingSet)) {
             // Create the Filter's binding set from the child's.
             final VariableOrder filterVarOrder = 
filterMetadata.getVariableOrder();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index f9d14b5..2084907 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -18,6 +18,8 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app;
 
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+
 public class IncrementalUpdateConstants {
 
     // String constants used to create more easily parsed patterns.
@@ -34,6 +36,9 @@ public class IncrementalUpdateConstants {
     public static final String AGGREGATION_PREFIX = "AGGREGATION";
     public static final String QUERY_PREFIX = "QUERY";
     public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
+    public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
+    
+    public static final String PERIODIC_BIN_ID = 
PeriodicQueryResultStorage.PeriodicBinId;
 
     public static final String URI_TYPE = 
"http://www.w3.org/2001/XMLSchema#anyURI";;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 2cb5a54..9b65b34 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -43,6 +43,7 @@ import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
index b829b7e..b8fc2d9 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
@@ -25,6 +25,7 @@ import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FI
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
 
 import java.util.List;
 
@@ -38,6 +39,7 @@ import com.google.common.base.Optional;
  * Represents the different types of nodes that a Query may have.
  */
 public enum NodeType {
+    PERIODIC_QUERY(QueryNodeMetadataColumns.PERIODIC_QUERY_COLUMNS, 
FluoQueryColumns.PERIODIC_QUERY_BINDING_SET),
     FILTER (QueryNodeMetadataColumns.FILTER_COLUMNS, 
FluoQueryColumns.FILTER_BINDING_SET),
     JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, 
FluoQueryColumns.JOIN_BINDING_SET),
     STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, 
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET),
@@ -101,6 +103,8 @@ public enum NodeType {
             type = AGGREGATION;
         } else if(nodeId.startsWith(CONSTRUCT_PREFIX)) {
             type = CONSTRUCT;
+        } else if(nodeId.startsWith(PERIODIC_QUERY_PREFIX)) {
+            type = PERIODIC_QUERY;
         }
 
         return Optional.fromNullable(type);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
new file mode 100644
index 0000000..ae4912b
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.Binding;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+/**
+ * This class adds the appropriate BinId Binding to each BindingSet that it 
processes.  The BinIds
+ * are used to determine which period a BindingSet (with a temporal Binding) 
falls into so that
+ * a user can receive periodic updates for a registered query. 
+ *
+ */
+public class PeriodicQueryUpdater {
+
+    private static final Logger log = 
Logger.getLogger(PeriodicQueryUpdater.class);
+    private static final ValueFactory vf = new ValueFactoryImpl();
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
+
+    /**
+     * Uses the {@link PeriodicQueryMetadata} to create a collection of binned 
BindingSets
+     * that are added to Fluo.  Each binned BindingSet is the original 
BindingSet with an additional
+     * Binding that contains the periodic bin id of the BindingSet.
+     * @param tx - Fluo Transaction
+     * @param bs - VisibilityBindingSet that will be binned
+     * @param metadata - PeriodicQueryMetadata used to bin BindingSets
+     * @throws Exception
+     */
+    public void updatePeriodicBinResults(TransactionBase tx, 
VisibilityBindingSet bs, PeriodicQueryMetadata metadata) throws Exception {
+        Set<Long> binIds = getBinEndTimes(metadata, bs);
+        for(Long id: binIds) {
+            //create binding set value bytes
+            QueryBindingSet binnedBs = new QueryBindingSet(bs);
+            binnedBs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(id));
+            VisibilityBindingSet visibilityBindingSet = new 
VisibilityBindingSet(binnedBs, bs.getVisibility());
+            Bytes periodicBsBytes = BS_SERDE.serialize(visibilityBindingSet);
+            
+            //create row 
+            final Bytes resultRow = 
RowKeyUtil.makeRowKey(metadata.getNodeId(), metadata.getVariableOrder(), 
binnedBs);
+            Column col = FluoQueryColumns.PERIODIC_QUERY_BINDING_SET;
+            tx.set(resultRow, col, periodicBsBytes);
+        }
+    }
+
+    /**
+     * This method returns the end times of all period windows containing the 
time contained in
+     * the BindingSet.  
+     * 
+     * @param metadata
+     * @return Set of period bin end times
+     */
+    private Set<Long> getBinEndTimes(PeriodicQueryMetadata metadata, 
VisibilityBindingSet bs) {
+        Set<Long> binIds = new HashSet<>();
+        try {
+            String timeVar = metadata.getTemporalVariable();
+            Value value = bs.getBinding(timeVar).getValue();
+            Literal temporalLiteral = (Literal) value;
+            long eventDateTime = 
temporalLiteral.calendarValue().toGregorianCalendar().getTimeInMillis();
+            return getEndTimes(eventDateTime, metadata.getWindowSize(), 
metadata.getPeriod());
+        } catch (Exception e) {
+            log.trace("Unable to extract the entity time from BindingSet: " + 
bs);
+        }
+        return binIds;
+    }
+
+    private long getRightBinEndPoint(long eventDateTime, long periodDuration) {
+        return (eventDateTime / periodDuration + 1) * periodDuration;
+    }
+    
+    private long getLeftBinEndPoint(long eventTime, long periodDuration) {
+        return (eventTime / periodDuration) * periodDuration;
+    }
+
+    /**
+     * Using the smallest period end time, this method also creates all other 
period end times
+     * that occur within one windowSize of the eventDateTime.
+     * @param eventDateTime
+     * @param startTime
+     * @param windowDuration
+     * @param periodDuration
+     * @return Set of period bin end times
+     */
+    private Set<Long> getEndTimes(long eventDateTime, long windowDuration, 
long periodDuration) {
+        Set<Long> binIds = new HashSet<>();
+        long rightEventBin = getRightBinEndPoint(eventDateTime, 
periodDuration);
+        //get the bin left of the current moment for comparison
+        long currentBin = getLeftBinEndPoint(System.currentTimeMillis(), 
periodDuration);
+        
+        if(currentBin >= rightEventBin) {
+            long numBins = (windowDuration -(currentBin - 
rightEventBin))/periodDuration;
+            for(int i = 0; i < numBins; i++) {
+                binIds.add(currentBin + i*periodDuration);
+            }
+        } else {
+            //this corresponds to a future event that is inserted into the 
system
+            long numBins = windowDuration/periodDuration;
+            for(int i = 0; i < numBins; i++) {
+                binIds.add(rightEventBin + i*periodDuration);
+            }
+        }
+
+        return binIds;
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index ba82726..44fc9bd 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -31,6 +31,7 @@ import 
org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
 import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import org.openrdf.query.BindingSet;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -88,7 +89,7 @@ public class QueryResultUpdater {
         }
 
         // Create the Binding Set that goes in the Node Value. It does contain 
visibilities.
-        final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
+        final Bytes nodeValueBytes = BS_SERDE.serialize(new 
VisibilityBindingSet(queryBindingSet,childBindingSet.getVisibility()));
 
         log.trace(
                 "Transaction ID: " + tx.getStartTimestamp() + "\n" +

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java
deleted file mode 100644
index 34439e4..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.fluo.api.data.Bytes;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Serializes and deserializes a {@link VisibilityBindingSet} to and from 
{@link Bytes} objects.
- */
-@DefaultAnnotation(NonNull.class)
-public class VisibilityBindingSetSerDe {
-
-    /**
-     * Serializes a {@link VisibilityBindingSet} into a {@link Bytes} object.
-     *
-     * @param bindingSet - The binding set that will be serialized. (not null)
-     * @return The serialized object.
-     * @throws Exception A problem was encountered while serializing the 
object.
-     */
-    public Bytes serialize(final VisibilityBindingSet bindingSet) throws 
Exception {
-        requireNonNull(bindingSet);
-
-        final ByteArrayOutputStream boas = new ByteArrayOutputStream();
-        try(final ObjectOutputStream oos = new ObjectOutputStream(boas)) {
-            oos.writeObject(bindingSet);
-        }
-
-        return Bytes.of(boas.toByteArray());
-    }
-
-    /**
-     * Deserializes a {@link VisibilityBindingSet} from a {@link Bytes} object.
-     *
-     * @param bytes - The bytes that will be deserialized. (not null)
-     * @return The deserialized object.
-     * @throws Exception A problem was encountered while deserializing the 
object.
-     */
-    public VisibilityBindingSet deserialize(final Bytes bytes) throws 
Exception {
-        requireNonNull(bytes);
-
-        try(final ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(bytes.toArray()))) {
-            final Object o = ois.readObject();
-            if(o instanceof VisibilityBindingSet) {
-                return (VisibilityBindingSet) o;
-            } else {
-                throw new Exception("Deserialized Object is not a 
VisibilityBindingSet. Was: " + o.getClass());
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
new file mode 100644
index 0000000..db33d3b
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+/**
+ * This class provides common functionality for implementations of {@link 
BatchBindingSetUpdater}.
+ *
+ */
+public abstract class AbstractBatchBindingSetUpdater implements 
BatchBindingSetUpdater {
+
+    /**
+     * Updates the Span to create a new {@link BatchInformation} object to be 
fed to the
+     * {@link BatchObserver}.  This message is called in the event that the 
BatchBindingSetUpdater
+     * reaches the batch size before processing all entries relevant to its 
Span.
+     * @param newStart - new start to the Span
+     * @param oldSpan - old Span to be updated with newStart
+     * @return - updated Span used with an updated BatchInformation object to 
complete the batch task
+     */
+    public Span getNewSpan(RowColumn newStart, Span oldSpan) {
+        return new Span(newStart, oldSpan.isStartInclusive(), 
oldSpan.getEnd(), oldSpan.isEndInclusive());
+    }
+    
+    /**
+     * Cleans up old batch job.  This method is meant to be called by any 
overriding method
+     * to clean up old batch tasks.
+     */
+    @Override
+    public void processBatch(TransactionBase tx, Bytes row, BatchInformation 
batch) throws Exception {
+        tx.delete(row, FluoQueryColumns.BATCH_COLUMN);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
new file mode 100644
index 0000000..498dd85
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+import jline.internal.Preconditions;
+
+/**
+ * Abstract class for generating span based notifications.  A spanned 
notification
+ * uses a {@link Span} to begin processing a Fluo Column at the position 
designated by the Span.
+ *
+ */
+public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation {
+
+    private Span span;
+
+    /**
+     * Create AbstractBatchInformation
+     * @param batchSize - size of batch to be processed
+     * @param task - type of task processed (Add, Delete, Udpate)
+     * @param column - Cpolumn that Span notification is applied
+     * @param span - span used to indicate where processing should begin
+     */
+    public AbstractSpanBatchInformation(int batchSize, Task task, Column 
column, Span span) {
+        super(batchSize, task, column);
+        this.span = Preconditions.checkNotNull(span);
+    }
+
+    public AbstractSpanBatchInformation(Task task, Column column, Span span) {
+        this(DEFAULT_BATCH_SIZE, task, column, span);
+    }
+
+    /**
+     * @return Span that batch Task will be applied to
+     */
+    public Span getSpan() {
+        return span;
+    }
+
+    /**
+     * Sets span to which batch Task will be applied
+     * @param span
+     */
+    public void setSpan(Span span) {
+        this.span = span;
+    }
+    
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("Span Batch Information {\n")
+                .append("    Span: " + span + "\n")
+                .append("    Batch Size: " + super.getBatchSize() + "\n")
+                .append("    Task: " + super.getTask() + "\n")
+                .append("    Column: " + super.getColumn() + "\n")
+                .append("}")
+                .toString();
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof AbstractSpanBatchInformation)) {
+            return false;
+        }
+
+        AbstractSpanBatchInformation batch = (AbstractSpanBatchInformation) 
other;
+        return (super.getBatchSize() == batch.getBatchSize()) && 
Objects.equals(super.getColumn(), batch.getColumn()) && 
Objects.equals(this.span, batch.span)
+                && Objects.equals(super.getTask(), batch.getTask());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.getBatchSize(), span, super.getColumn(), 
super.getTask());
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java
new file mode 100644
index 0000000..288ed6e
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java
@@ -0,0 +1,81 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class contains all of the common info contained in other 
implementations
+ * of BatchInformation.
+ *
+ */
+public abstract class BasicBatchInformation implements BatchInformation {
+    
+    private int batchSize;
+    private Task task;
+    private Column column;
+    
+    /**
+     * Create BasicBatchInformation object
+     * @param batchSize - size of batch to be processed
+     * @param task - task to be processed
+     * @param column - Column in which data is proessed
+     */
+    public BasicBatchInformation(int batchSize, Task task, Column column ) {
+        this.task = Preconditions.checkNotNull(task);
+        this.column = Preconditions.checkNotNull(column);
+        Preconditions.checkArgument(batchSize > 0);
+        this.batchSize = batchSize;
+    }
+    
+    /**
+     * Creates a BasicBatchInformation 
+     * @param task
+     */
+    public BasicBatchInformation(Task task, Column column) {
+        Preconditions.checkNotNull(task);
+        Preconditions.checkNotNull(column);
+        this.task = task;
+        this.column = column;
+        this.batchSize = DEFAULT_BATCH_SIZE;
+    }
+
+    /**
+     * @return - size of batch that tasks are performed in
+     */
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * @return - type of Task performed (Add, Delete, Update)
+     */
+    public Task getTask() {
+        return task;
+    }
+    
+    /**
+     * @return - Column in which Task will be performed
+     */
+    public Column getColumn() {
+        return column;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchBindingSetUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchBindingSetUpdater.java
new file mode 100644
index 0000000..2076d2d
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchBindingSetUpdater.java
@@ -0,0 +1,43 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Interface for applying batch updates to the Fluo table based on the 
provided {@link BatchInformation}.
+ * This updater is used by the {@link BatchObserver} to apply batch updates to 
overcome the restriction
+ * that all transactions are processed in memory.  This allows Observers 
process potentially large 
+ * tasks that cannot fit into memory in a piece-wise, batch fashion.
+ */
+public interface BatchBindingSetUpdater {
+
+    /**
+     * Processes the {@link BatchInformation} object.  The BatchInformation 
will
+     * typically include a Task (either Add, Update, or Delete), along with 
information
+     * about the starting point to begin processing data.
+     * @param tx - Fluo Transaction
+     * @param row - contains the ID of the Fluo node to be processed
+     * @param batch - contains info about which cells for the Fluo query 
result node
+     * should be processed
+     * @throws Exception
+     */
+    public void processBatch(TransactionBase tx, Bytes row, BatchInformation 
batch) throws Exception;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformation.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformation.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformation.java
new file mode 100644
index 0000000..7b23ee7
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformation.java
@@ -0,0 +1,57 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+
+/**
+ * Interface for submitting batch Fluo tasks to be processed by the
+ * {@link BatchObserver}. The BatchObserver applies batch updates to overcome
+ * the restriction that all Fluo transactions are processed in memory. This
+ * allows the Rya Fluo application to process large tasks that cannot fit into
+ * memory in a piece-wise, batch fashion.
+ */
+public interface BatchInformation {
+
+    public static enum Task {Add, Delete, Update}
+    public static int DEFAULT_BATCH_SIZE = 5000;
+    
+    /**
+     * @return batchsize of task 
+     */
+    public int getBatchSize();
+    
+    /**
+     *
+     * @return Task to be performed
+     */
+    public Task getTask();
+    
+    /**
+     * 
+     * @return Column that task will be performed on
+     */
+    public Column getColumn();
+    
+    /**
+     * 
+     * @return BatchBindingSetUpdater used to process this Batch Task
+     */
+    public BatchBindingSetUpdater getBatchUpdater();
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformationDAO.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformationDAO.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformationDAO.java
new file mode 100644
index 0000000..f9ed658
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformationDAO.java
@@ -0,0 +1,59 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import 
org.apache.rya.indexing.pcj.fluo.app.batch.serializer.BatchInformationSerializer;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+/**
+ * Class used for reading and writing {@link BatchInformation}.
+ *
+ */
+public class BatchInformationDAO {
+    
+    /**
+     * Adds BatchInformation to the {@link FluoQueryColumns#BATCH_COLUMN}.
+     * @param tx - Fluo Transaction
+     * @param nodeId - query node that batch task will be performed on
+     * @param batch - BatchInformation to be processed
+     */
+    public static void addBatch(TransactionBase tx, String nodeId, 
BatchInformation batch) {
+        Bytes row = BatchRowKeyUtil.getRow(nodeId);
+        tx.set(row, FluoQueryColumns.BATCH_COLUMN, 
Bytes.of(BatchInformationSerializer.toBytes(batch)));
+    }
+    
+    /**
+     * Retrieve BatchInformation
+     * @param tx - Fluo transaction
+     * @param row - row that contains batch information - this is the query id 
that batch task will be performed on
+     * @return Optional contained the BatchInformation if it is there
+     */
+    public static Optional<BatchInformation> 
getBatchInformation(TransactionBase tx, Bytes row) {
+        Bytes val = tx.get(row, FluoQueryColumns.BATCH_COLUMN);
+        if(val != null) {
+            return BatchInformationSerializer.fromBytes(val.toArray());
+        } else {
+            return Optional.empty();
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchObserver.java
new file mode 100644
index 0000000..6194236
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchObserver.java
@@ -0,0 +1,63 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+/**
+ * BatchObserver processes tasks that need to be broken into batches. Entries
+ * stored stored in this {@link FluoQueryColumns#BATCH_COLUMN} are of the form
+ * Row: nodeId, Value: BatchInformation. The nodeId indicates the node that the
+ * batch operation will be performed on. All batch operations are performed on
+ * the bindingSet column for the {@link NodeType} corresponding to the given
+ * nodeId. For example, if the nodeId indicated that the NodeType was
+ * StatementPattern, then the batch operation would be performed on
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_BINDING_SET}. This Observer 
applies
+ * batch updates to overcome the restriction that all Fluo transactions are 
processed
+ * in memory. This allows the Rya Fluo application to process large tasks that 
cannot
+ * fit into memory in a piece-wise, batch fashion.
+ */
+public class BatchObserver extends AbstractObserver {
+
+    /**
+     * Processes the BatchInformation objects when they're written to the 
Batch column
+     * @param tx - Fluo transaction 
+     * @param row - row that contains {@link BatchInformation}
+     * @param col - column that contains BatchInformation
+     */
+    @Override
+    public void process(TransactionBase tx, Bytes row, Column col) throws 
Exception {
+        Optional<BatchInformation> batchInfo = 
BatchInformationDAO.getBatchInformation(tx, row);
+        if(batchInfo.isPresent()) {
+            batchInfo.get().getBatchUpdater().processBatch(tx, row, 
batchInfo.get());
+        }
+    }
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(FluoQueryColumns.BATCH_COLUMN, 
NotificationType.STRONG);
+    }
+
+}


Reply via email to