http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index a701052..8d218af 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -18,6 +18,7 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 import java.util.Collection;
@@ -29,7 +30,6 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -46,6 +46,7 @@ public class FluoQuery {
 
     private final Optional<QueryMetadata> queryMetadata;
     private final Optional<ConstructQueryMetadata> constructMetadata;
+    private final Optional<PeriodicQueryMetadata> periodicQueryMetadata;
     private final ImmutableMap<String, StatementPatternMetadata> 
statementPatternMetadata;
     private final ImmutableMap<String, FilterMetadata> filterMetadata;
     private final ImmutableMap<String, JoinMetadata> joinMetadata;
@@ -58,6 +59,7 @@ public class FluoQuery {
      * must use {@link Builder} instead.
      *
      * @param queryMetadata - The root node of a query that is updated in 
Fluo. (not null)
+     * @param periodicQueryMetadata - The periodic query node that is updated 
in Fluo.
      * @param statementPatternMetadata - A map from Node ID to Statement 
Pattern metadata as
      *   it is represented within the Fluo app. (not null)
      * @param filterMetadata - A map from Node ID to Filter metadata as it is 
represented
@@ -69,6 +71,7 @@ public class FluoQuery {
      */
     private FluoQuery(
             final QueryMetadata queryMetadata,
+            final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
             final ImmutableMap<String, StatementPatternMetadata> 
statementPatternMetadata,
             final ImmutableMap<String, FilterMetadata> filterMetadata,
             final ImmutableMap<String, JoinMetadata> joinMetadata, 
@@ -76,6 +79,7 @@ public class FluoQuery {
                 this.aggregationMetadata = requireNonNull(aggregationMetadata);
         this.queryMetadata = Optional.of(requireNonNull(queryMetadata));
         this.constructMetadata = Optional.absent();
+        this.periodicQueryMetadata = periodicQueryMetadata;
         this.statementPatternMetadata = 
requireNonNull(statementPatternMetadata);
         this.filterMetadata = requireNonNull(filterMetadata);
         this.joinMetadata = requireNonNull(joinMetadata);
@@ -88,23 +92,26 @@ public class FluoQuery {
      * must use {@link Builder} instead.
      *
      * @param constructMetadata - The root node of a query that is updated in 
Fluo. (not null)
+     * @param periodicQueryMetadata - The periodic query node that is updated 
in Fluo.
      * @param statementPatternMetadata - A map from Node ID to Statement 
Pattern metadata as
      *   it is represented within the Fluo app. (not null)
      * @param filterMetadata A map from Node ID to Filter metadata as it is 
represented
      *   within the Fluo app. (not null)
-     * @param joinMetadata A map from Node ID to Join metadata as it is 
represented
+     * @param joinMetadata - A map from Node ID to Join metadata as it is 
represented
      *   within the Fluo app. (not null)
      * @param aggregationMetadata - A map from Node ID to Aggregation metadata 
as it is
      *   represented within the Fluo app. (not null)
      */
     private FluoQuery(
             final ConstructQueryMetadata constructMetadata,
+            final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
             final ImmutableMap<String, StatementPatternMetadata> 
statementPatternMetadata,
             final ImmutableMap<String, FilterMetadata> filterMetadata,
             final ImmutableMap<String, JoinMetadata> joinMetadata,
             final ImmutableMap<String, AggregationMetadata> 
aggregationMetadata) {
         this.constructMetadata = 
Optional.of(requireNonNull(constructMetadata));
         this.queryMetadata = Optional.absent();
+        this.periodicQueryMetadata = periodicQueryMetadata;
         this.statementPatternMetadata = 
requireNonNull(statementPatternMetadata);
         this.filterMetadata = requireNonNull(filterMetadata);
         this.joinMetadata = requireNonNull(joinMetadata);
@@ -130,6 +137,13 @@ public class FluoQuery {
     public Optional<ConstructQueryMetadata> getConstructQueryMetadata() {
         return constructMetadata;
     }
+    
+    /**
+     * @return All of the Periodic Query metadata that is stored for the query.
+     */
+    public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata() {
+        return periodicQueryMetadata;
+    }
 
     /**
      * Get a Statement Pattern node's metadata.
@@ -207,6 +221,7 @@ public class FluoQuery {
     public int hashCode() {
         return Objects.hashCode(
                 queryMetadata,
+                periodicQueryMetadata,
                 statementPatternMetadata,
                 filterMetadata,
                 joinMetadata,
@@ -224,6 +239,7 @@ public class FluoQuery {
             return new EqualsBuilder()
                     .append(queryMetadata, fluoQuery.queryMetadata)
                     .append(constructMetadata,  fluoQuery.constructMetadata)
+                    .append(periodicQueryMetadata, 
fluoQuery.periodicQueryMetadata)
                     .append(statementPatternMetadata, 
fluoQuery.statementPatternMetadata)
                     .append(filterMetadata, fluoQuery.filterMetadata)
                     .append(joinMetadata, fluoQuery.joinMetadata)
@@ -247,6 +263,11 @@ public class FluoQuery {
             builder.append( constructMetadata.get().toString() );
             builder.append("\n");
         }
+        
+        if(periodicQueryMetadata.isPresent()) {
+            builder.append(periodicQueryMetadata.get());
+            builder.append("\n");
+        }
 
         for(final FilterMetadata metadata : filterMetadata.values()) {
             builder.append(metadata);
@@ -286,6 +307,7 @@ public class FluoQuery {
 
         private QueryMetadata.Builder queryBuilder = null;
         private ConstructQueryMetadata.Builder constructBuilder = null;
+        private PeriodicQueryMetadata.Builder periodicQueryBuilder = null;
         private final Map<String, StatementPatternMetadata.Builder> spBuilders 
= new HashMap<>();
         private final Map<String, FilterMetadata.Builder> filterBuilders = new 
HashMap<>();
         private final Map<String, JoinMetadata.Builder> joinBuilders = new 
HashMap<>();
@@ -388,6 +410,17 @@ public class FluoQuery {
         }
 
         /**
+         * Get a Join builder from this builder.
+         *
+         * @param nodeId - The Node ID the Join builder was stored at. (not 
null)
+         * @return The builder that was stored at the node id if one was found.
+         */
+        public Optional<JoinMetadata.Builder> getJoinBuilder(final String 
nodeId) {
+            requireNonNull(nodeId);
+            return Optional.fromNullable( joinBuilders.get(nodeId) );
+        }
+        
+        /**
          * Get an Aggregate builder from this builder.
          *
          * @param nodeId - The Node ID the Aggregate builder was stored at. 
(not null)
@@ -410,15 +443,28 @@ public class FluoQuery {
             return this;
         }
 
+        
+        
         /**
-         * Get a Join builder from this builder.
+         * Adds a new {@link PeriodicQueryMetadata.Builder} to this builder.
          *
-         * @param nodeId - The Node ID the Join builder was stored at. (not 
null)
-         * @return The builder that was stored at the node id if one was found.
+         * @param periodicQueryBuilder - A builder representing a specific 
Join within the query. (not null)
+         * @return This builder so that method invocation may be chained.
          */
-        public Optional<JoinMetadata.Builder> getJoinBuilder(final String 
nodeId) {
-            requireNonNull(nodeId);
-            return Optional.fromNullable( joinBuilders.get(nodeId) );
+        public Builder addPeriodicQueryMetadata(final 
PeriodicQueryMetadata.Builder periodicQueryBuilder) {
+            requireNonNull(periodicQueryBuilder);
+            this.periodicQueryBuilder = periodicQueryBuilder;
+            return this;
+        }
+
+        
+        /**
+         * Get a PeriodicQuery builder from this builder.
+         *
+         * @return The PeriodicQuery builder if one has been set.
+         */
+        public Optional<PeriodicQueryMetadata.Builder> 
getPeriodicQueryBuilder() {
+            return Optional.fromNullable( periodicQueryBuilder);
         }
         
 
@@ -426,8 +472,19 @@ public class FluoQuery {
          * @return Creates a {@link FluoQuery} using the values that have been 
supplied to this builder.
          */
         public FluoQuery build() {
-            Preconditions.checkArgument(
-                    (queryBuilder != null && constructBuilder == null) || 
(queryBuilder == null && constructBuilder != null));
+            checkArgument((queryBuilder != null && constructBuilder == null) 
|| (queryBuilder == null && constructBuilder != null));
+            
+            Optional<QueryMetadata.Builder> optionalQueryBuilder = 
getQueryBuilder();
+            QueryMetadata queryMetadata = null;
+            if(optionalQueryBuilder.isPresent()) {
+                queryMetadata = optionalQueryBuilder.get().build();
+            }
+            
+            Optional<PeriodicQueryMetadata.Builder> 
optionalPeriodicQueryBuilder = getPeriodicQueryBuilder();
+            PeriodicQueryMetadata periodicQueryMetadata = null;
+            if(optionalPeriodicQueryBuilder.isPresent()) {
+                periodicQueryMetadata = 
optionalPeriodicQueryBuilder.get().build();
+            }
 
             final ImmutableMap.Builder<String, StatementPatternMetadata> 
spMetadata = ImmutableMap.builder();
             for(final Entry<String, StatementPatternMetadata.Builder> entry : 
spBuilders.entrySet()) {
@@ -450,11 +507,11 @@ public class FluoQuery {
             }
 
             if(queryBuilder != null) {
-                return new FluoQuery(queryBuilder.build(), spMetadata.build(), 
filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+                return new FluoQuery(queryBuilder.build(), 
Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), 
filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
             }
             //constructBuilder non-null in this case, but no need to check
             else {
-                return new FluoQuery(constructBuilder.build(), 
spMetadata.build(), filterMetadata.build(), joinMetadata.build(), 
aggregateMetadata.build());
+                return new FluoQuery(constructBuilder.build(), 
Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), 
filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
             }
             
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 3396114..ed18d49 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -63,14 +63,28 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
  *     <tr> <td>Node ID</td> <td>filterMetadata:nodeId</td> <td>The Node ID of 
the Filter.</td> </tr>
  *     <tr> <td>Node ID</td> <td>filterMetadata:veriableOrder</td> <td>The 
Variable Order binding sets are emitted with.</td> </tr>
- *     <tr> <td>Node ID</td> <td>filterMetadata:originalSparql</td> <td>The 
original SPRAQL query this filter was derived from.</td> </tr>
- *     <tr> <td>Node ID</td> <td>filterMetadata:filterIndexWithinSparql</td> 
<td>Indicates which filter within the original SPARQL query this 
represents.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>filterMetadata:filterSparql</td> <td>A SPARQL 
query representing this filter.</td> </tr>
  *     <tr> <td>Node ID</td> <td>filterMetadata:parentNodeId</td> <td>The Node 
ID this filter emits Binding Sets to.</td> </tr>
  *     <tr> <td>Node ID</td> <td>filterMetadata:childNodeId</td> <td>The Node 
ID of the node that feeds this node Binding Sets.</td> </tr>
  *     <tr> <td>Node ID + DELIM + Binding Set String</td> 
<td>filterMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} 
object.</td> </tr>
  *   </table>
  * </p>
  * <p>
+ *   <b>Periodic Bin Metadata</b>
+ *   <table border="1" style="width:100%">
+ *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
+ *     <tr> <td>Node ID</td> <td>periodicQueryMetadata:nodeId</td> <td>The 
Node ID of the Filter.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>periodicQueryMetadata:variableOrder</td> 
<td>The Variable Order binding sets are emitted with.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>periodicQueryMetadata:period</td> <td>The 
period size used to form BindingSet bins.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>periodicQueryMetadata:windowSize</td> <td>The 
window size used to form BindingSet bins.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>periodicQueryMetadata:timeUnit</td> <td>The 
unit of time corresponding to period and window size.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>periodicQueryMetadata:temporalVariable</td> 
<td>The BindingSet variable corresponding to event time.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>periodicQueryMetadata:parentNodeId</td> 
<td>The parent node for this node.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>periodicQueryMetadata:childNodeId</td> 
<td>The child node for this node.</td> </tr>
+ *     <tr> <td>Node ID + DELIM + Binding set String</td> 
<td>periodicQueryMetadata:bindingSet</td> <td>A binned BindingSet.</td> </tr>
+ *   </table>
+ * </p>
+ * <p>
  *   <b>Join Metadata</b>
  *   <table border="1" style="width:100%">
  *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
@@ -117,6 +131,7 @@ public class FluoQueryColumns {
     public static final String STATEMENT_PATTERN_METADATA_CF = 
"statementPatternMetadata";
     public static final String AGGREGATION_METADATA_CF = "aggregationMetadata";
     public static final String CONSTRUCT_METADATA_CF = "constructMetadata";
+    public static final String PERIODIC_QUERY_METADATA_CF = 
"periodicQueryMetadata";
 
     /**
      * New triples that have been added to Rya are written as a row in this
@@ -174,13 +189,23 @@ public class FluoQueryColumns {
 
     // Filter Metadata columns.
     public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, 
"nodeId");
-    public static final Column FILTER_VARIABLE_ORDER = new 
Column(FILTER_METADATA_CF, "veriableOrder");
-    public static final Column FILTER_ORIGINAL_SPARQL = new 
Column(FILTER_METADATA_CF, "originalSparql");
-    public static final Column FILTER_INDEX_WITHIN_SPARQL = new 
Column(FILTER_METADATA_CF, "filterIndexWithinSparql");
+    public static final Column FILTER_VARIABLE_ORDER = new 
Column(FILTER_METADATA_CF, "variableOrder");
+    public static final Column FILTER_SPARQL = new Column(FILTER_METADATA_CF, 
"filterSparql");
     public static final Column FILTER_PARENT_NODE_ID = new 
Column(FILTER_METADATA_CF, "parentNodeId");
     public static final Column FILTER_CHILD_NODE_ID = new 
Column(FILTER_METADATA_CF, "childNodeId");
     public static final Column FILTER_BINDING_SET = new 
Column(FILTER_METADATA_CF, "bindingSet");
-
+    
+    // Periodic Bin Metadata columns.
+    public static final Column PERIODIC_QUERY_NODE_ID = new 
Column(PERIODIC_QUERY_METADATA_CF, "nodeId");
+    public static final Column PERIODIC_QUERY_VARIABLE_ORDER = new 
Column(PERIODIC_QUERY_METADATA_CF, "variableOrder");
+    public static final Column PERIODIC_QUERY_PARENT_NODE_ID = new 
Column(PERIODIC_QUERY_METADATA_CF, "parentNodeId");
+    public static final Column PERIODIC_QUERY_CHILD_NODE_ID = new 
Column(PERIODIC_QUERY_METADATA_CF, "childNodeId");
+    public static final Column PERIODIC_QUERY_BINDING_SET = new 
Column(PERIODIC_QUERY_METADATA_CF, "bindingSet");
+    public static final Column PERIODIC_QUERY_PERIOD = new 
Column(PERIODIC_QUERY_METADATA_CF, "period");
+    public static final Column PERIODIC_QUERY_WINDOWSIZE = new 
Column(PERIODIC_QUERY_METADATA_CF, "windowSize");
+    public static final Column PERIODIC_QUERY_TIMEUNIT = new 
Column(PERIODIC_QUERY_METADATA_CF, "timeUnit");
+    public static final Column PERIODIC_QUERY_TEMPORAL_VARIABLE = new 
Column(PERIODIC_QUERY_METADATA_CF, "temporalVariable");
+    
     // Join Metadata columns.
     public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, 
"nodeId");
     public static final Column JOIN_VARIABLE_ORDER = new 
Column(JOIN_METADATA_CF, "variableOrder");
@@ -207,6 +232,18 @@ public class FluoQueryColumns {
     public static final Column AGGREGATION_BINDING_SET = new 
Column(AGGREGATION_METADATA_CF, "bindingSet");
 
     /**
+     * BatchObserver column for processing tasks that need to be broken into
+     * batches. Entries stored stored in this column are of the form Row:
+     * nodeId, Value: BatchInformation. The nodeId indicates the node that the
+     * batch operation will be performed on. All batch operations are performed
+     * on the bindingSet column for the NodeType indicated by the given nodeId.
+     * For example, if the nodeId indicated that the NodeType was
+     * StatementPattern, then the batch operation would be performed on
+     * {@link FluoQueryColumns#STATEMENT_PATTERN_BINDING_SET}.
+     */
+    public static final Column BATCH_COLUMN = new 
Column("batch","information");
+
+    /**
      * Enumerates the {@link Column}s that hold all of the fields for each type
      * of node that can compose a query.
      */
@@ -220,6 +257,20 @@ public class FluoQueryColumns {
                         QUERY_VARIABLE_ORDER,
                         QUERY_SPARQL,
                         QUERY_CHILD_NODE_ID)),
+        
+        
+        /**
+         * The columns a {@link PeriodicBinMetadata} object's fields are 
stored within.
+         */
+        PERIODIC_QUERY_COLUMNS(
+                Arrays.asList(PERIODIC_QUERY_NODE_ID,
+                        PERIODIC_QUERY_VARIABLE_ORDER,
+                        PERIODIC_QUERY_PERIOD,
+                        PERIODIC_QUERY_WINDOWSIZE,
+                        PERIODIC_QUERY_TIMEUNIT,
+                        PERIODIC_QUERY_TEMPORAL_VARIABLE,
+                        PERIODIC_QUERY_PARENT_NODE_ID,
+                        PERIODIC_QUERY_CHILD_NODE_ID)),
 
         /**
          * The columns a {@link ConstructQueryMetadata} object's fields are 
stored within.
@@ -239,8 +290,7 @@ public class FluoQueryColumns {
         FILTER_COLUMNS(
                 Arrays.asList(FILTER_NODE_ID,
                         FILTER_VARIABLE_ORDER,
-                        FILTER_ORIGINAL_SPARQL,
-                        FILTER_INDEX_WITHIN_SPARQL,
+                        FILTER_SPARQL,
                         FILTER_PARENT_NODE_ID,
                         FILTER_CHILD_NODE_ID)),
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 5e9d654..8675b80 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -26,6 +26,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.TransactionBase;
@@ -40,6 +41,7 @@ import 
org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -170,8 +172,7 @@ public class FluoQueryMetadataDAO {
         final String rowId = metadata.getNodeId();
         tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId);
         tx.set(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, 
metadata.getVariableOrder().toString());
-        tx.set(rowId, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, 
metadata.getOriginalSparql() );
-        tx.set(rowId, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, 
metadata.getFilterIndexWithinSparql()+"" );
+        tx.set(rowId, FluoQueryColumns.FILTER_SPARQL, 
metadata.getFilterSparql() );
         tx.set(rowId, FluoQueryColumns.FILTER_PARENT_NODE_ID, 
metadata.getParentNodeId() );
         tx.set(rowId, FluoQueryColumns.FILTER_CHILD_NODE_ID, 
metadata.getChildNodeId() );
     }
@@ -195,8 +196,7 @@ public class FluoQueryMetadataDAO {
         final String rowId = nodeId;
         final Map<Column, String> values = sx.gets(rowId,
                 FluoQueryColumns.FILTER_VARIABLE_ORDER,
-                FluoQueryColumns.FILTER_ORIGINAL_SPARQL,
-                FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL,
+                FluoQueryColumns.FILTER_SPARQL,
                 FluoQueryColumns.FILTER_PARENT_NODE_ID,
                 FluoQueryColumns.FILTER_CHILD_NODE_ID);
 
@@ -204,18 +204,88 @@ public class FluoQueryMetadataDAO {
         final String varOrderString = 
values.get(FluoQueryColumns.FILTER_VARIABLE_ORDER);
         final VariableOrder varOrder = new VariableOrder(varOrderString);
 
-        final String originalSparql = 
values.get(FluoQueryColumns.FILTER_ORIGINAL_SPARQL);
-        final int filterIndexWithinSparql = 
Integer.parseInt(values.get(FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL));
+        final String originalSparql = 
values.get(FluoQueryColumns.FILTER_SPARQL);
         final String parentNodeId = 
values.get(FluoQueryColumns.FILTER_PARENT_NODE_ID);
         final String childNodeId = 
values.get(FluoQueryColumns.FILTER_CHILD_NODE_ID);
 
-        return FilterMetadata.builder(nodeId)
+        return 
FilterMetadata.builder(nodeId).setVarOrder(varOrder).setFilterSparql(originalSparql)
+                .setParentNodeId(parentNodeId).setChildNodeId(childNodeId);
+    }
+
+    /**
+     * Write an instance of {@link PeriodicQueryMetadata} to the Fluo table.
+     *
+     * @param tx
+     *            - The transaction that will be used to commit the metadata.
+     *            (not null)
+     * @param metadata
+     *            - The PeriodicBin node metadata that will be written to the
+     *            table. (not null)
+     */
+    public void write(final TransactionBase tx, final PeriodicQueryMetadata 
metadata) {
+        requireNonNull(tx);
+        requireNonNull(metadata);
+
+        final String rowId = metadata.getNodeId();
+        tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_NODE_ID, rowId);
+        tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER, 
metadata.getVariableOrder().toString());
+        tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID, 
metadata.getParentNodeId());
+        tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID, 
metadata.getChildNodeId());
+        tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_PERIOD, 
Long.toString(metadata.getPeriod()));
+        tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE, 
Long.toString(metadata.getWindowSize()));
+        tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT, 
metadata.getUnit().name());
+        tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE, 
metadata.getTemporalVariable());
+    }
+
+    /**
+     * Read an instance of {@link PeriodicQueryMetadata} from the Fluo table.
+     *
+     * @param sx
+     *            - The snapshot that will be used to read the metadata. (not
+     *            null)
+     * @param nodeId
+     *            - The nodeId of the PeriodicBin node that will be read. (not
+     *            null)
+     * @return The {@link PeriodicQueryMetadata} that was read from table.
+     */
+    public PeriodicQueryMetadata readPeriodicQueryMetadata(final SnapshotBase 
sx, final String nodeId) {
+        return readPeriodicQueryMetadataBuilder(sx, nodeId).build();
+    }
+
+    private PeriodicQueryMetadata.Builder 
readPeriodicQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+        requireNonNull(sx);
+        requireNonNull(nodeId);
+
+        // Fetch the values from the Fluo table.
+        final String rowId = nodeId;
+        final Map<Column, String> values = sx.gets(rowId, 
FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER,
+                FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID, 
FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID,
+                FluoQueryColumns.PERIODIC_QUERY_PERIOD, 
FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE,
+                FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT, 
FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE);
+
+        // Return an object holding them.
+        final String varOrderString = 
values.get(FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER);
+        final VariableOrder varOrder = new VariableOrder(varOrderString);
+        final String parentNodeId = 
values.get(FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID);
+        final String childNodeId = 
values.get(FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID);
+        final String temporalVariable = 
values.get(FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE);
+        final String period = 
values.get(FluoQueryColumns.PERIODIC_QUERY_PERIOD);
+        final String window = 
values.get(FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE);
+        final String timeUnit = 
values.get(FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT);
+
+        return PeriodicQueryMetadata.builder()
+                .setNodeId(nodeId)
                 .setVarOrder(varOrder)
-                .setOriginalSparql(originalSparql)
-                .setFilterIndexWithinSparql(filterIndexWithinSparql)
                 .setParentNodeId(parentNodeId)
-                .setChildNodeId(childNodeId);
+                .setChildNodeId(childNodeId)
+                .setWindowSize(Long.parseLong(window))
+                .setPeriod(Long.parseLong(period))
+                .setTemporalVariable(temporalVariable)
+                .setUnit(TimeUnit.valueOf(timeUnit));
+
     }
+    
+    
 
     /**
      * Write an instance of {@link JoinMetadata} to the Fluo table.
@@ -325,12 +395,10 @@ public class FluoQueryMetadataDAO {
         final String pattern = 
values.get(FluoQueryColumns.STATEMENT_PATTERN_PATTERN);
         final String parentNodeId = 
values.get(FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID);
 
-        return StatementPatternMetadata.builder(nodeId)
-                .setVarOrder(varOrder)
-                .setStatementPattern(pattern)
-                .setParentNodeId(parentNodeId);
+        return 
StatementPatternMetadata.builder(nodeId).setVarOrder(varOrder).setStatementPattern(pattern).setParentNodeId(parentNodeId);
     }
 
+
     /**
      * Write an instance of {@link AggregationMetadata} to the Fluo table.
      *
@@ -432,10 +500,11 @@ public class FluoQueryMetadataDAO {
         requireNonNull(query);
 
         // Write the rest of the metadata objects.
-        switch(query.getQueryType()) {
+        switch (query.getQueryType()) {
         case Construct:
             ConstructQueryMetadata constructMetadata = 
query.getConstructQueryMetadata().get();
-            // Store the Query ID so that it may be looked up from the 
original SPARQL string.
+            // Store the Query ID so that it may be looked up from the original
+            // SPARQL string.
             final String constructSparql = constructMetadata.getSparql();
             final String constructQueryId = constructMetadata.getNodeId();
             tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, 
Bytes.of(constructQueryId));
@@ -443,13 +512,19 @@ public class FluoQueryMetadataDAO {
             break;
         case Projection:
             QueryMetadata queryMetadata = query.getQueryMetadata().get();
-            // Store the Query ID so that it may be looked up from the 
original SPARQL string.
+            // Store the Query ID so that it may be looked up from the original
+            // SPARQL string.
             final String sparql = queryMetadata.getSparql();
             final String queryId = queryMetadata.getNodeId();
             tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, 
Bytes.of(queryId));
             write(tx, queryMetadata);
             break;
         }
+        
+        Optional<PeriodicQueryMetadata> periodicMetadata = 
query.getPeriodicQueryMetadata();
+        if(periodicMetadata.isPresent()) {
+            write(tx, periodicMetadata.get());
+        }
 
         for(final FilterMetadata filter : query.getFilterMetadata()) {
             write(tx, filter);
@@ -510,6 +585,15 @@ public class FluoQueryMetadataDAO {
             addChildMetadata(sx, builder, 
constructBuilder.build().getChildNodeId());
             break;
 
+        case PERIODIC_QUERY:
+            // Add this node's metadata.
+            final PeriodicQueryMetadata.Builder periodicQueryBuilder = 
readPeriodicQueryMetadataBuilder(sx, childNodeId);
+            builder.addPeriodicQueryMetadata(periodicQueryBuilder);
+
+            // Add it's child's metadata.
+            addChildMetadata(sx, builder, 
periodicQueryBuilder.build().getChildNodeId());
+            break;
+            
         case AGGREGATION:
             // Add this node's metadata.
             final AggregationMetadata.Builder aggregationBuilder = 
readAggregationMetadataBuilder(sx, childNodeId);
@@ -546,6 +630,7 @@ public class FluoQueryMetadataDAO {
             break;
         default:
             break;
+        
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
new file mode 100644
index 0000000..33253f2
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * Metadata that is required for periodic queries in the Rya Fluo Application. 
 
+ * If a periodic query is registered with the Rya Fluo application, the 
BindingSets
+ * are placed into temporal bins according to whether they occur within the 
window of
+ * a period's ending time.  This Metadata is used to create a Bin Id, which is 
equivalent
+ * to the period's ending time, to be inserted into each BindingSet that 
occurs within that
+ * bin.  This is to allow the AggregationUpdater to aggregate the bins by 
grouping on the 
+ * Bin Id.
+ * 
+ */
+public class PeriodicQueryMetadata extends CommonNodeMetadata {
+
+    private String parentNodeId;
+    private String childNodeId;
+    private long windowSize;
+    private long period;
+    private TimeUnit unit;
+    private String temporalVariable;
+
+    /**
+     * Constructs an instance of PeriodicQueryMetadata
+     * @param nodeId - id of periodic query node
+     * @param varOrder - variable order indicating the order the BindingSet 
results are written in
+     * @param parentNodeId - id of parent node
+     * @param childNodeId - id of child node
+     * @param windowSize - size of window used for filtering
+     * @param period - period size that indicates frequency of notifications
+     * @param unit - TimeUnit corresponding to window and period
+     * @param temporalVariable - temporal variable that periodic conditions 
are applied to
+     */
+    public PeriodicQueryMetadata(String nodeId, VariableOrder varOrder, String 
parentNodeId, String childNodeId, long windowSize, long period,
+            TimeUnit unit, String temporalVariable) {
+        super(nodeId, varOrder);
+        this.parentNodeId = Preconditions.checkNotNull(parentNodeId);
+        this.childNodeId = Preconditions.checkNotNull(childNodeId);
+        this.temporalVariable = Preconditions.checkNotNull(temporalVariable);
+        this.unit = Preconditions.checkNotNull(unit);
+        Preconditions.checkArgument(period > 0);
+        Preconditions.checkArgument(windowSize >= period);
+
+        this.windowSize = windowSize;
+        this.period = period;
+    }
+
+    /**
+     * @return id of parent for navigating query
+     */
+    public String getParentNodeId() {
+        return parentNodeId;
+    }
+
+    /**
+     * 
+     * @return id of child for navigating query
+     */
+    public String getChildNodeId() {
+        return childNodeId;
+    }
+    
+    /**
+     * 
+     * @return temporal variable used for filtering events
+     */
+    public String getTemporalVariable() {
+        return temporalVariable;
+    }
+
+    /**
+     * @return window duration in millis
+     */
+    public long getWindowSize() {
+        return windowSize;
+    }
+
+    /**
+     * @return period duration in millis
+     */
+    public long getPeriod() {
+        return period;
+    }
+
+    /**
+     * @return {@link TimeUnit} for window duration and period duration
+     */
+    public TimeUnit getUnit() {
+        return unit;
+    }
+
+
+    /**
+     * @return {@link Builder} for chaining method calls to construct an 
instance of PeriodicQueryMetadata.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), 
childNodeId, parentNodeId, temporalVariable, period, windowSize, unit);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (o instanceof PeriodicQueryMetadata) {
+            if (super.equals(o)) {
+                PeriodicQueryMetadata metadata = (PeriodicQueryMetadata) o;
+                return new EqualsBuilder().append(childNodeId, 
metadata.childNodeId).append(parentNodeId, metadata.parentNodeId)
+                        .append(windowSize, 
metadata.windowSize).append(period, metadata.period)
+                        .append(unit, metadata.unit).append(temporalVariable, 
metadata.temporalVariable).isEquals();
+            }
+            return false;
+        }
+
+        return false;
+    }
+    
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("PeriodicQueryMetadata {\n")
+                .append("    Node ID: " + super.getNodeId() + "\n")
+                .append("    Variable Order: " + super.getVariableOrder() + 
"\n")
+                .append("    Parent Node ID: " + parentNodeId + "\n")
+                .append("    Child Node ID: " + childNodeId + "\n")
+                .append("    Period: " + period + "\n")
+                .append("    Window Size: " + windowSize + "\n")
+                .append("    Time Unit: " + unit + "\n")
+                .append("    Temporal Variable: " + temporalVariable + "\n")
+                .append("}")
+                .toString();
+    }
+
+
+    /**
+     * Builder for chaining method calls to construct an instance of 
PeriodicQueryMetadata.
+     */
+    public static class Builder {
+
+        private String nodeId;
+        private VariableOrder varOrder;
+        private String parentNodeId;
+        private String childNodeId;
+        private long windowSize;
+        private long period;
+        private TimeUnit unit;
+        public String temporalVariable;
+
+        public Builder setNodeId(String nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+        
+        /**
+         * 
+         * @return id of of this node
+         */
+        public String getNodeId() {
+            return nodeId;
+        }
+        
+        /**
+         * Set the {@link VariableOrder}
+         * @param varOrder to indicate order that results will be written in
+         * @return Builder for chaining methods calls
+         */
+        public Builder setVarOrder(VariableOrder varOrder) {
+            this.varOrder = varOrder;
+            return this;
+        }
+        
+        /**
+         * Returns {@link VariableOrder} 
+         * @return VariableOrder that indicates order that results are written 
in 
+         */
+        public VariableOrder getVarOrder() {
+            return varOrder;
+        }
+        
+        /**
+         * Sets id of parent node
+         * @param parentNodeId
+         * @return Builder for chaining methods calls
+         */
+        public Builder setParentNodeId(String parentNodeId) {
+            this.parentNodeId = parentNodeId;
+            return this;
+        }
+      
+        /**
+         * @return id of parent node
+         */
+        public String getParentNodeId() {
+            return parentNodeId;
+        }
+
+        /**
+         * Set id of child node
+         * @param childNodeId
+         * @return Builder for chaining methods calls
+         */
+        public Builder setChildNodeId(String childNodeId) {
+            this.childNodeId = childNodeId;
+            return this;
+        }
+        
+        /**
+         * Sets window size for periodic query
+         * @param windowSize
+         * @return Builder for chaining methods calls
+         */
+        public Builder setWindowSize(long windowSize) {
+            this.windowSize = windowSize;
+            return this;
+        }
+
+        /**
+         * Sets period for periodic query
+         * @param period
+         * @return Builder for chaining methods calls
+         */
+        public Builder setPeriod(long period) {
+            this.period = period;
+            return this;
+        }
+
+        /**
+         * Sets time unit of window and period for periodic query
+         * @param unit
+         * @return Builder for chaining methods calls
+         */
+        public Builder setUnit(TimeUnit unit) {
+            this.unit = unit;
+            return this;
+        }
+        
+        /**
+         * Indicate which variable in BindingSet results is the temporal 
variable that periodic
+         * Conditions should be applied to
+         * @param temporalVariable
+         * @return Builder for chaining methods calls
+         */
+        public Builder setTemporalVariable(String temporalVariable) {
+            this.temporalVariable = temporalVariable;
+            return this;
+        }
+
+        /**
+         * @return PeriodicQueryMetadata constructed from parameters passed to 
this Builder
+         */
+        public PeriodicQueryMetadata build() {
+            return new PeriodicQueryMetadata(nodeId, varOrder, parentNodeId, 
childNodeId, windowSize, period, unit, temporalVariable);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
new file mode 100644
index 0000000..f1ade59
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.openrdf.query.algebra.QueryModelVisitor;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * This is a {@link UnaryTupleOperator} that gets placed in the parsed query
+ * {@link TupleExpr} when a {@link Filter} is encountered in the SPARQL String 
that
+ * contains the Periodic {@link Function} {@link 
PeriodicQueryUtil#PeriodicQueryURI}.
+ * The PeiodicQueryNode is created from the arguments passed to the Periodic 
Function,
+ * which consist of a time unit, a temporal period, a temporal window of time, 
and the
+ * temporal variable in the query, which assumes a value indicated by the
+ * Time ontology: http://www.w3.org/2006/time. The purpose of the 
PeriodicQueryNode
+ * is to filter out all events that did not occur within the specified window 
of time
+ * of this instant and to generate notifications at a regular interval 
indicated by the period.
+ *
+ */
+public class PeriodicQueryNode extends UnaryTupleOperator {
+
+    private TimeUnit unit;
+    private long windowDuration;
+    private long periodDuration;
+    private String temporalVar;
+    
+    /**
+     * Creates a PeriodicQueryNode from the specified values.
+     * @param window - specifies the window of time that event must occur 
within from this instant
+     * @param period - regular interval at which notifications are generated 
(must be leq window).
+     * @param unit - time unit of the period and window
+     * @param temporalVar - temporal variable in query used for filtering
+     * @param arg - child of PeriodicQueryNode in parsed query
+     */
+    public PeriodicQueryNode(long window, long period, TimeUnit unit, String 
temporalVar, TupleExpr arg) {
+        super(checkNotNull(arg));
+        checkArgument(0 < period && period <= window);
+        this.temporalVar = checkNotNull(temporalVar);
+        this.unit = checkNotNull(unit);
+        this.windowDuration = window;
+        this.periodDuration = period;
+    }
+    
+    /**
+     * @return - temporal variable used to filter events
+     */
+    public String getTemporalVariable() {
+        return temporalVar;
+    }
+
+    /**
+     * @return window duration in millis
+     */
+    public long getWindowSize() {
+        return windowDuration;
+    }
+
+    /**
+     * @return period duration in millis
+     */
+    public long getPeriod() {
+        return periodDuration;
+    }
+
+    /**
+     * @return {@link TimeUnit} for window duration and period duration
+     */
+    public TimeUnit getUnit() {
+        return unit;
+    }
+    
+    @Override
+    public <X extends Exception> void visit(QueryModelVisitor<X> visitor) 
throws X {
+        visitor.meetOther(this);
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+        if(this == other) {
+            return true;
+        }
+        
+        if (other instanceof PeriodicQueryNode) {
+            if (super.equals(other)) {
+                PeriodicQueryNode metadata = (PeriodicQueryNode) other;
+                return new EqualsBuilder().append(windowDuration, 
metadata.windowDuration).append(periodDuration, metadata.periodDuration)
+                        .append(unit, metadata.unit).append(temporalVar, 
metadata.temporalVar).isEquals();
+            }
+            return false;
+        }
+        
+        return false;
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hash(arg, unit, windowDuration, periodDuration, 
temporalVar);
+    }
+    
+    /**
+     * @return String representation of this node that is printed in when 
query tree is printed.
+     */
+    @Override
+    public String getSignature() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("PeriodicQueryNode(");
+        sb.append("Var = " + temporalVar + ", ");
+        sb.append("Window = " + windowDuration + " ms, ");
+        sb.append("Period = " + periodDuration + " ms, ");
+        sb.append("Time Unit = " + unit  + ")");
+       
+
+        return sb.toString();
+    }
+    
+    @Override
+    public PeriodicQueryNode clone() {
+        PeriodicQueryNode clone = (PeriodicQueryNode)super.clone();
+        clone.setArg(getArg().clone());
+        clone.periodDuration = periodDuration;
+        clone.windowDuration = windowDuration;
+        clone.unit = unit;
+        clone.temporalVar = temporalVar;
+        return clone;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index 23ac286..d017724 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -142,6 +142,10 @@ public class QueryMetadata extends CommonNodeMetadata {
         public Builder(final String nodeId) {
             this.nodeId = checkNotNull(nodeId);
         }
+        
+        public String getNodeId() {
+            return nodeId;
+        }
 
         
         /**
@@ -154,6 +158,13 @@ public class QueryMetadata extends CommonNodeMetadata {
             this.varOrder = varOrder;
             return this;
         }
+        
+        /**
+         * @return the variable order of binding sets that are emitted by this 
node
+         */
+        public VariableOrder getVariableOrder() {
+            return varOrder;
+        }
 
         /**
          * Set the SPARQL query whose results are being updated by the Fluo 
app.
@@ -176,6 +187,10 @@ public class QueryMetadata extends CommonNodeMetadata {
             this.childNodeId = childNodeId;
             return this;
         }
+        
+        public String getChildNodeId() {
+            return childNodeId;
+        }
 
         /**
          * @return An instance of {@link QueryMetadata} build using this 
builder's values.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 631ce60..8e348f2 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -18,12 +18,13 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
 
@@ -40,12 +41,14 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
-import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
+import 
org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer.FilterParseException;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.openrdf.model.Value;
 import org.openrdf.model.impl.BNodeImpl;
@@ -105,7 +108,9 @@ public class SparqlFluoQueryBuilder {
         final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
 
         final NewQueryVisitor visitor = new NewQueryVisitor(sparql, 
fluoQueryBuilder, nodeIds);
-        parsedQuery.getTupleExpr().visit( visitor );
+        TupleExpr te = parsedQuery.getTupleExpr();
+        PeriodicQueryUtil.placePeriodicQueryNode(te);
+        te.visit( visitor );
 
         final FluoQuery fluoQuery = fluoQueryBuilder.build();
         return fluoQuery;
@@ -187,16 +192,17 @@ public class SparqlFluoQueryBuilder {
                 prefix = AGGREGATION_PREFIX;
             }  else if (node instanceof Reduced) {
                 prefix = CONSTRUCT_PREFIX;
+            } else if(node instanceof PeriodicQueryNode) {
+                prefix = PERIODIC_QUERY_PREFIX;
             } else {
                 throw new IllegalArgumentException("Node must be of type 
{StatementPattern, Join, Filter, Extension, Projection} but was " + 
node.getClass());
             }
 
-            // Create the unique portion of the id.
             final String unique = UUID.randomUUID().toString().replaceAll("-", 
"");
-
             // Put them together to create the Node ID.
             return prefix + "_" + unique;
         }
+        
     }
 
     /**
@@ -204,19 +210,13 @@ public class SparqlFluoQueryBuilder {
      * the node to a {@link FluoQuery.Builder}. This information is used by the
      * application's observers to incrementally update a PCJ.
      */
-    private static class NewQueryVisitor extends 
QueryModelVisitorBase<RuntimeException> {
+    public static class NewQueryVisitor extends 
QueryModelVisitorBase<RuntimeException> {
 
         private final NodeIds nodeIds;
         private final FluoQuery.Builder fluoQueryBuilder;
         private final String sparql;
 
         /**
-         * Stored with each Filter node so that we can figure out how to 
evaluate it within
-         * {@link FilterResultUpdater}. Incremented each time a filter has 
been stored.
-         */
-        private int filterIndexWithinQuery = 0;
-
-        /**
          * Constructs an instance of {@link NewQueryVisitor}.
          *
          * @param sparql - The SPARQL query whose structure will be represented
@@ -378,6 +378,7 @@ public class SparqlFluoQueryBuilder {
 
         @Override
         public void meet(final Filter node) {
+            
             // Get or create a builder for this node populated with the known 
metadata.
             final String filterId = nodeIds.getOrMakeId(node);
 
@@ -387,8 +388,13 @@ public class SparqlFluoQueryBuilder {
                 fluoQueryBuilder.addFilterMetadata(filterBuilder);
             }
 
-            filterBuilder.setOriginalSparql(sparql);
-            filterBuilder.setFilterIndexWithinSparql(filterIndexWithinQuery++);
+            String filterString;
+            try {
+                filterString = FilterSerializer.serialize(node);
+            } catch (FilterParseException e) {
+                throw new RuntimeException(e);
+            }
+            filterBuilder.setFilterSparql(filterString);
 
             final QueryModelNode child = node.getArg();
             if(child == null) {
@@ -406,6 +412,47 @@ public class SparqlFluoQueryBuilder {
             // Walk to the next node.
             super.meet(node);
         }
+        
+        public void meetOther(final QueryModelNode qNode) {
+            if (qNode instanceof PeriodicQueryNode) {
+                PeriodicQueryNode node = (PeriodicQueryNode) qNode;
+                // Get or create a builder for this node populated with the
+                // known metadata.
+                final String periodicId = nodeIds.getOrMakeId(node);
+
+                PeriodicQueryMetadata.Builder periodicBuilder = 
fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
+                if (periodicBuilder == null) {
+                    periodicBuilder = PeriodicQueryMetadata.builder();
+                    periodicBuilder.setNodeId(periodicId);
+                    fluoQueryBuilder.addPeriodicQueryMetadata(periodicBuilder);
+                }
+                periodicBuilder.setWindowSize(node.getWindowSize());
+                periodicBuilder.setPeriod(node.getPeriod());
+                
periodicBuilder.setTemporalVariable(node.getTemporalVariable());
+                periodicBuilder.setUnit(node.getUnit());
+
+                final QueryModelNode child = node.getArg();
+                if (child == null) {
+                    throw new IllegalArgumentException("PeriodicQueryNode 
child arg connot be null.");
+                }
+
+                final String childNodeId = nodeIds.getOrMakeId(child);
+                periodicBuilder.setChildNodeId(childNodeId);
+
+                // Update the child node's metadata.
+                final Set<String> childVars = getVars((TupleExpr) child);
+                final VariableOrder childVarOrder = new 
VariableOrder(childVars);
+                setChildMetadata(childNodeId, childVarOrder, periodicId);
+
+                // update variable order of this node and all ancestors to
+                // include BIN_ID binding as
+                // first variable in the ordering
+                
PeriodicQueryUtil.updateVarOrdersToIncludeBin(fluoQueryBuilder, periodicId);
+                // Walk to the next node.
+                node.getArg().visit(this);
+            } 
+        }
+        
 
         @Override
         public void meet(final Projection node) {
@@ -553,10 +600,24 @@ public class SparqlFluoQueryBuilder {
 
             case QUERY:
                 throw new IllegalArgumentException("A QUERY node cannot be the 
child of another node.");
+            
             case CONSTRUCT:
                 throw new IllegalArgumentException("A CONSTRUCT node cannot be 
the child of another node.");
+            
+            case PERIODIC_QUERY:
+                PeriodicQueryMetadata.Builder periodicQueryBuilder = 
fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
+                if (periodicQueryBuilder == null) {
+                    periodicQueryBuilder = PeriodicQueryMetadata.builder();
+                    periodicQueryBuilder.setNodeId(childNodeId);
+                    
fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder);
+                }
+                periodicQueryBuilder.setVarOrder(childVarOrder);
+                periodicQueryBuilder.setParentNodeId(parentNodeId);
+                break;
+                
             default:
                 throw new IllegalArgumentException("Unsupported NodeType: " + 
childType);
+
             }
         }
         

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java
new file mode 100644
index 0000000..73f3447
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.SingletonSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.ParsedTupleQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+
+/**
+ * Class for creating a String representation a given Filter, and for
+ * converting the String representation of the Filter back to the Filter.
+ *
+ */
+public class FilterSerializer {
+
+    private static final SPARQLQueryRenderer renderer = new 
SPARQLQueryRenderer();
+    private static final SPARQLParser parser = new SPARQLParser();
+    
+    /**
+     * Converts a {@link Filter} to a SPARQL query containing only the SPARQL 
representation
+     * of the Filter along with a Select clause that return all variables.  
The argument of the
+     * Filter is replaced by a {@link SingletonSet} so that the body of the 
SPARQL query consists of only a
+     * single Filter clause.  
+     * @param filter - Filter to be serialized
+     * @return - SPARQL String containing a single Filter clause that 
represents the serialized Filter
+     * @throws FilterParseException
+     */
+    public static String serialize(Filter filter) throws FilterParseException {
+        Filter clone = filter.clone();
+        clone.setArg(new SingletonSet());
+        try {
+            return renderer.render(new ParsedTupleQuery(clone));
+        } catch (Exception e) {
+            throw new FilterParseException("Unable to parse Filter.", e);
+        }
+    }
+    
+    /**
+     * Converts a SPARQL query consisting of a single Filter clause back to a 
Filter.
+     * @param sparql - SPARQL query representing a Filter
+     * @return - parsed Filter included in the SPARQL query
+     * @throws FilterParseException
+     */
+    public static Filter deserialize(String sparql) throws 
FilterParseException {
+        
+        try {
+            ParsedQuery pq = parser.parseQuery(sparql, null);
+            FilterVisitor visitor = new FilterVisitor();
+            pq.getTupleExpr().visit(visitor);
+            Set<Filter> filters = visitor.getFilters();
+            
+            if(filters.size() != 1) {
+                throw new FilterParseException("Filter String must contain 
only one Filter.");
+            }
+            
+            return filters.iterator().next();
+            
+        } catch (Exception e) {
+            throw new FilterParseException("Unable to parse Filter.", e);
+        }
+    }
+    
+    public static class FilterVisitor extends 
QueryModelVisitorBase<RuntimeException> {
+
+        private Set<Filter> filters;
+        
+        public FilterVisitor() {
+            filters = new HashSet<>();
+        }
+
+        public Set<Filter> getFilters() {
+            return filters;
+        }
+
+        public void meet(Filter node) {
+            filters.add(node);
+        }
+    }
+    
+    public static class FilterParseException extends Exception {
+
+        private static final long serialVersionUID = 1L;
+        
+        /**
+         * Constructs an instance of {@link FilterParseException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public FilterParseException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link FilterParseException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public FilterParseException(final String message, final Throwable t) {
+            super(message, t);
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java
new file mode 100644
index 0000000..9446c87
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.Optional;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+
+/**
+ * Factory for creating {@link FluoClient}s.
+ *
+ */
+public class FluoClientFactory {
+
+    /**
+     * Creates a FluoClient
+     * @param appName - name of Fluo application
+     * @param tableName - name of Fluo table
+     * @param conf - AccumuloConfiguration (must contain Accumulo User, 
Accumulo Instance, Accumulo Password, and Accumulo Zookeepers)
+     * @return FluoClient for connecting to Fluo
+     */
+    public static FluoClient getFluoClient(String appName, Optional<String> 
tableName, AccumuloRdfConfiguration conf) {
+        FluoConfiguration fluoConfig = new FluoConfiguration();
+        fluoConfig.setAccumuloInstance(conf.getAccumuloInstance());
+        fluoConfig.setAccumuloUser(conf.getAccumuloUser());
+        fluoConfig.setAccumuloPassword(conf.getAccumuloPassword());
+        fluoConfig.setInstanceZookeepers(conf.getAccumuloZookeepers() + 
"/fluo");
+        fluoConfig.setAccumuloZookeepers(conf.getAccumuloZookeepers());
+        fluoConfig.setApplicationName(appName);
+        if (tableName.isPresent()) {
+            fluoConfig.setAccumuloTable(tableName.get());
+        } else {
+            fluoConfig.setAccumuloTable(appName);
+        }
+        return new FluoClientImpl(fluoConfig);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
new file mode 100644
index 0000000..fd24af2
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.model.Literal;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.Group;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.Reduced;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class for creating and executing Perioid Queries.
+ *
+ */
+public class PeriodicQueryUtil {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+    public static final String PeriodicQueryURI = 
"http://org.apache.rya/function#periodic";;
+    public static final String temporalNameSpace = 
"http://www.w3.org/2006/time#";;
+    public static final URI DAYS = 
vf.createURI("http://www.w3.org/2006/time#days";);
+    public static final URI HOURS = 
vf.createURI("http://www.w3.org/2006/time#hours";);
+    public static final URI MINUTES = 
vf.createURI("http://www.w3.org/2006/time#minutes";);
+
+    /**
+     * Returns a PeriodicQueryNode for all {@link FunctionCall}s that 
represent PeriodicQueryNodes, otherwise
+     * an empty Optional is returned.
+     * @param functionCall - FunctionCall taken from a {@lin TupleExpr}
+     * @param arg - TupleExpr that will be the argument of the 
PeriodicQueryNode if it is created
+     * @return - Optional containing a PeriodicQueryNode if FunctionCall 
represents PeriodicQueryNode and empty Optional otherwise
+     * @throws Exception
+     */
+    public static Optional<PeriodicQueryNode> 
getPeriodicQueryNode(FunctionCall functionCall, TupleExpr arg) throws Exception 
{
+
+        if (functionCall.getURI().equals(PeriodicQueryURI)) {
+            return Optional.of(parseAndSetValues(functionCall.getArgs(), arg));
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Finds and places a PeriodicQueryNode if the TupleExpr contains a 
FunctionCall
+     * that represents a PeriodicQueryNode.
+     * @param query - TupleExpr with PeriodicQueryNode placed and positioned 
at the top of the query
+     */
+    public static void placePeriodicQueryNode(TupleExpr query) {
+        query.visit(new PeriodicQueryNodeVisitor());
+        query.visit(new PeriodicQueryNodeRelocator());
+    }
+    
+    public static Optional<PeriodicQueryNode> getPeriodicNode(String sparql) 
throws MalformedQueryException {
+        TupleExpr te = new SPARQLParser().parseQuery(sparql, 
null).getTupleExpr();
+        PeriodicQueryNodeVisitor periodicVisitor = new 
PeriodicQueryNodeVisitor();
+        te.visit(periodicVisitor);
+        return periodicVisitor.getPeriodicNode();
+    }
+
+    /**
+     * Locates Filter containing FunctionCall with PeriodicQuery info and
+     * replaces that Filter with a PeriodicQueryNode.
+     */
+    public static class PeriodicQueryNodeVisitor extends 
QueryModelVisitorBase<RuntimeException> {
+
+        private int count = 0;
+        private PeriodicQueryNode periodicNode;
+        
+        public Optional<PeriodicQueryNode> getPeriodicNode() {
+            return Optional.ofNullable(periodicNode);
+        }
+
+        public void meet(Filter node) {
+            if (node.getCondition() instanceof FunctionCall) {
+                try {
+                    Optional<PeriodicQueryNode> optNode = 
getPeriodicQueryNode((FunctionCall) node.getCondition(), node.getArg());
+                    if (optNode.isPresent()) {
+                        if (count > 0) {
+                            throw new IllegalArgumentException("Query cannot 
contain more than one PeriodicQueryNode");
+                        }
+                        periodicNode = optNode.get();
+                        node.replaceWith(periodicNode);
+                        count++;
+                        periodicNode.visit(this);
+                    } else {
+                        super.meet(node);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e.getMessage());
+                }
+            } else {
+                super.meet(node);
+            }
+        }
+    }
+
+    /**
+     * Relocates PeriodicQueryNode so that it occurs below either the Construct
+     * Query Node, the Projection Query Node if no Aggregation exists, or the
+     * Group Node if an Aggregation exists. This limits the number of nodes
+     * whose variable order needs to be changed when the PeriodicQueryMetadata
+     * is added.
+     */
+    public static class PeriodicQueryNodeRelocator extends 
QueryModelVisitorBase<RuntimeException> {
+
+        private UnaryTupleOperator relocationParent;
+
+        public void meet(Projection node) {
+            relocationParent = node;
+            node.getArg().visit(this);
+        }
+
+        public void meet(Group node) {
+            relocationParent = node;
+            super.meet(node);
+        }
+
+        public void meet(Reduced node) {
+            relocationParent = node;
+            super.meet(node);
+        }
+
+        public void meet(Filter node) {
+            super.meet(node);
+        }
+
+        @Override
+        public void meetOther(QueryModelNode node) {
+
+            if (node instanceof PeriodicQueryNode) {
+                PeriodicQueryNode pNode = (PeriodicQueryNode) node;
+                // do nothing if PeriodicQueryNode already positioned correctly
+                if (pNode.equals(relocationParent.getArg())) {
+                    return;
+                }
+                // remove node from query
+                pNode.replaceWith(pNode.getArg());
+                // set node' child to be relocationParent's child
+                pNode.setArg(relocationParent.getArg());
+                // add node back into query below relocationParent
+                relocationParent.replaceChildNode(relocationParent.getArg(), 
pNode);
+            }
+        }
+    }
+
+    /**
+     * Adds the variable "periodicBinId" to the beginning of all {@link 
VariableOrder}s for the 
+     * Metadata nodes that appear above the PeriodicQueryNode.  This ensures 
that the binId is
+     * written first in the Row so that bins can be easily scanned and deleted.
+     * @param builder
+     * @param nodeId
+     */
+    public static void updateVarOrdersToIncludeBin(FluoQuery.Builder builder, 
String nodeId) {
+        NodeType type = NodeType.fromNodeId(nodeId).orNull();
+        if (type == null) {
+            throw new IllegalArgumentException("NodeId must be associated with 
an existing MetadataBuilder.");
+        }
+        switch (type) {
+        case AGGREGATION:
+            AggregationMetadata.Builder aggBuilder = 
builder.getAggregateBuilder(nodeId).orNull();
+            if (aggBuilder != null) {
+                VariableOrder varOrder = aggBuilder.getVariableOrder();
+                VariableOrder groupOrder = 
aggBuilder.getGroupByVariableOrder();
+                // update varOrder with BIN_ID
+                List<String> orderList = new 
ArrayList<>(varOrder.getVariableOrders());
+                orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
+                aggBuilder.setVariableOrder(new VariableOrder(orderList));
+                // update groupVarOrder with BIN_ID
+                List<String> groupOrderList = new 
ArrayList<>(groupOrder.getVariableOrders());
+                groupOrderList.add(0, 
IncrementalUpdateConstants.PERIODIC_BIN_ID);
+                aggBuilder.setGroupByVariableOrder(new 
VariableOrder(groupOrderList));
+                // recursive call to update the VariableOrders of all ancestors
+                // of this node
+                updateVarOrdersToIncludeBin(builder, 
aggBuilder.getParentNodeId());
+            } else {
+                throw new IllegalArgumentException("There is no 
AggregationMetadata.Builder for the indicated Id.");
+            }
+            break;
+        case PERIODIC_QUERY:
+            PeriodicQueryMetadata.Builder periodicBuilder = 
builder.getPeriodicQueryBuilder().orNull();
+            if (periodicBuilder != null && 
periodicBuilder.getNodeId().equals(nodeId)) {
+                VariableOrder varOrder = periodicBuilder.getVarOrder();
+                List<String> orderList = new 
ArrayList<>(varOrder.getVariableOrders());
+                orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
+                periodicBuilder.setVarOrder(new VariableOrder(orderList));
+                // recursive call to update the VariableOrders of all ancestors
+                // of this node
+                updateVarOrdersToIncludeBin(builder, 
periodicBuilder.getParentNodeId());
+            } else {
+                throw new IllegalArgumentException(
+                        "PeriodicQueryMetadata.Builder id does not match the 
indicated id.  A query cannot have more than one PeriodicQueryMetadata Node.");
+            }
+            break;
+        case QUERY:
+            QueryMetadata.Builder queryBuilder = 
builder.getQueryBuilder().orNull();
+            if (queryBuilder != null && 
queryBuilder.getNodeId().equals(nodeId)) {
+                VariableOrder varOrder = queryBuilder.getVariableOrder();
+                List<String> orderList = new 
ArrayList<>(varOrder.getVariableOrders());
+                orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
+                queryBuilder.setVariableOrder(new VariableOrder(orderList));
+            } else {
+                throw new IllegalArgumentException(
+                        "QueryMetadata.Builder id does not match the indicated 
id.  A query cannot have more than one QueryMetadata Node.");
+            }
+            break;
+        default:
+            throw new IllegalArgumentException(
+                    "Incorrectly positioned PeriodicQueryNode.  The 
PeriodicQueryNode can only be positioned below Projections, Extensions, and 
ConstructQueryNodes.");
+        }
+    }
+
+    /**
+     * Collects all Metadata node Ids that are ancestors of the 
PeriodicQueryNode and contain the variable 
+     * {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}.
+     * @param sx - Fluo Snapshot for scanning Fluo
+     * @param nodeId - root node of the PeriodicQuery
+     * @param ids - query ids of all metadata nodes appearing between root and 
PeriodicQueryMetadata node
+     */
+    public static void getPeriodicQueryNodeAncestorIds(SnapshotBase sx, String 
nodeId, Set<String> ids) {
+        NodeType nodeType = NodeType.fromNodeId(nodeId).orNull();
+        checkArgument(nodeType != null, "Invalid nodeId: " + nodeId + ". 
NodeId does not correspond to a valid NodeType.");
+        switch (nodeType) {
+        case FILTER:
+            ids.add(nodeId);
+            getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), 
FluoQueryColumns.FILTER_CHILD_NODE_ID).toString(), ids);
+            break;
+        case PERIODIC_QUERY:
+            ids.add(nodeId);
+            break;
+        case QUERY:
+            ids.add(nodeId);
+            getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), 
FluoQueryColumns.QUERY_CHILD_NODE_ID).toString(), ids);
+            break;
+        case AGGREGATION: 
+            ids.add(nodeId);
+            getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), 
FluoQueryColumns.AGGREGATION_CHILD_NODE_ID).toString(), ids);
+            break;
+        default:
+            throw new RuntimeException("Invalid NodeType.");
+        }
+    }
+
+    
+    
+    /**
+     * 
+     * @param values - Values extracted from FunctionCall representing the 
PeriodicQuery Filter
+     * @param arg - Argument of the PeriodicQueryNode that will be created 
(PeriodicQueryNode is a UnaryTupleOperator)
+     * @return - PeriodicQueryNode to be inserted in place of the original 
FunctionCall
+     * @throws Exception
+     */
+    private static PeriodicQueryNode parseAndSetValues(List<ValueExpr> values, 
TupleExpr arg) throws Exception {
+        // general validation of input
+        Preconditions.checkArgument(values.size() == 4);
+        Preconditions.checkArgument(values.get(0) instanceof Var);
+        Preconditions.checkArgument(values.get(1) instanceof ValueConstant);
+        Preconditions.checkArgument(values.get(2) instanceof ValueConstant);
+        Preconditions.checkArgument(values.get(3) instanceof ValueConstant);
+
+        // get temporal variable
+        Var var = (Var) values.get(0);
+        Preconditions.checkArgument(var.getValue() == null);
+        String tempVar = var.getName();
+
+        // get TimeUnit
+        TimeUnit unit = getTimeUnit((ValueConstant) values.get(3));
+
+        // get window and period durations
+        double windowDuration = parseTemporalDuration((ValueConstant) 
values.get(1));
+        double periodDuration = parseTemporalDuration((ValueConstant) 
values.get(2));
+        long windowMillis = convertToMillis(windowDuration, unit);
+        long periodMillis = convertToMillis(periodDuration, unit);
+        // period must evenly divide window at least once
+        Preconditions.checkArgument(windowMillis > periodMillis);
+        Preconditions.checkArgument(windowMillis % periodMillis == 0, "Period 
duration does not evenly divide window duration.");
+
+        // create PeriodicMetadata.Builder
+        return new PeriodicQueryNode(windowMillis, periodMillis, 
TimeUnit.MILLISECONDS, tempVar, arg);
+    }
+
+    private static TimeUnit getTimeUnit(ValueConstant val) {
+        Preconditions.checkArgument(val.getValue() instanceof URI);
+        URI uri = (URI) val.getValue();
+        
Preconditions.checkArgument(uri.getNamespace().equals(temporalNameSpace));
+
+        switch (uri.getLocalName()) {
+        case "days":
+            return TimeUnit.DAYS;
+        case "hours":
+            return TimeUnit.HOURS;
+        case "minutes":
+            return TimeUnit.MINUTES;
+        default:
+            throw new IllegalArgumentException("Invalid time unit for Periodic 
Function.");
+        }
+    }
+
+    private static double parseTemporalDuration(ValueConstant valConst) {
+        Value val = valConst.getValue();
+        Preconditions.checkArgument(val instanceof Literal);
+        Literal literal = (Literal) val;
+        String stringVal = literal.getLabel();
+        URI dataType = literal.getDatatype();
+        Preconditions.checkArgument(dataType.equals(XMLSchema.DECIMAL) || 
dataType.equals(XMLSchema.DOUBLE)
+                || dataType.equals(XMLSchema.FLOAT) || 
dataType.equals(XMLSchema.INTEGER) || dataType.equals(XMLSchema.INT));
+        return Double.parseDouble(stringVal);
+    }
+
+    private static long convertToMillis(double duration, TimeUnit unit) {
+        Preconditions.checkArgument(duration > 0);
+
+        double convertedDuration = 0;
+        switch (unit) {
+        case DAYS:
+            convertedDuration = duration * 24 * 60 * 60 * 1000;
+            break;
+        case HOURS:
+            convertedDuration = duration * 60 * 60 * 1000;
+            break;
+        case MINUTES:
+            convertedDuration = duration * 60 * 1000;
+            break;
+        default:
+            throw new IllegalArgumentException("TimeUnit must be of type DAYS, 
HOURS, or MINUTES.");
+        }
+        // check that double representation has exact millis representation
+        Preconditions.checkArgument(convertedDuration == (long) 
convertedDuration);
+        return (long) convertedDuration;
+    }
+
+}

Reply via email to