http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
new file mode 100644
index 0000000..2e41cb1
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
+import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Value;
+import org.openrdf.model.datatypes.XMLDatatypeUtil;
+import org.openrdf.model.impl.DecimalLiteralImpl;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.algebra.MathExpr.MathOp;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.evaluation.util.MathUtil;
+import org.openrdf.query.algebra.evaluation.util.ValueComparator;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Updates the results of an Aggregate node when its child has added a new 
Binding Set to its results.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AggregationResultUpdater {
+    private static final Logger log = 
Logger.getLogger(AggregationResultUpdater.class);
+
+    private static final AggregationStateSerDe AGG_STATE_SERDE = new 
ObjectSerializationAggregationStateSerDe();
+
+    // Dispatch table from each supported aggregation type to the function that updates the state for it.
+    private static final ImmutableMap<AggregationType, AggregationFunction> FUNCTIONS =
+            ImmutableMap.<AggregationType, AggregationFunction>builder()
+                .put(AggregationType.COUNT, new CountFunction())
+                .put(AggregationType.SUM, new SumFunction())
+                .put(AggregationType.AVERAGE, new AverageFunction())
+                .put(AggregationType.MIN, new MinFunction())
+                .put(AggregationType.MAX, new MaxFunction())
+                .build();
+
+    /**
+     * Updates the results of an Aggregation node where its child has emitted a new Binding Set.
+     * <p>
+     * The aggregation state stored for the child's Group By values is read through the transaction
+     * (or created when no state is stored yet), every aggregation of the node is applied to it, and
+     * the resulting state is written back to the same row.
+     *
+     * @param tx - The transaction all Fluo queries will use. (not null)
+     * @param childBindingSet - The Binding Set that was emitted by the Aggregation Node's child. (not null)
+     * @param aggregationMetadata - The metadata of the Aggregation node whose results will be updated. (not null)
+     * @throws Exception The update could not be successfully performed.
+     */
+    public void updateAggregateResults(
+            final TransactionBase tx,
+            final VisibilityBindingSet childBindingSet,
+            final AggregationMetadata aggregationMetadata) throws Exception {
+        requireNonNull(tx);
+        requireNonNull(childBindingSet);
+        requireNonNull(aggregationMetadata);
+
+        // The trace messages render whole binding sets, so they are only built when tracing is enabled.
+        final boolean tracing = log.isTraceEnabled();
+        if(tracing) {
+            log.trace(
+                    "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                    "Child Binding Set:\n" + childBindingSet + "\n");
+        }
+
+        // The Row ID for the Aggregation State that needs to be updated is defined by the Group By variables.
+        final String aggregationNodeId = aggregationMetadata.getNodeId();
+        final VariableOrder groupByVars = aggregationMetadata.getGroupByVariableOrder();
+        final Bytes rowId = RowKeyUtil.makeRowKey(aggregationNodeId, groupByVars, childBindingSet);
+
+        // Load the old state from the bytes if one was found; otherwise initialize the state.
+        final Optional<Bytes> stateBytes = Optional.ofNullable( tx.get(rowId, FluoQueryColumns.AGGREGATION_BINDING_SET) );
+
+        final AggregationState state;
+        if(stateBytes.isPresent()) {
+            // Deserialize the old state.
+            state = AGG_STATE_SERDE.deserialize(stateBytes.get().toArray());
+        } else {
+            // Initialize a new state.
+            state = new AggregationState();
+
+            // If we have group by bindings, their values need to be added to the state's binding set.
+            final MapBindingSet bindingSet = state.getBindingSet();
+            for(final String variable : groupByVars) {
+                bindingSet.addBinding( childBindingSet.getBinding(variable) );
+            }
+        }
+
+        if(tracing) {
+            log.trace(
+                    "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                    "Before Update: " + state.getBindingSet().toString() + "\n");
+        }
+
+        // Update the visibilities of the result binding set based on the child's visibilities.
+        final String oldVisibility = state.getVisibility();
+        final String updateVisibilities = VisibilitySimplifier.unionAndSimplify(oldVisibility, childBindingSet.getVisibility());
+        state.setVisibility(updateVisibilities);
+
+        // Update the Aggregation State with each Aggregation function included within this group.
+        for(final AggregationElement aggregation : aggregationMetadata.getAggregations()) {
+            final AggregationType type = aggregation.getAggregationType();
+            final AggregationFunction function = FUNCTIONS.get(type);
+            if(function == null) {
+                throw new RuntimeException("Unrecognized aggregation function: " + type);
+            }
+
+            function.update(aggregation, state, childBindingSet);
+        }
+
+        if(tracing) {
+            log.trace(
+                    "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                    "After Update:" + state.getBindingSet().toString() + "\n" );
+        }
+
+        // Store the updated state. This will write on top of any old state that was present for the Group By values.
+        tx.set(rowId, FluoQueryColumns.AGGREGATION_BINDING_SET, Bytes.of(AGG_STATE_SERDE.serialize(state)));
+    }
+
+    /**
+     * A strategy that folds one child Binding Set into an {@link AggregationState}.
+     */
+    public interface AggregationFunction {
+
+        /**
+         * Updates an {@link AggregationState} based on the values of a child Binding Set.
+         *
+         * @param aggregation - Defines which function needs to be performed as well as any details
+         *   required to do the aggregation work. (not null)
+         * @param state - The state that will be updated. (not null)
+         * @param childBindingSet - The Binding Set whose values will be used to update the state.
+         */
+        void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet);
+    }
+
+    /**
+     * Counts the child Binding Sets that bind the variable named by the {@link AggregationElement}.
+     * Each such Binding Set increments the count held in the {@link AggregationState} by one.
+     */
+    public static final class CountFunction implements AggregationFunction {
+        @Override
+        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+            checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements.");
+
+            // A child that does not bind the counted variable does not contribute to the count.
+            final String countedName = aggregation.getAggregatedBindingName();
+            if(!childBindingSet.hasBinding(countedName)) {
+                return;
+            }
+
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+
+            // The first counted value starts the count at one; later values add one to the stored count.
+            final BigInteger updatedCount;
+            if(result.hasBinding(resultName)) {
+                final Literal previousCount = (Literal) result.getValue(resultName);
+                updatedCount = previousCount.integerValue().add( BigInteger.ONE );
+            } else {
+                updatedCount = BigInteger.ONE;
+            }
+
+            result.addBinding(resultName, new IntegerLiteralImpl(updatedCount));
+        }
+    }
+
+    /**
+     * Adds to the {@link AggregationState}'s sum if the child Binding Set contains the binding name
+     * that is being summed by the {@link AggregationElement}.
+     * <p>
+     * A bound value that is not a numeric literal adds nothing to the sum, but the sum binding is
+     * still written (starting at zero when it did not exist yet). If the addition itself fails, the
+     * failure is logged together with its cause and the state is left untouched.
+     */
+    public static final class SumFunction implements AggregationFunction {
+        @Override
+        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+            checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements.");
+
+            // Only add values to the sum if the child contains the binding that we are summing.
+            final String aggregatedName = aggregation.getAggregatedBindingName();
+            if(childBindingSet.hasBinding(aggregatedName)) {
+                final MapBindingSet result = state.getBindingSet();
+                final String resultName = aggregation.getResultBindingName();
+                final boolean newBinding = !result.hasBinding(resultName);
+
+                // Get the starting number for the sum.
+                Literal sum;
+                if(newBinding) {
+                    sum = new IntegerLiteralImpl(BigInteger.ZERO);
+                } else {
+                    sum = (Literal) result.getValue(resultName);
+                }
+
+                // Add the child binding set's value if it is a numeric literal.
+                final Value childValue = childBindingSet.getValue(aggregatedName);
+                if(childValue instanceof Literal) {
+                    final Literal childLiteral = (Literal) childValue;
+                    if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
+                        try {
+                            sum = MathUtil.compute(sum, childLiteral, MathOp.PLUS);
+                        } catch (final ValueExprEvaluationException e) {
+                            // Pass the exception to the logger so the cause of the failure is not lost.
+                            log.error("A problem was encountered while updating a Sum Aggregation. This binding set will be ignored: " + childBindingSet, e);
+                            return;
+                        }
+                    }
+                }
+
+                // Update the state to include the new sum.
+                result.addBinding(resultName, sum);
+            }
+        }
+    }
+
+    /**
+     * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name
+     * that is being averaged by the {@link AggregationElement}.
+     * <p>
+     * The running sum and count are kept in the state's {@link AverageState} for the result binding;
+     * the average written to the result Binding Set is derived from them. Values that are not numeric
+     * literals are ignored. If the arithmetic fails, the failure is logged together with its cause
+     * and the state is left untouched.
+     */
+    public static final class AverageFunction implements AggregationFunction {
+        @Override
+        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+            checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements.");
+
+            // Only update the average if the child contains the binding that we are averaging.
+            final String aggregatedName = aggregation.getAggregatedBindingName();
+            if(childBindingSet.hasBinding(aggregatedName)) {
+                final MapBindingSet result = state.getBindingSet();
+                final String resultName = aggregation.getResultBindingName();
+                final boolean newBinding = !result.hasBinding(resultName);
+
+                // Get the state of the average.
+                final Map<String, AverageState> averageStates = state.getAverageStates();
+                final AverageState averageState = newBinding ? new AverageState() : averageStates.get(resultName);
+
+                // Update the state of the average.
+                final Value childValue = childBindingSet.getValue(aggregatedName);
+                if(childValue instanceof Literal) {
+                    final Literal childLiteral = (Literal) childValue;
+                    if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
+                        try {
+                            // Update the sum.
+                            final Literal oldSum = new DecimalLiteralImpl(averageState.getSum());
+                            final BigDecimal sum = MathUtil.compute(oldSum, childLiteral, MathOp.PLUS).decimalValue();
+
+                            // Update the count.
+                            final BigInteger count = averageState.getCount().add( BigInteger.ONE );
+
+                            // Update the BindingSet to include the new average.
+                            final Literal sumLiteral = new DecimalLiteralImpl(sum);
+                            final Literal countLiteral = new IntegerLiteralImpl(count);
+                            final Literal average = MathUtil.compute(sumLiteral, countLiteral, MathOp.DIVIDE);
+                            result.addBinding(resultName, average);
+
+                            // Update the average state that is stored.
+                            averageStates.put(resultName, new AverageState(sum, count));
+                        } catch (final ValueExprEvaluationException e) {
+                            // Pass the exception to the logger so the cause of the failure is not lost.
+                            log.error("A problem was encountered while updating an Average Aggregation. This binding set will be ignored: " + childBindingSet, e);
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Keeps the largest value seen for the variable named by the {@link AggregationElement}. The
+     * {@link AggregationState}'s max is only touched when the child Binding Set binds that variable.
+     */
+    public static final class MaxFunction implements AggregationFunction {
+
+        private final ValueComparator compare = new ValueComparator();
+
+        @Override
+        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+            checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements.");
+
+            // A child that does not bind the variable cannot change the max.
+            final String aggregatedName = aggregation.getAggregatedBindingName();
+            if(!childBindingSet.hasBinding(aggregatedName)) {
+                return;
+            }
+
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+            final Value candidate = childBindingSet.getValue(aggregatedName);
+
+            // The child's value wins unless a stored max exists that it does not strictly exceed.
+            Value max = candidate;
+            if(result.hasBinding(resultName)) {
+                final Value currentMax = result.getValue(resultName);
+                if(compare.compare(candidate, currentMax) <= 0) {
+                    max = currentMax;
+                }
+            }
+
+            result.addBinding(resultName, max);
+        }
+    }
+
+    /**
+     * Keeps the smallest value seen for the variable named by the {@link AggregationElement}. The
+     * {@link AggregationState}'s min is only touched when the child Binding Set binds that variable.
+     */
+    public static final class MinFunction implements AggregationFunction {
+
+        private final ValueComparator compare = new ValueComparator();
+
+        @Override
+        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+            checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements.");
+
+            // A child that does not bind the variable cannot change the min.
+            final String aggregatedName = aggregation.getAggregatedBindingName();
+            if(!childBindingSet.hasBinding(aggregatedName)) {
+                return;
+            }
+
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+            final Value candidate = childBindingSet.getValue(aggregatedName);
+
+            // The child's value wins unless a stored min exists that it is not strictly below.
+            Value min = candidate;
+            if(result.hasBinding(resultName)) {
+                final Value currentMin = result.getValue(resultName);
+                if(compare.compare(candidate, currentMin) >= 0) {
+                    min = currentMin;
+                }
+            }
+
+            result.addBinding(resultName, min);
+        }
+    }
+
+    /**
+     * Converts instances of {@link AggregationState} to bytes and back again.
+     */
+    public interface AggregationStateSerDe {
+
+        /**
+         * @param state - The state that will be serialized. (not null)
+         * @return The state serialized to a byte[].
+         */
+        byte[] serialize(AggregationState state);
+
+        /**
+         * @param bytes - The bytes that will be deserialized. (not null)
+         * @return The {@link AggregationState} that was read from the bytes.
+         */
+        AggregationState deserialize(byte[] bytes);
+    }
+
+    /**
+     * An {@link AggregationStateSerDe} backed by Java's built in object serialization.
+     * <p>
+     * NOTE(review): Java native deserialization should only ever be fed bytes this application
+     * wrote itself; confirm the stored aggregation state cannot be written by an untrusted party.
+     */
+    public static final class ObjectSerializationAggregationStateSerDe implements AggregationStateSerDe {
+
+        @Override
+        public byte[] serialize(final AggregationState state) {
+            requireNonNull(state);
+
+            final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            try(final ObjectOutputStream objectsOut = new ObjectOutputStream(bytesOut)) {
+                objectsOut.writeObject(state);
+            } catch (final IOException e) {
+                throw new RuntimeException("A problem was encountered while serializing an AggregationState object.", e);
+            }
+
+            // The object stream has been closed by now, so everything it buffered is in the byte stream.
+            return bytesOut.toByteArray();
+        }
+
+        @Override
+        public AggregationState deserialize(final byte[] bytes) {
+            requireNonNull(bytes);
+
+            try(ObjectInputStream objectsIn = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+                final Object read = objectsIn.readObject();
+                if(!(read instanceof AggregationState)) {
+                    throw new RuntimeException("A problem was encountered while deserializing an AggregationState object. Wrong class.");
+                }
+                return (AggregationState) read;
+            } catch (final IOException | ClassNotFoundException e) {
+                throw new RuntimeException("A problem was encountered while deserializing an AggregationState object.", e);
+            }
+        }
+    }
+
+    /**
+     * Keeps track of the information required to update and build the resulting Binding Set for a
+     * set of Group By values.
+     */
+    public static final class AggregationState implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        // The visibility equation that encompasses all data the aggregation state is derived from.
+        private String visibility;
+
+        // A binding set that holds the current state of the aggregations.
+        private final MapBindingSet bindingSet;
+
+        // A map from result binding name to the state that derived that binding's value.
+        private final Map<String, AverageState> avgStates;
+
+        /**
+         * Constructs an empty instance of {@link AggregationState}: an empty visibility, an empty
+         * Binding Set, and no average states.
+         */
+        public AggregationState() {
+            this("", new MapBindingSet(), new HashMap<String, AverageState>());
+        }
+
+        /**
+         * Constructs an instance of {@link AggregationState}.
+         *
+         * @param visibility - The visibility equation associated with the resulting binding set. (not null)
+         * @param bindingSet - The Binding Set whose values are being updated. It holds the result for a set of
+         *   Group By values. (not null)
+         * @param avgStates - If the aggregation is doing an Average, this is a map from result binding name to
+         *   average state for that binding. (not null)
+         */
+        public AggregationState(
+                final String visibility,
+                final MapBindingSet bindingSet,
+                final Map<String, AverageState> avgStates) {
+            this.visibility = requireNonNull(visibility);
+            this.bindingSet = requireNonNull(bindingSet);
+            this.avgStates = requireNonNull(avgStates);
+        }
+
+        /**
+         * @return The visibility equation associated with the resulting binding set.
+         */
+        public String getVisibility() {
+            return visibility;
+        }
+
+        /**
+         * @param visibility - The visibility equation associated with the resulting binding set. (not null)
+         */
+        public void setVisibility(final String visibility) {
+            this.visibility = requireNonNull(visibility);
+        }
+
+        /**
+         * @return The Binding Set whose values are being updated. It holds the result for a set of Group By values.
+         */
+        public MapBindingSet getBindingSet() {
+            return bindingSet;
+        }
+
+        /**
+         * @return If the aggregation is doing an Average, this is a map from result binding name to
+         *   average state for that binding.
+         */
+        public Map<String, AverageState> getAverageStates() {
+            return avgStates;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(visibility, bindingSet, avgStates);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if(!(o instanceof AggregationState)) {
+                return false;
+            }
+
+            final AggregationState other = (AggregationState) o;
+            return Objects.equals(visibility, other.visibility)
+                    && Objects.equals(bindingSet, other.bindingSet)
+                    && Objects.equals(avgStates, other.avgStates);
+        }
+    }
+
+    /**
+     * An immutable pair of the Sum and Count of the values that are being averaged. The average
+     * itself is derived from these values.
+     */
+    public static class AverageState implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final BigDecimal sum;
+        private final BigInteger count;
+
+        /**
+         * Constructs an instance of {@link AverageState} where the count and sum start at 0.
+         */
+        public AverageState() {
+            this(BigDecimal.ZERO, BigInteger.ZERO);
+        }
+
+        /**
+         * Constructs an instance of {@link AverageState}.
+         *
+         * @param sum - The sum of the values that are averaged. (not null)
+         * @param count - The number of values that are averaged. (not null)
+         */
+        public AverageState(final BigDecimal sum, final BigInteger count) {
+            this.sum = requireNonNull(sum);
+            this.count = requireNonNull(count);
+        }
+
+        /**
+         * @return The sum of the values that are averaged.
+         */
+        public BigDecimal getSum() {
+            return sum;
+        }
+
+        /**
+         * @return The number of values that are averaged.
+         */
+        public BigInteger getCount() {
+            return count;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(sum, count);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if(!(o instanceof AverageState)) {
+                return false;
+            }
+
+            final AverageState other = (AverageState) o;
+            return Objects.equals(sum, other.sum)
+                    && Objects.equals(count, other.count);
+        }
+
+        @Override
+        public String toString() {
+            return "Sum: " + sum + " Count: " + count;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
index 8c8505d..2e45ea6 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
@@ -21,12 +21,12 @@ package org.apache.rya.indexing.pcj.fluo.app;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
+import org.apache.fluo.api.data.Bytes;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import net.jcip.annotations.Immutable;
 
-import org.apache.fluo.api.data.Bytes;
-
 /**
  * The values of an Accumulo Row ID for a row that stores a Binding set for
  * a specific Node ID of a query.
@@ -73,13 +73,8 @@ public class BindingSetRow {
 
         // Read the Node ID from the row's bytes.
         final String[] rowArray = row.toString().split(NODEID_BS_DELIM);
-        if(rowArray.length != 2) {
-            throw new IllegalArgumentException("A row must contain a single 
NODEID_BS_DELIM.");
-        }
-
         final String nodeId = rowArray[0];
-        String bindingSetString = rowArray[1];
-
+        final String bindingSetString = rowArray.length == 2 ? rowArray[1] : 
"";
         return new BindingSetRow(nodeId, bindingSetString);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 3b17a33..42ec686 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -19,24 +19,22 @@
 package org.apache.rya.indexing.pcj.fluo.app;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
 
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.QueryEvaluationException;
 import org.openrdf.query.algebra.Filter;
@@ -45,14 +43,12 @@ import org.openrdf.query.algebra.evaluation.TripleSource;
 import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
 import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl;
 import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil;
-import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.base.Optional;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 import info.aduna.iteration.CloseableIteration;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
 
 /**
  * Updates the results of a Filter node when its child has added a new Binding
@@ -61,8 +57,9 @@ import org.apache.fluo.api.data.Column;
 @DefaultAnnotation(NonNull.class)
 public class FilterResultUpdater {
 
-    private static final BindingSetStringConverter ID_CONVERTER = new 
BindingSetStringConverter();
-    private static final VisibilityBindingSetStringConverter VALUE_CONVERTER = 
new VisibilityBindingSetStringConverter();
+    private static final Logger log = 
Logger.getLogger(FilterResultUpdater.class);
+
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
     /**
      * A utility class used to search SPARQL queries for Filters.
@@ -96,7 +93,7 @@ public class FilterResultUpdater {
      * new Binding Set to its results.
      *
      * @param tx - The transaction all Fluo queries will use. (not null)
-     * @param childBindingSet - A binding set that the query's child node has 
emmitted. (not null)
+     * @param childBindingSet - A binding set that the query's child node has 
emitted. (not null)
      * @param filterMetadata - The metadata of the Filter whose results will 
be updated. (not null)
      * @throws Exception Something caused the update to fail.
      */
@@ -108,6 +105,11 @@ public class FilterResultUpdater {
         checkNotNull(childBindingSet);
         checkNotNull(filterMetadata);
 
+        log.trace(
+                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                "Filter Node ID: " + filterMetadata.getNodeId() + "\n" +
+                "Binding Set:\n" + childBindingSet + "\n");
+
         // Parse the original query and find the Filter that represents 
filterId.
         final String sparql = filterMetadata.getOriginalSparql();
         final int indexWithinQuery = 
filterMetadata.getFilterIndexWithinSparql();
@@ -118,23 +120,22 @@ public class FilterResultUpdater {
         if (isTrue(condition, childBindingSet)) {
             // Create the Filter's binding set from the child's.
             final VariableOrder filterVarOrder = 
filterMetadata.getVariableOrder();
+            final BindingSet filterBindingSet = 
BindingSetUtil.keepBindings(filterVarOrder, childBindingSet);
 
-            final MapBindingSet filterBindingSet = new MapBindingSet();
-            for(final String bindingName : filterVarOrder) {
-                if(childBindingSet.hasBinding(bindingName)) {
-                    final Binding binding = 
childBindingSet.getBinding(bindingName);
-                    filterBindingSet.addBinding(binding);
-                }
-            }
+            // Create the Row Key for the emitted binding set. It does not 
contain visibilities.
+            final Bytes resultRow = 
RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, 
filterBindingSet);
+
+            // If this is a new binding set, then emit it.
+            if(tx.get(resultRow, FluoQueryColumns.FILTER_BINDING_SET) == null) 
{
+                final VisibilityBindingSet visBindingSet = new 
VisibilityBindingSet(filterBindingSet, childBindingSet.getVisibility());
+                final Bytes nodeValueBytes = BS_SERDE.serialize(visBindingSet);
 
-            final String filterBindingSetIdString = 
ID_CONVERTER.convert(filterBindingSet, filterVarOrder);
-            String filterBindingSetValueString = "";
-            filterBindingSetValueString = 
VALUE_CONVERTER.convert(childBindingSet, filterVarOrder);
+                log.trace(
+                        "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                        "New Binding Set: " + visBindingSet + "\n");
 
-            final String row = filterMetadata.getNodeId() + NODEID_BS_DELIM + 
filterBindingSetIdString;
-            final Column col = FluoQueryColumns.FILTER_BINDING_SET;
-            final String value = filterBindingSetValueString;
-            tx.set(row, col, value);
+                tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, 
nodeValueBytes);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
index b78562c..602fd9d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
@@ -25,6 +25,7 @@ import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.UR
 
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.data.Bytes;
@@ -53,7 +54,7 @@ public class IncUpdateDAO {
         return rs;
     }
 
-    private static String getTripleString(final RyaStatement rs) {
+    public static String getTripleString(final RyaStatement rs) {
         final String subj = rs.getSubject().getData() + TYPE_DELIM + URI_TYPE;
         final String pred = rs.getPredicate().getData() + TYPE_DELIM + 
URI_TYPE;
         final String objData = rs.getObject().getData();
@@ -102,81 +103,41 @@ public class IncUpdateDAO {
      */
     public static void printTriples(final FluoClient fluoClient) throws 
Exception {
         try (Snapshot snapshot = fluoClient.newSnapshot()) {
-               CellScanner cscanner = snapshot.scanner().fetch(new 
Column("triples", "SPO")).build();
-               for (RowColumnValue rcv : cscanner) {
+               final CellScanner cscanner = snapshot.scanner().fetch(new 
Column("triples", "SPO")).build();
+               for (final RowColumnValue rcv : cscanner) {
                        System.out.println("Triple: "+rcv.getsRow());
                        }
         }
     }
 
-//    /**
-//     * Print all bindings for the given queries. For demo's and diagnostics.
-//     * @param fluoClient
-//     * @param queryNames
-//     * @throws Exception
-//     */
-//    public static void printQueryResults(final FluoClient fluoClient,
-//            final Map<String, String> queryNames) throws Exception {
-//        try (Snapshot snapshot = fluoClient.newSnapshot();
-//                TypedTransaction tx1 = 
stl.wrap(fluoClient.newTransaction())) {
-//
-//            final ScannerConfiguration scanConfig = new 
ScannerConfiguration();
-//            scanConfig.fetchColumn(Bytes.of("query"), 
Bytes.of("bindingSet"));
-//
-//            final RowIterator rowIter = snapshot.get(scanConfig);
-//            String sparqlRow = "";
-//            
System.out.println("*********************************************************");
-//
-//            while (rowIter.hasNext()) {
-//                final Entry<Bytes, ColumnIterator> row = rowIter.next();
-//                final String[] joinInfo = row.getKey().toString()
-//                        .split(NODEID_BS_DELIM);
-//                final String sparql = joinInfo[0];
-//                final String bs = joinInfo[1];
-//                if (!sparqlRow.equals(sparql)) {
-//                    sparqlRow = sparql;
-//                    System.out.println();
-//                    System.out.println();
-//                    System.out.println(queryNames.get(sparqlRow)
-//                            + " has bindings: ");
-//                    System.out.println();
-//                }
-//
-//                final String variables = 
tx1.get().row(sparqlRow).col(NODE_VARS).toString();
-//                final String[] vars = variables.split(";");
-//                final String[] bsVals = bs.split(DELIM);
-//                System.out.print("Bindingset:  ");
-//                for (int i = 0; i < vars.length; i++) {
-//                    System.out.print(vars[i] + " = " + bsVals[i] + "   ");
-//                }
-//                System.out.println();
-//
-//            }
-//
-//            
System.out.println("*********************************************************");
-//        }
-//    }
+    /**
+     * Print all rows in the Fluo table for diagnostics.
+     * <p>
+     * Consider using {@code FluoITHelper.printFluoTable(FluoClient client)} 
instead.
+     */
+    @Deprecated
+    public static void printAll(final SnapshotBase sx) {
+        final String FORMAT = "%-30s | %-10s | %-10s | %-40s\n";
+        System.out.println("Printing all tables.  Showing unprintable bytes 
and braces as {ff} and {{} and {}} where ff is the value in hexadecimal.");
+        System.out.format(FORMAT, "--Row--", "--Column Family--", "--Column 
Qual--", "--Value--");
+        final CellScanner cscanner = sx.scanner().build();
+        for (final RowColumnValue rcv : cscanner) {
+            System.out.format(FORMAT, to_String(rcv.getRow()),
+                    to_String(rcv.getColumn().getFamily()),
+                    to_String(rcv.getColumn().getQualifier()),
+                    to_String(rcv.getValue()));
+        }
+    }
 
     /**
      * Print all rows in the Fluo table for diagnostics.
-     * @param fluoClient
-     * @throws Exception
+     * <p>
+     * Consider using {@code FluoITHelper.printFluoTable(FluoClient client)} 
instead.
      */
+    @Deprecated
     public static void printAll(final FluoClient fluoClient) throws Exception {
-        final String FORMAT = "%-30s | %-10s | %-10s | %-40s\n";
-        System.out
-        .println("Printing all tables.  Showing unprintable bytes and braces 
as {ff} and {{} and {}} where ff is the value in hexadecimal.");
-        System.out.format(FORMAT, "--Row--", "--Column Family--",
-                "--Column Qual--", "--Value--");
-        // Use try with resource to ensure snapshot is closed.
-        try (Snapshot snapshot = fluoClient.newSnapshot()) {
-               CellScanner cscanner = snapshot.scanner().build();
-               for (RowColumnValue rcv : cscanner) {
-                       System.out.format(FORMAT, to_String(rcv.getRow()),
-                        to_String(rcv.getColumn().getFamily()),
-                        to_String(rcv.getColumn().getQualifier()),
-                        to_String(rcv.getValue()));
-                       }
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            printAll(sx);
         }
     }
 
@@ -208,4 +169,4 @@ public class IncUpdateDAO {
         }
         return sb.toString();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index 84581ef..be4df71 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -31,6 +31,7 @@ public class IncrementalUpdateConstants {
     public static final String SP_PREFIX = "STATEMENT_PATTERN";
     public static final String JOIN_PREFIX = "JOIN";
     public static final String FILTER_PREFIX = "FILTER";
+    public static final String AGGREGATION_PREFIX = "AGGREGATION";
     public static final String QUERY_PREFIX = "QUERY";
 
     public static final String URI_TYPE = 
"http://www.w3.org/2001/XMLSchema#anyURI";;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 39dcc16..2cb5a54 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -27,14 +27,20 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
@@ -42,17 +48,12 @@ import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.MapBindingSet;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.ColumnValue;
-import org.apache.fluo.api.data.Span;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Updates the results of a Join node when one of its children has added a
@@ -61,49 +62,57 @@ import org.apache.fluo.api.data.Span;
 @DefaultAnnotation(NonNull.class)
 public class JoinResultUpdater {
 
-    private static final BindingSetStringConverter idConverter = new 
BindingSetStringConverter();
-    private static final VisibilityBindingSetStringConverter valueConverter = 
new VisibilityBindingSetStringConverter();
+    private static final Logger log = 
Logger.getLogger(JoinResultUpdater.class);
+
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
+    private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER 
= new VisibilityBindingSetStringConverter();
 
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-   
+
     /**
      * Updates the results of a Join node when one of its children has added a
      * new Binding Set to its results.
      *
      * @param tx - The transaction all Fluo queries will use. (not null)
-     * @param childId - The Node ID of the child whose results received a new 
Binding Set. (not null)
+     * @param childNodeId - The Node ID of the child whose results received a 
new Binding Set. (not null)
      * @param childBindingSet - The Binding Set that was just emitted by child 
node. (not null)
      * @param joinMetadata - The metadata for the Join that has been notified. 
(not null)
-     * @throws BindingSetConversionException
+     * @throws Exception The update could not be successfully performed.
      */
     public void updateJoinResults(
             final TransactionBase tx,
-            final String childId,
+            final String childNodeId,
             final VisibilityBindingSet childBindingSet,
-            final JoinMetadata joinMetadata) throws 
BindingSetConversionException {
+            final JoinMetadata joinMetadata) throws Exception {
         checkNotNull(tx);
-        checkNotNull(childId);
+        checkNotNull(childNodeId);
         checkNotNull(childBindingSet);
         checkNotNull(joinMetadata);
 
+        log.trace(
+                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                "Join Node ID: " + joinMetadata.getNodeId() + "\n" +
+                "Child Node ID: " + childNodeId + "\n" +
+                "Child Binding Set:\n" + childBindingSet + "\n");
+
         // Figure out which join algorithm we are going to use.
         final IterativeJoin joinAlgorithm;
         switch(joinMetadata.getJoinType()) {
-            case NATURAL_JOIN:
-                joinAlgorithm = new NaturalJoin();
-                break;
-            case LEFT_OUTER_JOIN:
-                joinAlgorithm = new LeftOuterJoin();
-                break;
-            default:
-                throw new RuntimeException("Unsupported JoinType: " + 
joinMetadata.getJoinType());
+        case NATURAL_JOIN:
+            joinAlgorithm = new NaturalJoin();
+            break;
+        case LEFT_OUTER_JOIN:
+            joinAlgorithm = new LeftOuterJoin();
+            break;
+        default:
+            throw new RuntimeException("Unsupported JoinType: " + 
joinMetadata.getJoinType());
         }
 
         // Figure out which side of the join the new binding set appeared on.
         final Side emittingSide;
         final String siblingId;
 
-        if(childId.equals(joinMetadata.getLeftChildNodeId())) {
+        if(childNodeId.equals(joinMetadata.getLeftChildNodeId())) {
             emittingSide = Side.LEFT;
             siblingId = joinMetadata.getRightChildNodeId();
         } else {
@@ -112,7 +121,7 @@ public class JoinResultUpdater {
         }
 
         // Iterates over the sibling node's BindingSets that join with the new 
binding set.
-        final FluoTableIterator siblingBindingSets = 
makeSiblingScanIterator(childId, childBindingSet, siblingId, tx);
+        final FluoTableIterator siblingBindingSets = 
makeSiblingScanIterator(childNodeId, childBindingSet, siblingId, tx);
 
         // Iterates over the resulting BindingSets from the join.
         final Iterator<VisibilityBindingSet> newJoinResults;
@@ -125,14 +134,22 @@ public class JoinResultUpdater {
         // Insert the new join binding sets to the Fluo table.
         final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
         while(newJoinResults.hasNext()) {
-            final BindingSet newJoinResult = newJoinResults.next();
-            final String joinBindingSetStringId = 
idConverter.convert(newJoinResult, joinVarOrder);
-            final String joinBindingSetStringValue = 
valueConverter.convert(newJoinResult, joinVarOrder);
-
-            final String row = joinMetadata.getNodeId() + NODEID_BS_DELIM + 
joinBindingSetStringId;
-            final Column col = FluoQueryColumns.JOIN_BINDING_SET;
-            final String value = joinBindingSetStringValue;
-            tx.set(row, col, value);
+            final VisibilityBindingSet newJoinResult = newJoinResults.next();
+
+            // Create the Row Key for the emitted binding set. It does not 
contain visibilities.
+            final Bytes resultRow = 
RowKeyUtil.makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult);
+
+            // Only insert the join Binding Set if it is new.
+            if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null) {
+                // Create the Node Value. It does contain visibilities.
+                final Bytes nodeValueBytes = BS_SERDE.serialize(newJoinResult);
+
+                log.trace(
+                        "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                        "New Join Result:\n" + newJoinResult + "\n");
+
+                tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, 
nodeValueBytes);
+            }
         }
     }
 
@@ -150,8 +167,8 @@ public class JoinResultUpdater {
         final List<String> commonVars = getCommonVars(childVarOrder, 
siblingVarOrder);
 
         // Get the Binding strings
-        final String childBindingSetString = 
valueConverter.convert(childBindingSet, childVarOrder);
-        String[] childBindingArray = childBindingSetString.split("\u0001");
+        final String childBindingSetString = 
VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder);
+        final String[] childBindingArray = 
childBindingSetString.split("\u0001");
         final String[] childBindingStrings = 
FluoStringConverter.toBindingStrings(childBindingArray[0]);
 
         // Create the prefix that will be used to scan for binding sets of the 
sibling node.
@@ -174,7 +191,7 @@ public class JoinResultUpdater {
         // earlier iterations of this algorithm.
 
         final RowScanner rs = 
tx.scanner().over(Span.prefix(siblingScanPrefix)).fetch(getScanColumnFamily(siblingId)).byRow().build();
-        return new FluoTableIterator(rs, siblingVarOrder);
+        return new FluoTableIterator(rs);
     }
 
 
@@ -270,7 +287,7 @@ public class JoinResultUpdater {
             default:
                 throw new IllegalArgumentException("The child node's sibling 
is not of type StatementPattern, Join, Left Join, or Filter.");
         }
-        
+
         return column;
     }
 
@@ -426,14 +443,8 @@ public class JoinResultUpdater {
                 leftVisi = joinResult.getVisibility();
                 rightVisi = newResult.getVisibility();
             }
+            final String visibility = 
VisibilitySimplifier.unionAndSimplify(leftVisi, rightVisi);
 
-            String visibility = "";
-            final Joiner join = Joiner.on(")&(");
-            if(leftVisi.isEmpty() || rightVisi.isEmpty()) {
-                visibility = (leftVisi + rightVisi).trim();
-            } else {
-                visibility = "(" + join.join(leftVisi, rightVisi) + ")";
-            }
             return new VisibilityBindingSet(bs, visibility);
         }
 
@@ -449,24 +460,22 @@ public class JoinResultUpdater {
      */
     private static final class FluoTableIterator implements 
Iterator<VisibilityBindingSet> {
 
+        private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
+
         private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet(
                 FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET,
                 FluoQueryColumns.JOIN_BINDING_SET,
                 FluoQueryColumns.FILTER_BINDING_SET);
 
         private final Iterator<ColumnScanner> rows;
-        private final VariableOrder varOrder;
 
         /**
          * Constructs an instance of {@link FluoTableIterator}.
          *
          * @param rows - Iterates over RowId values in a Fluo Table. (not null)
-         * @param varOrder - The Variable Order of binding sets that will be
-         *   read from the Fluo Table. (not null)
          */
-        public FluoTableIterator(final RowScanner rows, final VariableOrder 
varOrder) {
+        public FluoTableIterator(final RowScanner rows) {
             this.rows = checkNotNull(rows).iterator();
-            this.varOrder = checkNotNull(varOrder);
         }
 
         @Override
@@ -478,12 +487,16 @@ public class JoinResultUpdater {
         public VisibilityBindingSet next() {
             final ColumnScanner columns = rows.next();
 
-            for (ColumnValue cv : columns) {
-                if(BINDING_SET_COLUMNS.contains(cv.getColumn())) {
-                     final String bindingSetString = cv.getsValue();
-                     return (VisibilityBindingSet) 
valueConverter.convert(bindingSetString, varOrder);
-                 }
-                       }
+            for (final ColumnValue cv : columns) {
+                if(BINDING_SET_COLUMNS.contains(cv.getColumn())) {
+                    final Bytes value = cv.getValue();
+                    try {
+                        return BS_SERDE.deserialize(value);
+                    } catch (final Exception e) { // Unreadable value: report it distinctly from a row that has no binding set column.
+                        throw new RuntimeException("Failed to deserialize the 
row's Binding Set.", e);
+                    }
+                }
+            }
 
             throw new RuntimeException("Row did not containing a Binding 
Set.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
index 0a5ecc1..5365e30 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -26,14 +27,13 @@ import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP
 
 import java.util.List;
 
+import org.apache.fluo.api.data.Column;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QueryNodeMetadataColumns;
 import org.openrdf.query.BindingSet;
 
 import com.google.common.base.Optional;
 
-import org.apache.fluo.api.data.Column;
-
 /**
  * Represents the different types of nodes that a Query may have.
  */
@@ -41,7 +41,8 @@ public enum NodeType {
     FILTER (QueryNodeMetadataColumns.FILTER_COLUMNS, 
FluoQueryColumns.FILTER_BINDING_SET),
     JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, 
FluoQueryColumns.JOIN_BINDING_SET),
     STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, 
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET),
-    QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, 
FluoQueryColumns.QUERY_BINDING_SET);
+    QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, 
FluoQueryColumns.QUERY_BINDING_SET),
+    AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, 
FluoQueryColumns.AGGREGATION_BINDING_SET);
 
     //Metadata Columns associated with given NodeType
     private QueryNodeMetadataColumns metadataColumns;
@@ -55,7 +56,7 @@ public enum NodeType {
      * @param metadataColumns - Metadata {@link Column}s associated with this 
{@link NodeType}. (not null)
      * @param bindingSetColumn - The {@link Column} used to store this {@link 
NodeType|'s {@link BindingSet}s. (not null)
      */
-    private NodeType(QueryNodeMetadataColumns metadataColumns, Column 
bindingSetColumn) {
+    private NodeType(final QueryNodeMetadataColumns metadataColumns, final 
Column bindingSetColumn) {
        this.metadataColumns = requireNonNull(metadataColumns);
        this.bindingSetColumn = requireNonNull(bindingSetColumn);
     }
@@ -95,6 +96,8 @@ public enum NodeType {
             type = JOIN;
         } else if(nodeId.startsWith(QUERY_PREFIX)) {
             type = QUERY;
+        } else if(nodeId.startsWith(AGGREGATION_PREFIX)) {
+            type = AGGREGATION;
         }
 
         return Optional.fromNullable(type);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index 9cd2bd7..ba82726 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -19,22 +19,22 @@
 package org.apache.rya.indexing.pcj.fluo.app;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
 
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
-import org.openrdf.query.Binding;
-import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.query.BindingSet;
 
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Column;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Updates the results of a Query node when one of its children has added a
@@ -42,9 +42,10 @@ import org.apache.fluo.api.data.Column;
  */
 @DefaultAnnotation(NonNull.class)
 public class QueryResultUpdater {
-    
-    private final BindingSetStringConverter converter = new 
BindingSetStringConverter();
-    private final VisibilityBindingSetStringConverter valueConverter = new 
VisibilityBindingSetStringConverter();
+    private static final Logger log = 
Logger.getLogger(QueryResultUpdater.class);
+
+    private static final FluoQueryMetadataDAO METADATA_DA0 = new 
FluoQueryMetadataDAO();
+    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
     /**
      * Updates the results of a Query node when one of its children has added a
@@ -53,32 +54,46 @@ public class QueryResultUpdater {
      * @param tx - The transaction all Fluo queries will use. (not null)
      * @param childBindingSet - A binding set that the query's child node has 
emmitted. (not null)
      * @param queryMetadata - The metadata of the Query whose results will be 
updated. (not null)
+     * @throws Exception A problem caused the update to fail.
      */
     public void updateQueryResults(
             final TransactionBase tx,
             final VisibilityBindingSet childBindingSet,
-            final QueryMetadata queryMetadata) {
+            final QueryMetadata queryMetadata) throws Exception {
         checkNotNull(tx);
         checkNotNull(childBindingSet);
         checkNotNull(queryMetadata);
 
+        log.trace(
+                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                "Join Node ID: " + queryMetadata.getNodeId() + "\n" +
+                "Child Node ID: " + queryMetadata.getChildNodeId() + "\n" +
+                "Child Binding Set:\n" + childBindingSet + "\n");
+
         // Create the query's Binding Set from the child node's binding set.
         final VariableOrder queryVarOrder = queryMetadata.getVariableOrder();
+        final BindingSet queryBindingSet = 
BindingSetUtil.keepBindings(queryVarOrder, childBindingSet);
 
-        final MapBindingSet queryBindingSet = new MapBindingSet();
-        for(final String bindingName : queryVarOrder) {
-            if(childBindingSet.hasBinding(bindingName)) {
-                final Binding binding = 
childBindingSet.getBinding(bindingName);
-                queryBindingSet.addBinding(binding);
-            }
+        // Create the Row Key for the result. If the child node groups 
results, then the key must only contain the Group By variables.
+        final Bytes resultRow;
+
+        final String childNodeId = queryMetadata.getChildNodeId();
+        final boolean isGrouped = childNodeId.startsWith( 
IncrementalUpdateConstants.AGGREGATION_PREFIX );
+        if(isGrouped) {
+            final AggregationMetadata aggMetadata = 
METADATA_DA0.readAggregationMetadata(tx, childNodeId);
+            final VariableOrder groupByVars = 
aggMetadata.getGroupByVariableOrder();
+            resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), 
groupByVars, queryBindingSet);
+        } else {
+            resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), 
queryVarOrder, queryBindingSet);
         }
-        final String queryBindingSetString = 
converter.convert(queryBindingSet, queryVarOrder);
-        final String queryBindingSetValueString = valueConverter.convert(new 
VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()), 
queryVarOrder);
 
-        // Commit it to the Fluo table for the SPARQL query. This isn't 
guaranteed to be a new entry.
-        final String row = queryMetadata.getNodeId() + NODEID_BS_DELIM + 
queryBindingSetString;
-        final Column col = FluoQueryColumns.QUERY_BINDING_SET;
-        final String value = queryBindingSetValueString;
-        tx.set(row, col, value);
+        // Create the Binding Set that goes in the Node Value. It does contain 
visibilities.
+        final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
+
+        log.trace(
+                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                "New Binding Set: " + childBindingSet + "\n");
+
+        tx.set(resultRow, FluoQueryColumns.QUERY_BINDING_SET, nodeValueBytes);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java
new file mode 100644
index 0000000..34439e4
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Serializes and deserializes a {@link VisibilityBindingSet} to and from
+ * {@link Bytes} objects using Java object serialization.
+ */
+@DefaultAnnotation(NonNull.class)
+public class VisibilityBindingSetSerDe {
+
+    /**
+     * Serializes a {@link VisibilityBindingSet} into a {@link Bytes} object.
+     *
+     * @param bindingSet - The binding set that will be serialized. (not null)
+     * @return The serialized object.
+     * @throws Exception A problem was encountered while serializing the
+     *   object.
+     */
+    public Bytes serialize(final VisibilityBindingSet bindingSet)
+            throws Exception {
+        requireNonNull(bindingSet);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        // Closing the ObjectOutputStream flushes everything into baos
+        // before the bytes are copied out below.
+        try(final ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(bindingSet);
+        }
+
+        return Bytes.of(baos.toByteArray());
+    }
+
+    /**
+     * Deserializes a {@link VisibilityBindingSet} from a {@link Bytes} object.
+     *
+     * @param bytes - The bytes that will be deserialized. (not null)
+     * @return The deserialized object.
+     * @throws Exception A problem was encountered while deserializing the
+     *   object, or the bytes did not hold a {@link VisibilityBindingSet}.
+     */
+    public VisibilityBindingSet deserialize(final Bytes bytes)
+            throws Exception {
+        requireNonNull(bytes);
+
+        // NOTE(review): this is Java native deserialization, which is only
+        // safe on trusted input. Presumably the bytes always come from
+        // values this application wrote itself - confirm against callers.
+        final ByteArrayInputStream bais =
+                new ByteArrayInputStream(bytes.toArray());
+        try(final ObjectInputStream ois = new ObjectInputStream(bais)) {
+            final Object o = ois.readObject();
+            if(o instanceof VisibilityBindingSet) {
+                return (VisibilityBindingSet) o;
+            }
+
+            // readObject() yields null when a null reference was written.
+            // Guard against it so the caller gets the descriptive error
+            // below instead of a NullPointerException from o.getClass().
+            final String actualType =
+                    (o == null) ? "null" : o.getClass().toString();
+            throw new Exception(
+                    "Deserialized Object is not a VisibilityBindingSet. " +
+                    "Was: " + actualType);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
index a0edbda..02dced7 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
@@ -18,27 +18,25 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.export;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Exports a single Binding Set that is a new result for a SPARQL query to some
  * other location.
  */
 @DefaultAnnotation(NonNull.class)
-public interface IncrementalResultExporter {
+public interface IncrementalResultExporter extends AutoCloseable {
 
     /**
-     * Export a Binding Set that is a result of a SPARQL query.
+     * Export a Binding Set that is a result of a SPARQL query that does not 
include a Group By clause.
      *
      * @param tx - The Fluo transaction this export is a part of. (not null)
      * @param queryId - The Fluo ID of the SPARQL query the binding set is a 
result of. (not null)
-     * @param bindingSetString - The binding set as it was represented within 
the
-     *   Fluo application. (not null)
+     * @param result - The Binding Set as it was represented within 
the Fluo application. (not null)
      * @throws ResultExportException The result could not be exported.
      */
     public void export(TransactionBase tx, String queryId, 
VisibilityBindingSet result) throws ResultExportException;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
index c40c5da..72ec947 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
@@ -20,9 +20,13 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
@@ -32,17 +36,18 @@ import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
  * Incrementally exports SPARQL query results to Kafka topics.
  */
 public class KafkaResultExporter implements IncrementalResultExporter {
-    private final KafkaProducer<String, VisibilityBindingSet> producer;
     private static final Logger log = 
Logger.getLogger(KafkaResultExporter.class);
 
+    private final KafkaProducer<String, VisibilityBindingSet> producer;
+
     /**
      * Constructs an instance given a Kafka producer.
-     * 
+     *
      * @param producer
      *            for sending result set alerts to a broker. (not null)
      *            Can be created and configured by {@link 
KafkaResultExporterFactory}
      */
-    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> 
producer) {
+    public KafkaResultExporter(final KafkaProducer<String, 
VisibilityBindingSet> producer) {
         super();
         checkNotNull(producer, "Producer is required.");
         this.producer = producer;
@@ -58,18 +63,25 @@ public class KafkaResultExporter implements 
IncrementalResultExporter {
         checkNotNull(result);
         try {
             final String pcjId = fluoTx.gets(queryId, 
FluoQueryColumns.RYA_PCJ_ID);
-            String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" 
+ pcjId + " result=" + result;
+            final String msg = "out to kafta topic: queryId=" + queryId + " 
pcjId=" + pcjId + " result=" + result;
             log.trace(msg);
 
-            // Send result on topic
-            ProducerRecord<String, VisibilityBindingSet> rec = new 
ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* 
value= */ result);
-            // Can add a key if you need to:
-            // ProducerRecord(String topic, K key, V value)
-            producer.send(rec);
+            // Send the result to the topic whose name matches the PCJ ID.
+            final ProducerRecord<String, VisibilityBindingSet> rec = new 
ProducerRecord<>(pcjId, result);
+            final Future<RecordMetadata> future = producer.send(rec);
+
+            // Don't let the export return until the result has been written 
to the topic. Otherwise we may lose results.
+            future.get();
+
             log.debug("producer.send(rec) completed");
 
         } catch (final Throwable e) {
             throw new ResultExportException("A result could not be exported to 
Kafka.", e);
         }
     }
-}
+
+    /**
+     * Closes the Kafka producer this exporter was constructed with, passing
+     * a timeout of 5 seconds to the producer's close call.
+     *
+     * @throws Exception A problem was encountered while closing the producer.
+     */
+    @Override
+    public void close() throws Exception {
+        producer.close(5, TimeUnit.SECONDS);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
index a4b589f..b8b3c45 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
@@ -19,16 +19,16 @@
 package org.apache.rya.indexing.pcj.fluo.app.export.rya;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
 import java.util.Collections;
 
+import org.apache.fluo.api.client.TransactionBase;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
 
 /**
  * Incrementally exports SPARQL query results to Accumulo PCJ tables as they 
are defined by Rya.
@@ -51,10 +51,11 @@ public class RyaResultExporter implements 
IncrementalResultExporter {
             final TransactionBase fluoTx,
             final String queryId,
             final VisibilityBindingSet result) throws ResultExportException {
-        checkNotNull(fluoTx);
-        checkNotNull(queryId);
-        checkNotNull(result);
+        requireNonNull(fluoTx);
+        requireNonNull(queryId);
+        requireNonNull(result);
 
+        // Look up the ID the PCJ represents within the PCJ Storage.
         final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
 
         try {
@@ -63,4 +64,9 @@ public class RyaResultExporter implements 
IncrementalResultExporter {
             throw new ResultExportException("A result could not be exported to 
Rya.", e);
         }
     }
+
+    /**
+     * Closes the {@code pcjStorage} held by this exporter.
+     *
+     * @throws Exception The call to {@code pcjStorage.close()} failed.
+     */
+    @Override
+    public void close() throws Exception {
+        pcjStorage.close();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
new file mode 100644
index 0000000..1cb1594
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import 
org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState;
+import 
org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationStateSerDe;
+import 
org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.ObjectSerializationAggregationStateSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Observer that is notified when the results of an Aggregation have been
+ * updated to include a new {@link BindingSet} value. This observer updates
+ * its parent if the new Binding Set affects the parent's results.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AggregationObserver extends BindingSetUpdater {
+
+    private static final AggregationStateSerDe STATE_SERDE =
+            new ObjectSerializationAggregationStateSerDe();
+
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+    /**
+     * @return The observed column: the Aggregation Binding Set column,
+     *   registered with the {@code STRONG} notification type.
+     */
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(
+                FluoQueryColumns.AGGREGATION_BINDING_SET,
+                NotificationType.STRONG);
+    }
+
+    /**
+     * Builds the {@link Observation} for an updated Aggregation result row.
+     *
+     * @param tx - The transaction used to read the node's metadata and the
+     *   updated value. (not null)
+     * @param row - The row whose Aggregation Binding Set column was
+     *   updated. (not null)
+     * @return An Observation holding the Aggregation node's ID, the binding
+     *   set rebuilt from the stored Aggregation State, and the ID of the
+     *   Aggregation's parent node.
+     */
+    @Override
+    public Observation parseObservation(
+            final TransactionBase tx,
+            final Bytes row) {
+        requireNonNull(tx);
+        requireNonNull(row);
+
+        // The row names the Aggregation node; load that node's metadata.
+        final String aggregationNodeId = BindingSetRow.make(row).getNodeId();
+        final AggregationMetadata aggregationMetadata =
+                queryDao.readAggregationMetadata(tx, aggregationNodeId);
+
+        // The stored value is a serialized Aggregation State. Rebuild the
+        // Visibility Binding Set from its binding set and visibility.
+        final Bytes serializedState =
+                tx.get(row, FluoQueryColumns.AGGREGATION_BINDING_SET);
+        final AggregationState aggregationState =
+                STATE_SERDE.deserialize( serializedState.toArray() );
+        final VisibilityBindingSet aggregatedResult = new VisibilityBindingSet(
+                aggregationState.getBindingSet(),
+                aggregationState.getVisibility());
+
+        // The Aggregation's parent is the node that handles the new result.
+        return new Observation(
+                aggregationNodeId,
+                aggregatedResult,
+                aggregationMetadata.getParentNodeId());
+    }
+}

Reply via email to