http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index 20d3005..dc4b3b4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -20,25 +20,25 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Notified when the results of a node have been updated to include a new 
Binding
@@ -47,6 +47,7 @@ import org.apache.fluo.api.observer.AbstractObserver;
  */
 @DefaultAnnotation(NonNull.class)
 public abstract class BindingSetUpdater extends AbstractObserver {
+    private static final Logger log = 
Logger.getLogger(BindingSetUpdater.class);
 
     // DAO
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -55,6 +56,7 @@ public abstract class BindingSetUpdater extends 
AbstractObserver {
     private final JoinResultUpdater joinUpdater = new JoinResultUpdater();
     private final FilterResultUpdater filterUpdater = new 
FilterResultUpdater();
     private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
+    private final AggregationResultUpdater aggregationUpdater = new 
AggregationResultUpdater();
 
     @Override
     public abstract ObservedColumn getObservedColumn();
@@ -63,10 +65,11 @@ public abstract class BindingSetUpdater extends 
AbstractObserver {
      * Create an {@link Observation} that defines the work that needs to be 
done.
      *
      * @param tx - The Fluo transaction being used for the observer 
notification. (not null)
-     * @param parsedRow - The RowID parsed into a Binding Set and Node ID. 
(not null)
+     * @param row - The row that triggered the notification. (not null)
      * @return An {@link Observation} that defines the work that needs to be 
done.
+     * @throws Exception A problem caused this method to fail.
      */
-    public abstract Observation parseObservation(TransactionBase tx, final 
BindingSetRow parsedRow);
+    public abstract Observation parseObservation(TransactionBase tx, Bytes 
row) throws Exception;
 
     @Override
     public final void process(final TransactionBase tx, final Bytes row, final 
Column col) {
@@ -74,8 +77,15 @@ public abstract class BindingSetUpdater extends 
AbstractObserver {
         checkNotNull(row);
         checkNotNull(col);
 
-        final String bindingSetString = tx.get(row, col).toString();
-        final Observation observation = parseObservation( tx, new 
BindingSetRow(BindingSetRow.make(row).getNodeId(), bindingSetString) );
+        final Observation observation;
+        try {
+            observation = parseObservation(tx, row);
+        } catch (final Exception e) {
+            log.error("Unable to parse an Observation from a Row and Column 
pair, so this notification will be skipped. " +
+                    "Row: " + row + " Column: " + col, e);
+            return;
+        }
+
         final String observedNodeId = observation.getObservedNodeId();
         final VisibilityBindingSet observedBindingSet = 
observation.getObservedBindingSet();
         final String parentNodeId = observation.getParentId();
@@ -85,7 +95,11 @@ public abstract class BindingSetUpdater extends 
AbstractObserver {
         switch(parentNodeType) {
             case QUERY:
                 final QueryMetadata parentQuery = 
queryDao.readQueryMetadata(tx, parentNodeId);
-                queryUpdater.updateQueryResults(tx, observedBindingSet, 
parentQuery);
+                try {
+                    queryUpdater.updateQueryResults(tx, observedBindingSet, 
parentQuery);
+                } catch (final Exception e) {
+                    throw new RuntimeException("Could not process a Query 
node.", e);
+                }
                 break;
 
             case FILTER:
@@ -101,11 +115,20 @@ public abstract class BindingSetUpdater extends 
AbstractObserver {
                 final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, 
parentNodeId);
                 try {
                     joinUpdater.updateJoinResults(tx, observedNodeId, 
observedBindingSet, parentJoin);
-                } catch (final BindingSetConversionException e) {
+                } catch (final Exception e) {
                     throw new RuntimeException("Could not process a Join 
node.", e);
                 }
                 break;
 
+            case AGGREGATION:
+                final AggregationMetadata parentAggregation = 
queryDao.readAggregationMetadata(tx, parentNodeId);
+                try {
+                    aggregationUpdater.updateAggregateResults(tx, 
observedBindingSet, parentAggregation);
+                } catch (final Exception e) {
+                    throw new RuntimeException("Could not process an 
Aggregation node.", e);
+                }
+                break;
+
             default:
                 throw new IllegalArgumentException("The parent node's NodeType 
must be of type Filter, Join, or Query, but was " + parentNodeType);
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index 36af898..f5c7177 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -18,19 +18,18 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 import org.openrdf.query.BindingSet;
 
-import org.apache.fluo.api.client.TransactionBase;
-
 /**
  * Notified when the results of a Filter have been updated to include a new
  * {@link BindingSet}. This observer updates its parent if the new Binding Set
@@ -38,7 +37,7 @@ import org.apache.fluo.api.client.TransactionBase;
  */
 public class FilterObserver extends BindingSetUpdater {
 
-    private final VisibilityBindingSetStringConverter converter = new 
VisibilityBindingSetStringConverter();
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
@@ -48,17 +47,17 @@ public class FilterObserver extends BindingSetUpdater {
     }
 
     @Override
-    public Observation parseObservation(final TransactionBase tx, final 
BindingSetRow parsedRow) {
-        checkNotNull(tx);
-        checkNotNull(parsedRow);
+    public Observation parseObservation(final TransactionBase tx, final Bytes 
row) throws Exception {
+        requireNonNull(tx);
+        requireNonNull(row);
 
         // Read the Filter metadata.
-        final String filterNodeId = parsedRow.getNodeId();
+        final String filterNodeId = BindingSetRow.make(row).getNodeId();
         final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, 
filterNodeId);
 
-        // Read the Binding Set that was just emmitted by the Filter.
-        final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
-        final VisibilityBindingSet filterBindingSet = (VisibilityBindingSet) 
converter.convert(parsedRow.getBindingSetString(), filterVarOrder);
+        // Read the Visibility Binding Set from the value.
+        final Bytes valueBytes = tx.get(row, 
FluoQueryColumns.FILTER_BINDING_SET);
+        final VisibilityBindingSet filterBindingSet = 
BS_SERDE.deserialize(valueBytes);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = filterMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 6933096..141ccc7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -18,19 +18,18 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 import org.openrdf.query.BindingSet;
 
-import org.apache.fluo.api.client.TransactionBase;
-
 /**
  * Notified when the results of a Join have been updated to include a new
  * {@link BindingSet}. This observer updates its parent if the new Binding Set
@@ -38,7 +37,7 @@ import org.apache.fluo.api.client.TransactionBase;
  */
 public class JoinObserver extends BindingSetUpdater {
 
-    private final VisibilityBindingSetStringConverter converter = new 
VisibilityBindingSetStringConverter();
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
@@ -48,16 +47,17 @@ public class JoinObserver extends BindingSetUpdater {
     }
 
     @Override
-    public Observation parseObservation(final TransactionBase tx, final 
BindingSetRow parsedRow) {
-        checkNotNull(parsedRow);
+    public Observation parseObservation(final TransactionBase tx, final Bytes 
row) throws Exception {
+        requireNonNull(tx);
+        requireNonNull(row);
 
         // Read the Join metadata.
-        final String joinNodeId = parsedRow.getNodeId();
+        final String joinNodeId = BindingSetRow.make(row).getNodeId();
         final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, 
joinNodeId);
 
-        // Read the Binding Set that was just emmitted by the Join.
-        final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
-        final VisibilityBindingSet joinBindingSet = (VisibilityBindingSet) 
converter.convert(parsedRow.getBindingSetString(), joinVarOrder);
+        // Read the Visibility Binding Set from the value.
+        final Bytes valueBytes = tx.get(row, 
FluoQueryColumns.JOIN_BINDING_SET);
+        final VisibilityBindingSet joinBindingSet = 
BS_SERDE.deserialize(valueBytes);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = joinMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 1238c18..28c92af 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -29,6 +29,7 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
@@ -36,11 +37,7 @@ import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFact
 import 
org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
@@ -51,13 +48,7 @@ import com.google.common.collect.ImmutableSet;
 public class QueryResultObserver extends AbstractObserver {
     private static final Logger log = 
Logger.getLogger(QueryResultObserver.class);
 
-    private static final FluoQueryMetadataDAO QUERY_DAO = new 
FluoQueryMetadataDAO();
-    private static final VisibilityBindingSetStringConverter CONVERTER = new 
VisibilityBindingSetStringConverter();
-
-    /**
-     * Simplifies Visibility expressions prior to exporting PCJ results.
-     */
-    private static final VisibilitySimplifier SIMPLIFIER = new 
VisibilitySimplifier();
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
     /**
      * We expect to see the same expressions a lot, so we cache the simplified 
forms.
@@ -91,9 +82,9 @@ public class QueryResultObserver extends AbstractObserver {
         final ImmutableSet.Builder<IncrementalResultExporter> exportersBuilder 
= ImmutableSet.builder();
 
         for(final IncrementalResultExporterFactory builder : factories) {
-            try {
-                log.debug("QueryResultObserver.init(): for each 
exportersBuilder=" + builder);
+               log.debug("QueryResultObserver.init(): for each 
exportersBuilder=" + builder);
 
+            try {
                 final Optional<IncrementalResultExporter> exporter = 
builder.build(context);
                 if(exporter.isPresent()) {
                     exportersBuilder.add(exporter.get());
@@ -107,28 +98,22 @@ public class QueryResultObserver extends AbstractObserver {
     }
 
     @Override
-    public void process(final TransactionBase tx, final Bytes brow, final 
Column col) {
+    public void process(final TransactionBase tx, final Bytes brow, final 
Column col) throws Exception {
         final String row = brow.toString();
-        
-        // Read the SPARQL query and it Binding Set from the row id.
-        final String[] queryAndBindingSet = row.split(NODEID_BS_DELIM);
-        final String queryId = queryAndBindingSet[0];
-        final String bindingSetString = tx.gets(row, col);
 
-        // Fetch the query's Variable Order from the Fluo table.
-        final QueryMetadata queryMetadata = QUERY_DAO.readQueryMetadata(tx, 
queryId);
-        final VariableOrder varOrder = queryMetadata.getVariableOrder();
+        // Read the ID of the SPARQL query from the row id.
+        final String queryId = row.split(NODEID_BS_DELIM)[0];
 
-        // Create the result that will be exported.
-        final VisibilityBindingSet result = 
CONVERTER.convert(bindingSetString, varOrder);
+        // Read the Child Binding Set that will be exported.
+        final Bytes valueBytes = tx.get(brow, col);
+        final VisibilityBindingSet result = BS_SERDE.deserialize(valueBytes);
 
         // Simplify the result's visibilities.
         final String visibility = result.getVisibility();
         if(!simplifiedVisibilities.containsKey(visibility)) {
-            final String simplified = SIMPLIFIER.simplify( visibility );
+            final String simplified = VisibilitySimplifier.simplify( 
visibility );
             simplifiedVisibilities.put(visibility, simplified);
         }
-
         result.setVisibility( simplifiedVisibilities.get(visibility) );
 
         // Export the result using each of the provided exporters.
@@ -136,8 +121,21 @@ public class QueryResultObserver extends AbstractObserver {
             try {
                 exporter.export(tx, queryId, result);
             } catch (final ResultExportException e) {
-                log.error("Could not export a binding set for query '" + 
queryId + "'. Binding Set: " + bindingSetString);
+                log.error("Could not export a binding set for query '" + 
queryId + "'. Binding Set: " + result, e);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if(exporters != null) {
+            for(final IncrementalResultExporter exporter : exporters) {
+                try {
+                    exporter.close();
+                } catch(final Exception e) {
+                    log.warn("Problem encountered while closing one of the 
exporters.", e);
+                }
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index 5956634..b0548b4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -18,19 +18,18 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 import org.openrdf.query.BindingSet;
 
-import org.apache.fluo.api.client.TransactionBase;
-
 /**
  * Notified when the results of a Statement Pattern have been updated to 
include
  * a new {@link BindingSet}. This observer updates its parent if the new
@@ -38,7 +37,7 @@ import org.apache.fluo.api.client.TransactionBase;
  */
 public class StatementPatternObserver extends BindingSetUpdater {
 
-    private static final VisibilityBindingSetStringConverter CONVERTER = new 
VisibilityBindingSetStringConverter();
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
     // DAO
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -49,17 +48,17 @@ public class StatementPatternObserver extends 
BindingSetUpdater {
     }
 
     @Override
-    public Observation parseObservation(final TransactionBase tx, final 
BindingSetRow parsedRow) {
-        checkNotNull(tx);
+    public Observation parseObservation(final TransactionBase tx, final Bytes 
row) throws Exception {
+        requireNonNull(tx);
+        requireNonNull(row);
 
         // Read the Statement Pattern metadata.
-        final String spNodeId = parsedRow.getNodeId();
+        final String spNodeId = BindingSetRow.make(row).getNodeId();
         final StatementPatternMetadata spMetadata = 
queryDao.readStatementPatternMetadata(tx, spNodeId);
-        final String bindingSetValue = parsedRow.getBindingSetString();
 
-        // Read the Binding Set that was just emmitted by the Statement 
Pattern.
-        final VariableOrder spVarOrder = spMetadata.getVariableOrder();
-        final VisibilityBindingSet spBindingSet = (VisibilityBindingSet) 
CONVERTER.convert(bindingSetValue, spVarOrder);
+        // Read the Visibility Binding Set from the value.
+        final Bytes valueBytes = tx.get(row, 
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET);
+        final VisibilityBindingSet spBindingSet = 
BS_SERDE.deserialize(valueBytes);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = spMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index 70f1cbd..3c43885 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -21,11 +21,20 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
-import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
 
 import java.util.Map;
 
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
+import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
@@ -33,15 +42,8 @@ import 
org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.ColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.AbstractObserver;
 
 /**
  * An observer that matches new Triples to the Statement Patterns that are part
@@ -49,9 +51,11 @@ import org.apache.fluo.api.observer.AbstractObserver;
  * the new result is stored as a binding set for the pattern.
  */
 public class TripleObserver extends AbstractObserver {
+    private static final Logger log = Logger.getLogger(TripleObserver.class);
 
-    private static final FluoQueryMetadataDAO QUERY_DAO = new 
FluoQueryMetadataDAO();
-    private static final VisibilityBindingSetStringConverter CONVERTER = new 
VisibilityBindingSetStringConverter();
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
+    private static final FluoQueryMetadataDAO QUERY_METADATA_DAO = new 
FluoQueryMetadataDAO();
+    private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER 
= new VisibilityBindingSetStringConverter();
 
     public TripleObserver() {}
 
@@ -62,85 +66,113 @@ public class TripleObserver extends AbstractObserver {
 
     @Override
     public void process(final TransactionBase tx, final Bytes brow, final 
Column column) {
-        //get string representation of triple
-        String row = brow.toString();
-        final String triple = IncUpdateDAO.getTripleString(brow);
-        String visibility = tx.gets(row, FluoQueryColumns.TRIPLES, "");
-       
-        //get variable metadata for all SP in table
-        RowScanner rscanner = 
tx.scanner().over(Span.prefix(SP_PREFIX)).fetch(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER).byRow().build();
-       
+        // Deserialize the triple (as a Rya Statement) from the row's bytes.
+        final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow);
+        log.trace(
+                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                "Rya Statement: " + ryaStatement + "\n");
 
-        //see if triple matches conditions of any of the SP
+        final String triple = IncUpdateDAO.getTripleString(ryaStatement);
 
-        for (ColumnScanner colScanner : rscanner) {
+        // Iterate over each of the Statement Patterns that are being matched 
against.
+        final RowScanner spScanner = tx.scanner()
+                .over(Span.prefix(SP_PREFIX))
+
+                // Only fetch rows that have the pattern in them. There will 
only be a single row with a pattern per SP.
+                .fetch(FluoQueryColumns.STATEMENT_PATTERN_PATTERN)
+                .byRow()
+                .build();
+
+        // See if the triple matches the conditions of any of the Statement Patterns.
+        for (final ColumnScanner colScanner : spScanner) {
+            // Get the Statement Pattern's node id.
             final String spID = colScanner.getsRow();
 
-            final StatementPatternMetadata spMetadata = 
QUERY_DAO.readStatementPatternMetadata(tx, spID);
+            // Fetch its metadata.
+            final StatementPatternMetadata spMetadata = 
QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID);
+
+            // Attempt to match the triple against the pattern.
             final String pattern = spMetadata.getStatementPattern();
-            
-            for (ColumnValue cv : colScanner) {
-                final String varOrders = cv.getsValue();
-                final VariableOrder varOrder = new VariableOrder(varOrders);
-                final String bindingSetString = getBindingSet(triple, pattern, 
varOrders);
-
-                //Statement matches to a binding set
-                if(bindingSetString.length() != 0) {
-                    final VisibilityBindingSet bindingSet = new 
VisibilityBindingSet(
-                        CONVERTER.convert(bindingSetString, varOrder),
-                        visibility);
-                    final String valueString = CONVERTER.convert(bindingSet, 
varOrder);
-                    tx.set(spID + NODEID_BS_DELIM + bindingSetString, 
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueString);
+            final VariableOrder varOrder = spMetadata.getVariableOrder();
+            final String bindingSetString = getBindingSet(triple, pattern, 
varOrder);
+
+            // Statement matches to a binding set.
+            if(bindingSetString.length() != 0) {
+                // Fetch the triple's visibility label.
+                final String visibility = tx.gets(brow.toString(), 
FluoQueryColumns.TRIPLES, "");
+
+                // Create the Row ID for the emitted binding set. It does not 
contain visibilities.
+                final String row = spID + NODEID_BS_DELIM + bindingSetString;
+                final Bytes rowBytes = Bytes.of( row.getBytes(Charsets.UTF_8) 
);
+
+                // If this is a new Binding Set, then emit it.
+                if(tx.get(rowBytes, 
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) {
+                    // Create the Binding Set that goes in the Node Value. It 
does contain visibilities.
+                    final VisibilityBindingSet visBindingSet = 
VIS_BS_CONVERTER.convert(bindingSetString, varOrder);
+                    visBindingSet.setVisibility(visibility);
+
+                    try {
+                        final Bytes valueBytes = 
BS_SERDE.serialize(visBindingSet);
+
+                        log.trace(
+                                "Transaction ID: " + tx.getStartTimestamp() + 
"\n" +
+                                        "Matched Statement Pattern: " + spID + 
"\n" +
+                                        "Binding Set: " + visBindingSet + 
"\n");
+
+                        tx.set(rowBytes, 
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
+                    } catch(final Exception e) {
+                        log.error("Couldn't serialize a Binding Set. This 
value will be skipped.", e);
+                    }
                 }
-                       }
-               }
+            }
+        }
 
         // Once the triple has been handled, it may be deleted.
-        tx.delete(row, column);
+        tx.delete(brow, column);
     }
 
     /**
-     * Determines whether triple matches Statement Pattern ID conditions if
-     * so, generates a string representation of a BindingSet whose order
-     * is determined by varOrder.
+     * Determines whether a triple matches a Statement Pattern. If so, it 
generates a string representation of a
+     * BindingSet whose order is determined by varOrder.
+     *
      * @param triple - The triple to consider.
-     * @param spID - The statement pattern ID
-     * @param varOrder - The variable order
-     * @return The string representation of the BindingSet or an empty string,
-     * signifying the triple did not match the statement pattern ID.
+     * @param pattern - The pattern the triple must match.
+     * @param varOrder - The variable order of the Binding Set String that is 
produced by this method.
+     * @return The string representation of a Binding Set that is generated by 
matching the triple to the pattern;
+     *   otherwise an empty string if the pattern couldn't be matched.
      */
-    private static String getBindingSet(final String triple, final String 
spID, final String varOrder) {
-        final String[] spIdArray = spID.split(DELIM);
+    private static String getBindingSet(final String triple, final String 
pattern, final VariableOrder varOrder) {
+        final String[] patternArray = pattern.split(DELIM);
         final String[] tripleArray = triple.split(DELIM);
-        final String[] varOrderArray = varOrder.split(VAR_DELIM);
-        final Map<String,String> varMap = Maps.newHashMap();
+        final String[] varOrderArray = varOrder.toArray();
+        final Map<String,String> bindingValues = Maps.newHashMap();
 
-        if(spIdArray.length != 3 || tripleArray.length != 3) {
+        if(patternArray.length != 3 || tripleArray.length != 3) {
             throw new IllegalArgumentException("Invald number of components");
         }
 
+        // Extract the binding names and values.
         for(int i = 0; i < 3; i ++) {
-
-            if(spIdArray[i].startsWith("-const-")) {
-                if(!spIdArray[i].substring(7).equals(tripleArray[i])) {
+            if(patternArray[i].startsWith("-const-")) {
+                // If a constant value does not match, then the triple does 
not match the pattern.
+                if(!patternArray[i].substring(7).equals(tripleArray[i])) {
                     return "";
                 }
             } else{
-                varMap.put(spIdArray[i], tripleArray[i]);
+                bindingValues.put(patternArray[i], tripleArray[i]);
             }
-
         }
 
-        String bindingSet = "";
-
-        for (final String element : varOrderArray) {
-            if(bindingSet.length() == 0) {
-                bindingSet = varMap.get(element);
+        // Create the returned binding set string from the extracted values.
+        String bindingSetString = "";
+        for (final String bindingName : varOrderArray) {
+            if(bindingSetString.length() == 0) {
+                bindingSetString = bindingValues.get(bindingName);
             } else {
-                bindingSet = bindingSet + DELIM + varMap.get(element);
+                bindingSetString = bindingSetString + DELIM + 
bindingValues.get(bindingName);
             }
         }
 
-        return bindingSet;
+        return bindingSetString;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
new file mode 100644
index 0000000..3bc8da6
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.Avg;
+import org.openrdf.query.algebra.Count;
+import org.openrdf.query.algebra.Max;
+import org.openrdf.query.algebra.Min;
+import org.openrdf.query.algebra.Sum;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
+/**
+ * Metadata that is relevant to Aggregate nodes.
+ */
+@Immutable
+@DefaultAnnotation(NonNull.class)
+public class AggregationMetadata extends CommonNodeMetadata {
+
+    /**
+     * The different types of Aggregation functions that an aggregate node may 
perform.
+     */
+    public static enum AggregationType {
+        MIN(Min.class),
+        MAX(Max.class),
+        COUNT(Count.class),
+        SUM(Sum.class),
+        AVERAGE(Avg.class);
+
+        private final Class<? extends AggregateOperator> operatorClass;
+
+        private AggregationType(final Class<? extends AggregateOperator> 
operatorClass) {
+            this.operatorClass = requireNonNull(operatorClass);
+        }
+
+        private static final ImmutableMap<Class<? extends AggregateOperator>, 
AggregationType> byOperatorClass;
+        static {
+            final ImmutableMap.Builder<Class<? extends AggregateOperator>, 
AggregationType> builder = ImmutableMap.builder();
+            for(final AggregationType type : AggregationType.values()) {
+                builder.put(type.operatorClass, type);
+            }
+            byOperatorClass = builder.build();
+        }
+
+        public static Optional<AggregationType> byOperatorClass(final Class<? 
extends AggregateOperator> operatorClass) {
+            return Optional.ofNullable( byOperatorClass.get(operatorClass) );
+        }
+    }
+
+    /**
+     * Represents all of the metadata required to perform an Aggregation that 
is part of a SPARQL query.
+     * <p>
+     * For example, if you have the following in SPARQL:
+     * <pre>
+     * SELECT (avg(?price) as ?avgPrice) {
+     *     ...
+     * }
+     * </pre>
+     * You would construct an instance of this object like so:
+     * <pre>
+     * new AggregationElement(AggregationType.AVERAGE, "price", "avgPrice");
+     * </pre>
+     */
+    @Immutable
+    @DefaultAnnotation(NonNull.class)
+    public static final class AggregationElement implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final AggregationType aggregationType;
+        private final String aggregatedBindingName;
+        private final String resultBindingName;
+
+        /**
+         * Constructs an instance of {@link AggregationElement}.
+         *
+         * @param aggregationType - Defines how the binding values will be 
aggregated. (not null)
+         * @param aggregatedBindingName - The name of the binding whose values 
are aggregated. This binding must
+         *   appear within the child node's emitted binding sets. (not null)
+         * @param resultBindingName - The name of the binding this 
aggregation's results are written to. This binding
+         *   must appear within the AggregationMetadata's variable order. 
(not null)
+         */
+        public AggregationElement(
+                final AggregationType aggregationType,
+                final String aggregatedBindingName,
+                final String resultBindingName) {
+            this.aggregationType = requireNonNull(aggregationType);
+            this.aggregatedBindingName = requireNonNull(aggregatedBindingName);
+            this.resultBindingName = requireNonNull(resultBindingName);
+        }
+
+        /**
+         * @return Defines how the binding values will be aggregated.
+         */
+        public AggregationType getAggregationType() {
+            return aggregationType;
+        }
+
+        /**
+         * @return The name of the binding whose values are aggregated. This 
binding must appear within the child node's emitted binding sets.
+         */
+        public String getAggregatedBindingName() {
+            return aggregatedBindingName;
+        }
+
+        /**
+         * @return The name of the binding this aggregation's results are 
written to. This binding must appear within the AggregationMetadata's 
variable order.
+         */
+        public String getResultBindingName() {
+            return resultBindingName;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(aggregationType, aggregatedBindingName, 
resultBindingName);
+        }
+
+        @Override
+        public boolean equals(final Object o ) {
+            if(o instanceof AggregationElement) {
+                final AggregationElement agg = (AggregationElement) o;
+                return Objects.equals(aggregationType, agg.aggregationType) &&
+                        Objects.equals(aggregatedBindingName, 
agg.aggregatedBindingName) &&
+                        Objects.equals(resultBindingName, 
agg.resultBindingName);
+            }
+            return false;
+        }
+    }
+
+    private final String parentNodeId;
+    private final String childNodeId;
+    private final Collection<AggregationElement> aggregations;
+    private final VariableOrder groupByVariables;
+
+    /**
+     * Constructs an instance of {@link AggregationMetadata}.
+     *
+     * @param nodeId - The ID the Fluo app uses to reference this node. (not 
null)
+     * @param varOrder - The variable order of binding sets that are emitted 
by this node. This may only contain a
+     *   single variable because aggregations are only able to emit the 
aggregated value. (not null)
+     * @param parentNodeId - The Node ID of this node's parent. This is the 
node that will consume the results of the aggregations. (not null)
+     * @param childNodeId - The Node ID of this node's child. This is the node 
that will feed binding sets into the aggregations. (not null)
+     * @param aggregations - The aggregations that will be performed over the 
BindingSets that are emitted from the child node. (not null)
+     * @param groupByVariables - Defines how the data is grouped for the 
aggregation function. (not null, may be empty if no grouping is required)
+     */
+    public AggregationMetadata(
+            final String nodeId,
+            final VariableOrder varOrder,
+            final String parentNodeId,
+            final String childNodeId,
+            final Collection<AggregationElement> aggregations,
+            final VariableOrder groupByVariables) {
+        super(nodeId, varOrder);
+        this.parentNodeId = requireNonNull(parentNodeId);
+        this.childNodeId = requireNonNull(childNodeId);
+        this.aggregations = requireNonNull(aggregations);
+        this.groupByVariables = requireNonNull(groupByVariables);
+    }
+
+    /**
+     * @return The Node ID of this node's parent. This is the node that will 
consume the results of the aggregations.
+     */
+    public String getParentNodeId() {
+        return parentNodeId;
+    }
+
+    /**
+     * @return The Node ID of this node's child. This is the node that will 
feed binding sets into the aggregations.
+     */
+    public String getChildNodeId() {
+        return childNodeId;
+    }
+
+    /**
+     * @return The aggregations that will be performed over the BindingSets 
that are emitted from the child node.
+     */
+    public Collection<AggregationElement> getAggregations() {
+        return aggregations;
+    }
+
+    /**
+     * @return Defines how the data is grouped for the aggregation function.
+     */
+    public VariableOrder getGroupByVariableOrder() {
+        return groupByVariables;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                super.getNodeId(),
+                super.getVariableOrder(),
+                parentNodeId,
+                childNodeId,
+                aggregations,
+                groupByVariables);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if(o instanceof AggregationMetadata) {
+            final AggregationMetadata metadata = (AggregationMetadata) o;
+            return Objects.equals(getNodeId(), metadata.getNodeId()) &&
+                    Objects.equals(super.getVariableOrder(), 
metadata.getVariableOrder()) &&
+                    Objects.equals(parentNodeId, metadata.parentNodeId) &&
+                    Objects.equals(childNodeId, metadata.childNodeId) &&
+                    Objects.equals(aggregations, metadata.aggregations) &&
+                    Objects.equals(groupByVariables, 
metadata.groupByVariables);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder string = new StringBuilder()
+                .append("AggregationMetadata {\n")
+                .append("    Node ID: " + super.getNodeId() + "\n")
+                .append("    Variable Order: " + super.getVariableOrder() + 
"\n")
+                .append("    Parent Node ID: " + parentNodeId + "\n")
+                .append("    Child Node ID: " + childNodeId + "\n");
+
+        // Only print the group by names if they're present.
+        if(!groupByVariables.getVariableOrders().isEmpty()) {
+            string.append("    GroupBy Variable Order: " + groupByVariables + 
"\n");
+        }
+
+        // Print each of the AggregationElements.
+        string.append("    Aggregations: {\n");
+        final Iterator<AggregationElement> it = aggregations.iterator();
+        while(it.hasNext()) {
+            final AggregationElement agg = it.next();
+            string.append("        Type: " + agg.getAggregationType() + "\n");
+            string.append("        Aggregated Binding Name: " + 
agg.getAggregatedBindingName() + "\n");
+            string.append("        Result Binding Name: " + 
agg.getResultBindingName() + "\n");
+
+            if(it.hasNext()) {
+                string.append("\n");
+            }
+        }
+        string.append("    }\n");
+        string.append("}");
+
+        return string.toString();
+    }
+
+    /**
+     * @param nodeId - The ID the Fluo app uses to reference this node. (not 
null)
+     * @return A new {@link Builder} initialized with the provided nodeId.
+     */
+    public static Builder builder(final String nodeId) {
+        return new Builder(nodeId);
+    }
+
+    /**
+     * Builds instances of {@link AggregationMetadata}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class Builder {
+
+        private final String nodeId;
+        private VariableOrder varOrder;
+        private String parentNodeId;
+        private String childNodeId;
+        private final List<AggregationElement> aggregations = new 
ArrayList<>();
+        private VariableOrder groupByVariables = new VariableOrder();
+
+        /**
+         * Constructs an instance of {@link Builder}.
+         *
+         * @param nodeId - This node's Node ID. (not null)
+         */
+        public Builder(final String nodeId) {
+            this.nodeId = requireNonNull(nodeId);
+        }
+
+        /**
+         * @return This node's Node ID.
+         */
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @param varOrder - The variable order of binding sets that are 
emitted by this node. This may only contain a
+         *   single variable because aggregations are only able to emit the 
aggregated value.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setVariableOrder(@Nullable final VariableOrder 
varOrder) {
+            this.varOrder = varOrder;
+            return this;
+        }
+
+        /**
+         * @param parentNodeId - The Node ID of this node's parent.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setParentNodeId(@Nullable final String parentNodeId) {
+            this.parentNodeId = parentNodeId;
+            return this;
+        }
+
+        /**
+         * @param childNodeId - The Node ID of this node's child.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setChildNodeId(@Nullable final String childNodeId) {
+            this.childNodeId = childNodeId;
+            return this;
+        }
+
+        /**
+         * @param aggregation - An aggregation that will be performed over the 
BindingSets that are emitted from the child node.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder addAggregation(@Nullable final AggregationElement 
aggregation) {
+            if(aggregation != null) {
+                this.aggregations.add(aggregation);
+            }
+            return this;
+        }
+
+        /**
+         * @param groupByVariables - Defines how the data is grouped for 
the aggregation function. (not null, may be
+         *   empty if no grouping is required)
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setGroupByVariableOrder(@Nullable final VariableOrder 
groupByVariables) {
+            this.groupByVariables = groupByVariables;
+            return this;
+        }
+
+        /**
+         * @return An instance of {@link AggregationMetadata} built using this 
builder's values.
+         */
+        public AggregationMetadata build() {
+            return new AggregationMetadata(nodeId, varOrder, parentNodeId, 
childNodeId, aggregations, groupByVariables);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 263db7e..3230a5d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -18,24 +18,24 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
 import org.apache.commons.lang3.builder.EqualsBuilder;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
 /**
  * Metadata for every node of a query that is being updated by the Fluo 
application.
  */
@@ -47,6 +47,7 @@ public class FluoQuery {
     private final ImmutableMap<String, StatementPatternMetadata> 
statementPatternMetadata;
     private final ImmutableMap<String, FilterMetadata> filterMetadata;
     private final ImmutableMap<String, JoinMetadata> joinMetadata;
+    private final ImmutableMap<String, AggregationMetadata> 
aggregationMetadata;
 
     /**
      * Constructs an instance of {@link FluoQuery}. Private because 
applications
@@ -55,20 +56,24 @@ public class FluoQuery {
      * @param queryMetadata - The root node of a query that is updated in 
Fluo. (not null)
      * @param statementPatternMetadata - A map from Node ID to Statement 
Pattern metadata as
      *   it is represented within the Fluo app. (not null)
-     * @param filterMetadata A map from Node ID to Filter metadata as it is 
represented
+     * @param filterMetadata - A map from Node ID to Filter metadata as it is 
represented
      *   within the Fluo app. (not null)
-     * @param joinMetadata A map from Node ID to Join metadata as it is 
represented
+     * @param joinMetadata - A map from Node ID to Join metadata as it is 
represented
      *   within the Fluo app. (not null)
+     * @param aggregationMetadata - A map from Node ID to Aggregation metadata 
as it is
+     *   represented within the Fluo app. (not null)
      */
     private FluoQuery(
             final QueryMetadata queryMetadata,
             final ImmutableMap<String, StatementPatternMetadata> 
statementPatternMetadata,
             final ImmutableMap<String, FilterMetadata> filterMetadata,
-            final ImmutableMap<String, JoinMetadata> joinMetadata) {
-        this.queryMetadata = checkNotNull(queryMetadata);
-        this.statementPatternMetadata = checkNotNull(statementPatternMetadata);
-        this.filterMetadata = checkNotNull(filterMetadata);
-        this.joinMetadata = checkNotNull(joinMetadata);
+            final ImmutableMap<String, JoinMetadata> joinMetadata,
+            final ImmutableMap<String, AggregationMetadata> 
aggregationMetadata) {
+        this.queryMetadata = requireNonNull(queryMetadata);
+        this.statementPatternMetadata = 
requireNonNull(statementPatternMetadata);
+        this.filterMetadata = requireNonNull(filterMetadata);
+        this.joinMetadata = requireNonNull(joinMetadata);
+        this.aggregationMetadata = requireNonNull(aggregationMetadata);
     }
 
     /**
@@ -85,7 +90,7 @@ public class FluoQuery {
      * @return The StatementPattern metadata if it could be found; otherwise 
absent.
      */
     public Optional<StatementPatternMetadata> 
getStatementPatternMetadata(final String nodeId) {
-        checkNotNull(nodeId);
+        requireNonNull(nodeId);
         return Optional.fromNullable( statementPatternMetadata.get(nodeId) );
     }
 
@@ -103,7 +108,7 @@ public class FluoQuery {
      * @return The Filter metadata if it could be found; otherwise absent.
      */
     public Optional<FilterMetadata> getFilterMetadata(final String nodeId) {
-        checkNotNull(nodeId);
+        requireNonNull(nodeId);
         return Optional.fromNullable( filterMetadata.get(nodeId) );
     }
 
@@ -121,7 +126,7 @@ public class FluoQuery {
      * @return The Join metadata if it could be found; otherwise absent.
      */
     public Optional<JoinMetadata> getJoinMetadata(final String nodeId) {
-        checkNotNull(nodeId);
+        requireNonNull(nodeId);
         return Optional.fromNullable( joinMetadata.get(nodeId) );
     }
 
@@ -132,13 +137,32 @@ public class FluoQuery {
         return joinMetadata.values();
     }
 
+    /**
+     * Get an Aggregation node's metadata.
+     *
+     * @param nodeId - The node ID of the Aggregation metadata you want. (not 
null)
+     * @return The Aggregation metadata if it could be found; otherwise absent.
+     */
+    public Optional<AggregationMetadata> getAggregationMetadata(final String 
nodeId) {
+        requireNonNull(nodeId);
+        return Optional.fromNullable( aggregationMetadata.get(nodeId) );
+    }
+
+    /**
+     * @return All of the Aggregation metadata that is stored for the query.
+     */
+    public Collection<AggregationMetadata> getAggregationMetadata() {
+        return aggregationMetadata.values();
+    }
+
     @Override
     public int hashCode() {
         return Objects.hashCode(
                 queryMetadata,
                 statementPatternMetadata,
                 filterMetadata,
-                joinMetadata);
+                joinMetadata,
+                aggregationMetadata);
     }
 
     @Override
@@ -154,6 +178,7 @@ public class FluoQuery {
                     .append(statementPatternMetadata, 
fluoQuery.statementPatternMetadata)
                     .append(filterMetadata, fluoQuery.filterMetadata)
                     .append(joinMetadata, fluoQuery.joinMetadata)
+                    .append(aggregationMetadata, fluoQuery.aggregationMetadata)
                     .isEquals();
         }
 
@@ -184,6 +209,11 @@ public class FluoQuery {
             builder.append("\n");
         }
 
+        for(final AggregationMetadata metadata : aggregationMetadata.values()) 
{
+            builder.append(metadata.toString());
+            builder.append("\n");
+        }
+
         return builder.toString();
     }
 
@@ -204,6 +234,7 @@ public class FluoQuery {
         private final Map<String, StatementPatternMetadata.Builder> spBuilders 
= new HashMap<>();
         private final Map<String, FilterMetadata.Builder> filterBuilders = new 
HashMap<>();
         private final Map<String, JoinMetadata.Builder> joinBuilders = new 
HashMap<>();
+        private final Map<String, AggregationMetadata.Builder> 
aggregationBuilders = new HashMap<>();
 
         /**
          * Sets the {@link QueryMetadata.Builder} that is used by this builder.
@@ -230,7 +261,7 @@ public class FluoQuery {
          * @return This builder so that method invocation may be chained.
          */
         public Builder addStatementPatternBuilder(final 
StatementPatternMetadata.Builder spBuilder) {
-            checkNotNull(spBuilder);
+            requireNonNull(spBuilder);
             spBuilders.put(spBuilder.getNodeId(), spBuilder);
             return this;
         }
@@ -242,7 +273,7 @@ public class FluoQuery {
          * @return The builder that was stored at the node id if one was found.
          */
         public Optional<StatementPatternMetadata.Builder> 
getStatementPatternBuilder(final String nodeId) {
-            checkNotNull(nodeId);
+            requireNonNull(nodeId);
             return Optional.fromNullable( spBuilders.get(nodeId) );
         }
 
@@ -252,8 +283,8 @@ public class FluoQuery {
          * @param filterBuilder - A builder representing a specific Filter 
within the query. (not null)
          * @return This builder so that method invocation may be chained.
          */
-        public Builder addFilterMetadata(@Nullable final 
FilterMetadata.Builder filterBuilder) {
-            checkNotNull(filterBuilder);
+        public Builder addFilterMetadata(final FilterMetadata.Builder 
filterBuilder) {
+            requireNonNull(filterBuilder);
             this.filterBuilders.put(filterBuilder.getNodeId(), filterBuilder);
             return this;
         }
@@ -265,7 +296,7 @@ public class FluoQuery {
          * @return The builder that was stored at the node id if one was found.
          */
         public Optional<FilterMetadata.Builder> getFilterBuilder(final String 
nodeId) {
-            checkNotNull(nodeId);
+            requireNonNull(nodeId);
             return Optional.fromNullable( filterBuilders.get(nodeId) );
         }
 
@@ -275,20 +306,43 @@ public class FluoQuery {
          * @param joinBuilder - A builder representing a specific Join within 
the query. (not null)
          * @return This builder so that method invocation may be chained.
          */
-        public Builder addJoinMetadata(@Nullable final JoinMetadata.Builder 
joinBuilder) {
-            checkNotNull(joinBuilder);
+        public Builder addJoinMetadata(final JoinMetadata.Builder joinBuilder) 
{
+            requireNonNull(joinBuilder);
             this.joinBuilders.put(joinBuilder.getNodeId(), joinBuilder);
             return this;
         }
 
         /**
+         * Get an Aggregate builder from this builder.
+         *
+         * @param nodeId - The Node ID the Aggregate builder was stored at. 
(not null)
+         * @return The builder that was stored at the node id if one was found.
+         */
+        public Optional<AggregationMetadata.Builder> getAggregateBuilder(final 
String nodeId) {
+            requireNonNull(nodeId);
+            return Optional.fromNullable( aggregationBuilders.get(nodeId) );
+        }
+
+        /**
+         * Adds a new {@link AggregationMetadata.Builder} to this builder.
+         *
+         * @param aggregationBuilder - A builder representing a specific 
Aggregation within the Query. (not null)
+         * @return This builder so that method invocation may be chained.
+         */
+        public Builder addAggregateMetadata(final AggregationMetadata.Builder 
aggregationBuilder) {
+            requireNonNull(aggregationBuilder);
+            this.aggregationBuilders.put(aggregationBuilder.getNodeId(), 
aggregationBuilder);
+            return this;
+        }
+
+
+        /**
          * Get a Join builder from this builder.
          *
          * @param nodeId - The Node ID the Join builder was stored at. (not 
null)
          * @return The builder that was stored at the node id if one was found.
          */
         public Optional<JoinMetadata.Builder> getJoinBuilder(final String 
nodeId) {
-            checkNotNull(nodeId);
+            requireNonNull(nodeId);
             return Optional.fromNullable( joinBuilders.get(nodeId) );
         }
 
@@ -313,7 +367,12 @@ public class FluoQuery {
                 joinMetadata.put(entry.getKey(), entry.getValue().build());
             }
 
-            return new FluoQuery(queryMetadata, spMetadata.build(), 
filterMetadata.build(), joinMetadata.build());
+            final ImmutableMap.Builder<String, AggregationMetadata> 
aggregateMetadata = ImmutableMap.builder();
+            for(final Entry<String, AggregationMetadata.Builder> entry : 
aggregationBuilders.entrySet()) {
+                aggregateMetadata.put(entry.getKey(), 
entry.getValue().build());
+            }
+
+            return new FluoQuery(queryMetadata, spMetadata.build(), 
filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index f12c6ab..77d6a49 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -23,11 +23,13 @@ import static java.util.Objects.requireNonNull;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.fluo.api.data.Column;
+import 
org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
-import org.apache.fluo.api.data.Column;
-
 /**
  * Holds {@link Column} objects that represent where each piece of metadata is 
stored.
  * <p>
@@ -40,7 +42,7 @@ import org.apache.fluo.api.data.Column;
  *     <tr> <td>Node ID</td> <td>queryMetadata:variableOrder</td> <td>The 
Variable Order binding sets are emitted with.</td> </tr>
  *     <tr> <td>Node ID</td> <td>queryMetadata:sparql</td> <td>The original 
SPARQL query that is being computed by this query.</td> </tr>
  *     <tr> <td>Node ID</td> <td>queryMetadata:childNodeId</td> <td>The Node 
ID of the child who feeds this node.</td> </tr>
- *     <tr> <td>Node ID + DELIM + Binding Set String</td> 
<td>queryMetadata:bindingSet</td> <td>A Binding Set that matches the 
query.</td> </tr>
+ *     <tr> <td>Node ID + DELIM + Binding Set String</td> 
<td>queryMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} 
object.</td> </tr>
  *   </table>
  * </p>
  * <p>
@@ -53,7 +55,7 @@ import org.apache.fluo.api.data.Column;
  *     <tr> <td>Node ID</td> <td>filterMetadata:filterIndexWithinSparql</td> 
<td>Indicates which filter within the original SPARQL query this 
represents.</td> </tr>
  *     <tr> <td>Node ID</td> <td>filterMetadata:parentNodeId</td> <td>The Node 
ID this filter emits Binding Sets to.</td> </tr>
  *     <tr> <td>Node ID</td> <td>filterMetadata:childNodeId</td> <td>The Node 
ID of the node that feeds this node Binding Sets.</td> </tr>
- *     <tr> <td>Node ID + DELIM + Binding set String</td> 
<td>filterMetadata:bindingSet</td> <td>A Binding Set that matches the 
Filter.</td> </tr>
+ *     <tr> <td>Node ID + DELIM + Binding Set String</td> 
<td>filterMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} 
object.</td> </tr>
  *   </table>
  * </p>
  * <p>
@@ -66,7 +68,7 @@ import org.apache.fluo.api.data.Column;
  *     <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node 
ID this join emits Binding Sets to.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node 
ID of the node that feeds this node Binding Sets.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node 
ID of the node that feeds this node Binding Sets.</td> </tr>
- *     <tr> <td>Node ID + DELIM + Binding set String</td> 
<td>joinMetadata:bindingSet</td> <td>A Binding Set that matches the Join.</td> 
</tr>
+ *     <tr> <td>Node ID + DELIM + Binding Set String</td> 
<td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} 
object.</td> </tr>
  *   </table>
  * </p>
  * <p>
@@ -77,9 +79,22 @@ import org.apache.fluo.api.data.Column;
  *     <tr> <td>Node ID</td> <td>statementPatternMetadata:variableOrder</td> 
<td>The Variable Order binding sets are emitted with.</td> </tr>
  *     <tr> <td>Node ID</td> <td>statementPatternMetadata:pattern</td> <td>The 
pattern that defines which Statements will be matched.</td> </tr>
  *     <tr> <td>Node ID</td> <td>statementPatternMetadata:parentNodeId</td> 
<td>The Node ID this statement pattern emits Binding Sets to.</td> </tr>
- *     <tr> <td>Node ID + DELIM + Binding set String</td> 
<td>statementPatternMetadata:bindingSet</td> <td>A Binding Set that matches the 
Statement Pattern.</td> </tr>
+ *     <tr> <td>Node ID + DELIM + Binding Set String</td> 
<td>statementPatternMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} 
object.</td> </tr>
  *   </table>
+ * </p>
  * <p>
+ *   <b>Aggregation Metadata</b>
+ *   <table border="1" style="width:100%">
+ *     <tr> <th>Fluo Row</th> <th>Fluo Column</th> <th>Fluo Value</th> </tr>
+ *     <tr> <td>Node ID</td> <td>aggregationMetadata:nodeId</td> <td>The Node 
ID of the Aggregation.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>aggregationMetadata:variableOrder</td> 
<td>The Variable Order binding sets are emitted with.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>aggregationMetadata:parentNodeId</td> <td>The 
Node ID this Aggregation emits its result Binding Set to.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>aggregationMetadata:childNodeId</td> <td>The 
Node ID of the node that feeds this node Binding Sets.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>aggregationMetadata:groupByBindingNames</td> 
<td>An ordered list of the binding names the aggregation's results will be 
grouped by.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>aggregationMetadata:aggregations</td> <td>A 
serialized form of the aggregations that need to be performed by this 
aggregation node.</td> </tr>
+ *     <tr> <td>Node ID + DELIM + Group By Values Binding Set String</td> 
<td>aggregationMetadata:bindingSet</td><td>An {@link AggregationState} 
object.</td> </tr>
+ *   </table>
+ * </p>
  */
 public class FluoQueryColumns {
 
@@ -88,6 +103,7 @@ public class FluoQueryColumns {
     public static final String FILTER_METADATA_CF = "filterMetadata";
     public static final String JOIN_METADATA_CF = "joinMetadata";
     public static final String STATEMENT_PATTERN_METADATA_CF = 
"statementPatternMetadata";
+    public static final String AGGREGATION_METADATA_CF = "aggregationMetadata";
 
     /**
      * New triples that have been added to Rya are written as a row in this
@@ -96,7 +112,7 @@ public class FluoQueryColumns {
      * <p>
      *   <table border="1" style="width:100%">
      *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> 
</tr>
-     *     <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> 
<td>visibility</td> </tr>
+     *     <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> 
<td>The visibility label for the triple.</td> </tr>
      *   </table>
      * </p>
      */
@@ -108,7 +124,7 @@ public class FluoQueryColumns {
      * <p>
      *   <table border="1" style="width:100%">
      *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> 
</tr>
-     *     <tr> <td>Query ID</td> <td>query:ryaPcjId</td> <td>Identifies which 
PCJ the reuslts of this query will be exported to.</td> </tr>
+     *     <tr> <td>Query ID</td> <td>query:ryaPcjId</td> <td>Identifies which 
PCJ the results of this query will be exported to.</td> </tr>
      *   </table>
      * </p>
      */
@@ -160,6 +176,15 @@ public class FluoQueryColumns {
     public static final Column STATEMENT_PATTERN_PARENT_NODE_ID = new 
Column(STATEMENT_PATTERN_METADATA_CF, "parentNodeId");
     public static final Column STATEMENT_PATTERN_BINDING_SET = new 
Column(STATEMENT_PATTERN_METADATA_CF, "bindingSet");
 
+    // Aggregation Metadata columns.
+    public static final Column AGGREGATION_NODE_ID = new 
Column(AGGREGATION_METADATA_CF, "nodeId");
+    public static final Column AGGREGATION_VARIABLE_ORDER = new 
Column(AGGREGATION_METADATA_CF, "variableOrder");
+    public static final Column AGGREGATION_PARENT_NODE_ID = new 
Column(AGGREGATION_METADATA_CF, "parentNodeId");
+    public static final Column AGGREGATION_CHILD_NODE_ID = new 
Column(AGGREGATION_METADATA_CF, "childNodeId");
+    public static final Column AGGREGATION_GROUP_BY_BINDING_NAMES = new 
Column(AGGREGATION_METADATA_CF, "groupByBindingNames");
+    public static final Column AGGREGATION_AGGREGATIONS = new 
Column(AGGREGATION_METADATA_CF, "aggregations");
+    public static final Column AGGREGATION_BINDING_SET = new 
Column(AGGREGATION_METADATA_CF, "bindingSet");
+
     /**
      * Enumerates the {@link Column}s that hold all of the fields for each type
      * of node that can compose a query.
@@ -204,16 +229,27 @@ public class FluoQueryColumns {
                 Arrays.asList(STATEMENT_PATTERN_NODE_ID,
                         STATEMENT_PATTERN_VARIABLE_ORDER,
                         STATEMENT_PATTERN_PATTERN,
-                        STATEMENT_PATTERN_PARENT_NODE_ID));
+                        STATEMENT_PATTERN_PARENT_NODE_ID)),
+
+        /**
+         * The columns an {@link AggregationMetadata} object's fields are 
stored within.
+         */
+        AGGREGATION_COLUMNS(
+                Arrays.asList(AGGREGATION_NODE_ID,
+                        AGGREGATION_VARIABLE_ORDER,
+                        AGGREGATION_PARENT_NODE_ID,
+                        AGGREGATION_CHILD_NODE_ID,
+                        AGGREGATION_GROUP_BY_BINDING_NAMES,
+                        AGGREGATION_AGGREGATIONS));
 
-        private List<Column> columns;
+        private final List<Column> columns;
 
         /**
          * Constructs an instance of {@link QueryNodeMetadataColumns}.
          *
          * @param columns - The {@link Column}s associated with this node's 
metadata. (not null)
          */
-        private QueryNodeMetadataColumns(List<Column> columns) {
+        private QueryNodeMetadataColumns(final List<Column> columns) {
             this.columns = requireNonNull(columns);
         }
 
@@ -224,4 +260,4 @@ public class FluoQueryColumns {
             return columns;
         }
     }
-}
+}
\ No newline at end of file

Reply via email to