http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java new file mode 100644 index 0000000..2e41cb1 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java @@ -0,0 +1,572 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; +import org.openrdf.model.impl.DecimalLiteralImpl; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.algebra.MathExpr.MathOp; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.util.MathUtil; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.ImmutableMap; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Updates the results of an Aggregate node when its 
child has added a new Binding Set to its results. + */ +@DefaultAnnotation(NonNull.class) +public class AggregationResultUpdater { + private static final Logger log = Logger.getLogger(AggregationResultUpdater.class); + + private static final AggregationStateSerDe AGG_STATE_SERDE = new ObjectSerializationAggregationStateSerDe(); + + private static final ImmutableMap<AggregationType, AggregationFunction> FUNCTIONS; + static { + final ImmutableMap.Builder<AggregationType, AggregationFunction> builder = ImmutableMap.builder(); + builder.put(AggregationType.COUNT, new CountFunction()); + builder.put(AggregationType.SUM, new SumFunction()); + builder.put(AggregationType.AVERAGE, new AverageFunction()); + builder.put(AggregationType.MIN, new MinFunction()); + builder.put(AggregationType.MAX, new MaxFunction()); + FUNCTIONS = builder.build(); + } + + /** + * Updates the results of an Aggregation node where its child has emitted a new Binding Set. + * + * @param tx - The transaction all Fluo queries will use. (not null) + * @param childBindingSet - The Binding Set that was emitted by the Aggregation Node's child. (not null) + * @param aggregationMetadata - The metadata of the Aggregation node whose results will be updated. (not null) + * @throws Exception The update could not be successfully performed. + */ + public void updateAggregateResults( + final TransactionBase tx, + final VisibilityBindingSet childBindingSet, + final AggregationMetadata aggregationMetadata) throws Exception { + requireNonNull(tx); + requireNonNull(childBindingSet); + requireNonNull(aggregationMetadata); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Child Binding Set:\n" + childBindingSet + "\n"); + + // The Row ID for the Aggregation State that needs to be updated is defined by the Group By variables. 
+ final String aggregationNodeId = aggregationMetadata.getNodeId(); + final VariableOrder groupByVars = aggregationMetadata.getGroupByVariableOrder(); + final Bytes rowId = RowKeyUtil.makeRowKey(aggregationNodeId, groupByVars, childBindingSet); + + // Load the old state from the bytes if one was found; otherwise initialize the state. + final Optional<Bytes> stateBytes = Optional.ofNullable( tx.get(rowId, FluoQueryColumns.AGGREGATION_BINDING_SET) ); + + final AggregationState state; + if(stateBytes.isPresent()) { + // Deserialize the old state + final byte[] bytes = stateBytes.get().toArray(); + state = AGG_STATE_SERDE.deserialize(bytes); + } else { + // Initialize a new state. + state = new AggregationState(); + + // If we have group by bindings, their values need to be added to the state's binding set. + final MapBindingSet bindingSet = state.getBindingSet(); + for(final String variable : aggregationMetadata.getGroupByVariableOrder()) { + bindingSet.addBinding( childBindingSet.getBinding(variable) ); + } + } + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Before Update: " + state.getBindingSet().toString() + "\n"); + + // Update the visibilities of the result binding set based on the child's visibilities. + final String oldVisibility = state.getVisibility(); + final String updateVisibilities = VisibilitySimplifier.unionAndSimplify(oldVisibility, childBindingSet.getVisibility()); + state.setVisibility(updateVisibilities); + + // Update the Aggregation State with each Aggregation function included within this group. 
+ for(final AggregationElement aggregation : aggregationMetadata.getAggregations()) { + final AggregationType type = aggregation.getAggregationType(); + final AggregationFunction function = FUNCTIONS.get(type); + if(function == null) { + throw new RuntimeException("Unrecognized aggregation function: " + type); + } + + function.update(aggregation, state, childBindingSet); + } + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "After Update:" + state.getBindingSet().toString() + "\n" ); + + // Store the updated state. This will write on top of any old state that was present for the Group By values. + tx.set(rowId, FluoQueryColumns.AGGREGATION_BINDING_SET, Bytes.of(AGG_STATE_SERDE.serialize(state))); + } + + /** + * A function that updates an {@link AggregationState}. + */ + public static interface AggregationFunction { + + /** + * Updates an {@link AggregationState} based on the values of a child Binding Set. + * + * @param aggregation - Defines which function needs to be performed as well as any details required + * to do the aggregation work. (not null) + * @param state - The state that will be updated. (not null) + * @param childBindingSet - The Binding Set whose values will be used to update the state. + */ + public void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet); + } + + /** + * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name + * that is being counted by the {@link AggregationElement}. + */ + public static final class CountFunction implements AggregationFunction { + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements."); + + // Only add one to the count if the child contains the binding that we are counting. 
+ final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + if(newBinding) { + // Initialize the binding. + result.addBinding(resultName, new IntegerLiteralImpl(BigInteger.ONE)); + } else { + // Update the existing binding. + final Literal count = (Literal) result.getValue(resultName); + final BigInteger updatedCount = count.integerValue().add( BigInteger.ONE ); + result.addBinding(resultName, new IntegerLiteralImpl(updatedCount)); + } + } + } + } + + /** + * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name + * that is being summed by the {@link AggregationElement}. + */ + public static final class SumFunction implements AggregationFunction { + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements."); + + // Only add values to the sum if the child contains the binding that we are summing. + final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + // Get the starting number for the sum. + Literal sum; + if(newBinding) { + sum = new IntegerLiteralImpl(BigInteger.ZERO); + } else { + sum = (Literal) state.getBindingSet().getValue(resultName); + } + + // Add the child binding set's value if it is a numeric literal. 
+ final Value childValue = childBindingSet.getValue(aggregatedName); + if(childValue instanceof Literal) { + final Literal childLiteral = (Literal) childValue; + if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) { + try { + sum = MathUtil.compute(sum, childLiteral, MathOp.PLUS); + } catch (final ValueExprEvaluationException e) { + log.error("A problem was encountered while updating a Sum Aggregation. This binding set will be ignored: " + childBindingSet); + return; + } + } + } + + // Update the state to include the new sum. + result.addBinding(resultName, sum); + } + } + } + + /** + * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name + * that is being averaged by the {@link AggregationElement}. + */ + public static final class AverageFunction implements AggregationFunction { + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements."); + + // Only update the average if the child contains the binding that we are averaging. + final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + // Get the state of the average. + final Map<String, AverageState> averageStates = state.getAverageStates(); + AverageState averageState = newBinding ? new AverageState() : averageStates.get(resultName); + + // Update the state of the average. 
+ final Value childValue = childBindingSet.getValue(aggregatedName); + if(childValue instanceof Literal) { + final Literal childLiteral = (Literal) childValue; + if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) { + try { + // Update the sum. + final Literal oldSum = new DecimalLiteralImpl(averageState.getSum()); + final BigDecimal sum = MathUtil.compute(oldSum, childLiteral, MathOp.PLUS).decimalValue(); + + // Update the count. + final BigInteger count = averageState.getCount().add( BigInteger.ONE ); + + // Update the BindingSet to include the new average. + final Literal sumLiteral = new DecimalLiteralImpl(sum); + final Literal countLiteral = new IntegerLiteralImpl(count); + final Literal average = MathUtil.compute(sumLiteral, countLiteral, MathOp.DIVIDE); + result.addBinding(resultName, average); + + // Update the average state that is stored. + averageState = new AverageState(sum, count); + averageStates.put(resultName, averageState); + } catch (final ValueExprEvaluationException e) { + log.error("A problem was encountered while updating an Average Aggregation. This binding set will be ignored: " + childBindingSet); + return; + } + } + } + } + } + } + + /** + * Update the {@link AggregationState}'s max if the child binding Set contains the binding name that is being + * maxed by the {@link AggregationElement}. + */ + public static final class MaxFunction implements AggregationFunction { + + private final ValueComparator compare = new ValueComparator(); + + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements."); + + // Only update the max if the child contains the binding that we are finding the max value for. 
+ final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + Value max; + if(newBinding) { + max = childBindingSet.getValue(aggregatedName); + } else { + final Value oldMax = result.getValue(resultName); + final Value childMax = childBindingSet.getValue(aggregatedName); + max = compare.compare(childMax, oldMax) > 0 ? childMax : oldMax; + } + + result.addBinding(resultName, max); + } + } + } + + /** + * Update the {@link AggregationState}'s min if the child binding Set contains the binding name that is being + * minimized by the {@link AggregationElement}. + */ + public static final class MinFunction implements AggregationFunction { + + private final ValueComparator compare = new ValueComparator(); + + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements."); + + // Only update the min if the child contains the binding that we are finding the min value for. + final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + Value min; + if(newBinding) { + min = childBindingSet.getValue(aggregatedName); + } else { + final Value oldMin = result.getValue(resultName); + final Value chidlMin = childBindingSet.getValue(aggregatedName); + min = compare.compare(chidlMin, oldMin) < 0 ? 
chidlMin : oldMin; + } + + result.addBinding(resultName, min); + } + } + } + + /** + * Reads/Writes instances of {@link AggregationState} to/from bytes. + */ + public static interface AggregationStateSerDe { + + /** + * @param state - The state that will be serialized. (not null) + * @return The state serialized to a byte[]. + */ + public byte[] serialize(AggregationState state); + + /** + * @param bytes - The bytes that will be deserialized. (not null) + * @return The {@link AggregationState} that was read from the bytes. + */ + public AggregationState deserialize(byte[] bytes); + } + + /** + * An implementation of {@link AggregationStateSerDe} that uses Java Serialization. + */ + public static final class ObjectSerializationAggregationStateSerDe implements AggregationStateSerDe { + + @Override + public byte[] serialize(final AggregationState state) { + requireNonNull(state); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try(final ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(state); + } catch (final IOException e) { + throw new RuntimeException("A problem was encountered while serializing an AggregationState object.", e); + } + + return baos.toByteArray(); + } + + @Override + public AggregationState deserialize(final byte[] bytes) { + requireNonNull(bytes); + + final AggregationState state; + + final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + try(ObjectInputStream ois = new ObjectInputStream(bais)) { + final Object o = ois.readObject(); + if(o instanceof AggregationState) { + state = (AggregationState)o; + } else { + throw new RuntimeException("A problem was encountered while deserializing an AggregationState object. 
Wrong class."); + } + } catch (final IOException | ClassNotFoundException e) { + throw new RuntimeException("A problem was encountered while deserializing an AggregationState object.", e); + } + + return state; + } + } + + /** + * Keeps track of the information required to update and build the resulting Binding Set for a set of Group By values. + */ + public static final class AggregationState implements Serializable { + private static final long serialVersionUID = 1L; + + // The visibility equation that encompasses all data the aggregation state is derived from. + private String visibility; + + // A binding set that holds the current state of the aggregations. + private final MapBindingSet bindingSet; + + // A map from result binding name to the state that derived that binding's value. + private final Map<String, AverageState> avgStates; + + /** + * Constructs an instance of {@link AggregationState}. + */ + public AggregationState() { + this.visibility = ""; + this.bindingSet = new MapBindingSet(); + this.avgStates = new HashMap<>(); + } + + /** + * Constructs an instance of {@link AggregationState}. + * + * @param visibility - The visibility equation associated with the resulting binding set. (not null) + * @param bindingSet - The Binding Set whose values are being updated. It holds the result for a set of + * Group By values. (not null) + * @param avgStates - If the aggregation is doing an Average, this is a map from result binding name to + * average state for that binding. + */ + public AggregationState( + final String visibility, + final MapBindingSet bindingSet, + final Map<String, AverageState> avgStates) { + this.visibility = requireNonNull(visibility); + this.bindingSet = requireNonNull(bindingSet); + this.avgStates = requireNonNull(avgStates); + } + + /** + * @return The visibility equation associated with the resulting binding set. 
+ */ + public String getVisibility() { + return visibility; + } + + /** + * @param visibility - The visibility equation associated with the resulting binding set. + */ + public void setVisibility(final String visibility) { + this.visibility = requireNonNull(visibility); + } + + /** + * @return The Binding Set whose values are being updated. It holds the result for a set of Group By values. + */ + public MapBindingSet getBindingSet() { + return bindingSet; + } + + /** + * @return If the aggregation is doing an Average, this is a map from result binding name to + * average state for that binding. + */ + public Map<String, AverageState> getAverageStates() { + return avgStates; + } + + @Override + public int hashCode() { + return Objects.hash(visibility, bindingSet, avgStates); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof AggregationState) { + final AggregationState state = (AggregationState) o; + return Objects.equals(visibility, state.visibility) && + Objects.equals(bindingSet, state.bindingSet) && + Objects.equals(avgStates, state.avgStates); + } + return false; + } + } + + /** + * The Sum and Count of the values that are being averaged. The average itself is derived from these values. + */ + public static class AverageState implements Serializable { + private static final long serialVersionUID = 1L; + + private final BigDecimal sum; + private final BigInteger count; + + /** + * Constructs an instance of {@link AverageState} where the count and sum start at 0. + */ + public AverageState() { + sum = BigDecimal.ZERO; + count = BigInteger.ZERO; + } + + /** + * Constructs an instance of {@link AverageState}. + * + * @param sum - The sum of the values that are averaged. (not null) + * @param count - The number of values that are averaged. 
(not null) + */ + public AverageState(final BigDecimal sum, final BigInteger count) { + this.sum = requireNonNull(sum); + this.count = requireNonNull(count); + } + + /** + * @return The sum of the values that are averaged. + */ + public BigDecimal getSum() { + return sum; + } + + /** + * @return The number of values that are averaged. + */ + public BigInteger getCount() { + return count; + } + + @Override + public int hashCode() { + return Objects.hash(sum, count); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof AverageState) { + final AverageState state = (AverageState) o; + return Objects.equals(sum, state.sum) && + Objects.equals(count, state.count); + } + return false; + } + + @Override + public String toString() { + return "Sum: " + sum + " Count: " + count; + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java index 8c8505d..2e45ea6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java @@ -21,12 +21,12 @@ package org.apache.rya.indexing.pcj.fluo.app; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import org.apache.fluo.api.data.Bytes; + import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import net.jcip.annotations.Immutable; -import org.apache.fluo.api.data.Bytes; - /** * The values of an Accumulo Row ID for a row that stores a Binding set for * a specific Node ID of a query. @@ -73,13 +73,8 @@ public class BindingSetRow { // Read the Node ID from the row's bytes. final String[] rowArray = row.toString().split(NODEID_BS_DELIM); - if(rowArray.length != 2) { - throw new IllegalArgumentException("A row must contain a single NODEID_BS_DELIM."); - } - final String nodeId = rowArray[0]; - String bindingSetString = rowArray[1]; - + final String bindingSetString = rowArray.length == 2 ? 
rowArray[1] : ""; return new BindingSetRow(nodeId, bindingSetString); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java index 3b17a33..42ec686 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java @@ -19,24 +19,22 @@ package org.apache.rya.indexing.pcj.fluo.app; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import org.openrdf.model.Resource; import org.openrdf.model.Statement; import org.openrdf.model.URI; import 
org.openrdf.model.Value; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.QueryEvaluationException; import org.openrdf.query.algebra.Filter; @@ -45,14 +43,12 @@ import org.openrdf.query.algebra.evaluation.TripleSource; import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil; -import org.openrdf.query.impl.MapBindingSet; import com.google.common.base.Optional; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; import info.aduna.iteration.CloseableIteration; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; /** * Updates the results of a Filter node when its child has added a new Binding @@ -61,8 +57,9 @@ import org.apache.fluo.api.data.Column; @DefaultAnnotation(NonNull.class) public class FilterResultUpdater { - private static final BindingSetStringConverter ID_CONVERTER = new BindingSetStringConverter(); - private static final VisibilityBindingSetStringConverter VALUE_CONVERTER = new VisibilityBindingSetStringConverter(); + private static final Logger log = Logger.getLogger(FilterResultUpdater.class); + + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); /** * A utility class used to search SPARQL queries for Filters. @@ -96,7 +93,7 @@ public class FilterResultUpdater { * new Binding Set to its results. * * @param tx - The transaction all Fluo queries will use. (not null) - * @param childBindingSet - A binding set that the query's child node has emmitted. (not null) + * @param childBindingSet - A binding set that the query's child node has emitted. 
(not null) * @param filterMetadata - The metadata of the Filter whose results will be updated. (not null) * @throws Exception Something caused the update to fail. */ @@ -108,6 +105,11 @@ public class FilterResultUpdater { checkNotNull(childBindingSet); checkNotNull(filterMetadata); + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Filter Node ID: " + filterMetadata.getNodeId() + "\n" + + "Binding Set:\n" + childBindingSet + "\n"); + // Parse the original query and find the Filter that represents filterId. final String sparql = filterMetadata.getOriginalSparql(); final int indexWithinQuery = filterMetadata.getFilterIndexWithinSparql(); @@ -118,23 +120,22 @@ public class FilterResultUpdater { if (isTrue(condition, childBindingSet)) { // Create the Filter's binding set from the child's. final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); + final BindingSet filterBindingSet = BindingSetUtil.keepBindings(filterVarOrder, childBindingSet); - final MapBindingSet filterBindingSet = new MapBindingSet(); - for(final String bindingName : filterVarOrder) { - if(childBindingSet.hasBinding(bindingName)) { - final Binding binding = childBindingSet.getBinding(bindingName); - filterBindingSet.addBinding(binding); - } - } + // Create the Row Key for the emitted binding set. It does not contain visibilities. + final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, filterBindingSet); + + // If this is a new binding set, then emit it. 
+ if(tx.get(resultRow, FluoQueryColumns.FILTER_BINDING_SET) == null) { + final VisibilityBindingSet visBindingSet = new VisibilityBindingSet(filterBindingSet, childBindingSet.getVisibility()); + final Bytes nodeValueBytes = BS_SERDE.serialize(visBindingSet); - final String filterBindingSetIdString = ID_CONVERTER.convert(filterBindingSet, filterVarOrder); - String filterBindingSetValueString = ""; - filterBindingSetValueString = VALUE_CONVERTER.convert(childBindingSet, filterVarOrder); + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "New Binding Set: " + visBindingSet + "\n"); - final String row = filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetIdString; - final Column col = FluoQueryColumns.FILTER_BINDING_SET; - final String value = filterBindingSetValueString; - tx.set(row, col, value); + tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, nodeValueBytes); + } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java index b78562c..602fd9d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java @@ -25,6 +25,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.UR import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.SnapshotBase; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.client.scanner.CellScanner; import 
org.apache.fluo.api.data.Bytes; @@ -53,7 +54,7 @@ public class IncUpdateDAO { return rs; } - private static String getTripleString(final RyaStatement rs) { + public static String getTripleString(final RyaStatement rs) { final String subj = rs.getSubject().getData() + TYPE_DELIM + URI_TYPE; final String pred = rs.getPredicate().getData() + TYPE_DELIM + URI_TYPE; final String objData = rs.getObject().getData(); @@ -102,81 +103,41 @@ public class IncUpdateDAO { */ public static void printTriples(final FluoClient fluoClient) throws Exception { try (Snapshot snapshot = fluoClient.newSnapshot()) { - CellScanner cscanner = snapshot.scanner().fetch(new Column("triples", "SPO")).build(); - for (RowColumnValue rcv : cscanner) { + final CellScanner cscanner = snapshot.scanner().fetch(new Column("triples", "SPO")).build(); + for (final RowColumnValue rcv : cscanner) { System.out.println("Triple: "+rcv.getsRow()); } } } -// /** -// * Print all bindings for the given queries. For demo's and diagnostics. 
-// * @param fluoClient -// * @param queryNames -// * @throws Exception -// */ -// public static void printQueryResults(final FluoClient fluoClient, -// final Map<String, String> queryNames) throws Exception { -// try (Snapshot snapshot = fluoClient.newSnapshot(); -// TypedTransaction tx1 = stl.wrap(fluoClient.newTransaction())) { -// -// final ScannerConfiguration scanConfig = new ScannerConfiguration(); -// scanConfig.fetchColumn(Bytes.of("query"), Bytes.of("bindingSet")); -// -// final RowIterator rowIter = snapshot.get(scanConfig); -// String sparqlRow = ""; -// System.out.println("*********************************************************"); -// -// while (rowIter.hasNext()) { -// final Entry<Bytes, ColumnIterator> row = rowIter.next(); -// final String[] joinInfo = row.getKey().toString() -// .split(NODEID_BS_DELIM); -// final String sparql = joinInfo[0]; -// final String bs = joinInfo[1]; -// if (!sparqlRow.equals(sparql)) { -// sparqlRow = sparql; -// System.out.println(); -// System.out.println(); -// System.out.println(queryNames.get(sparqlRow) -// + " has bindings: "); -// System.out.println(); -// } -// -// final String variables = tx1.get().row(sparqlRow).col(NODE_VARS).toString(); -// final String[] vars = variables.split(";"); -// final String[] bsVals = bs.split(DELIM); -// System.out.print("Bindingset: "); -// for (int i = 0; i < vars.length; i++) { -// System.out.print(vars[i] + " = " + bsVals[i] + " "); -// } -// System.out.println(); -// -// } -// -// System.out.println("*********************************************************"); -// } -// } + /** + * Print all rows in the Fluo table for diagnostics. + * </p> + * Consider using {@code FluoITHelper.printFluoTable(FluoClient client)} instead. + */ + @Deprecated + public static void printAll(final SnapshotBase sx) { + final String FORMAT = "%-30s | %-10s | %-10s | %-40s\n"; + System.out.println("Printing all tables. 
Showing unprintable bytes and braces as {ff} and {{} and {}} where ff is the value in hexadecimal."); + System.out.format(FORMAT, "--Row--", "--Column Family--", "--Column Qual--", "--Value--"); + final CellScanner cscanner = sx.scanner().build(); + for (final RowColumnValue rcv : cscanner) { + System.out.format(FORMAT, to_String(rcv.getRow()), + to_String(rcv.getColumn().getFamily()), + to_String(rcv.getColumn().getQualifier()), + to_String(rcv.getValue())); + } + } /** * Print all rows in the Fluo table for diagnostics. - * @param fluoClient - * @throws Exception + * <p> + * Consider using {@code FluoITHelper.printFluoTable(FluoClient client)} instead. */ + @Deprecated public static void printAll(final FluoClient fluoClient) throws Exception { - final String FORMAT = "%-30s | %-10s | %-10s | %-40s\n"; - System.out - .println("Printing all tables. Showing unprintable bytes and braces as {ff} and {{} and {}} where ff is the value in hexadecimal."); - System.out.format(FORMAT, "--Row--", "--Column Family--", - "--Column Qual--", "--Value--"); - // Use try with resource to ensure snapshot is closed.
- try (Snapshot snapshot = fluoClient.newSnapshot()) { - CellScanner cscanner = snapshot.scanner().build(); - for (RowColumnValue rcv : cscanner) { - System.out.format(FORMAT, to_String(rcv.getRow()), - to_String(rcv.getColumn().getFamily()), - to_String(rcv.getColumn().getQualifier()), - to_String(rcv.getValue())); - } + try(Snapshot sx = fluoClient.newSnapshot()) { + printAll(sx); } } @@ -208,4 +169,4 @@ public class IncUpdateDAO { } return sb.toString(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java index 84581ef..be4df71 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java @@ -31,6 +31,7 @@ public class IncrementalUpdateConstants { public static final String SP_PREFIX = "STATEMENT_PATTERN"; public static final String JOIN_PREFIX = "JOIN"; public static final String FILTER_PREFIX = "FILTER"; + public static final String AGGREGATION_PREFIX = "AGGREGATION"; public static final String QUERY_PREFIX = "QUERY"; public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI"; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java index 39dcc16..2cb5a54 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java @@ -27,14 +27,20 @@ import java.util.Iterator; import java.util.List; import java.util.Set; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.utils.VisibilitySimplifier; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; @@ -42,17 +48,12 @@ import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.MapBindingSet; -import com.google.common.base.Joiner; import com.google.common.base.Optional; 
import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.ColumnValue; -import org.apache.fluo.api.data.Span; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Updates the results of a Join node when one of its children has added a @@ -61,49 +62,57 @@ import org.apache.fluo.api.data.Span; @DefaultAnnotation(NonNull.class) public class JoinResultUpdater { - private static final BindingSetStringConverter idConverter = new BindingSetStringConverter(); - private static final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter(); + private static final Logger log = Logger.getLogger(JoinResultUpdater.class); + + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter(); private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); - + /** * Updates the results of a Join node when one of its children has added a * new Binding Set to its results. * * @param tx - The transaction all Fluo queries will use. (not null) - * @param childId - The Node ID of the child whose results received a new Binding Set. (not null) + * @param childNodeId - The Node ID of the child whose results received a new Binding Set. (not null) * @param childBindingSet - The Binding Set that was just emitted by child node. (not null) * @param joinMetadata - The metadata for the Join that has been notified. (not null) - * @throws BindingSetConversionException + * @throws Exception The update could not be successfully performed. 
*/ public void updateJoinResults( final TransactionBase tx, - final String childId, + final String childNodeId, final VisibilityBindingSet childBindingSet, - final JoinMetadata joinMetadata) throws BindingSetConversionException { + final JoinMetadata joinMetadata) throws Exception { checkNotNull(tx); - checkNotNull(childId); + checkNotNull(childNodeId); checkNotNull(childBindingSet); checkNotNull(joinMetadata); + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Join Node ID: " + joinMetadata.getNodeId() + "\n" + + "Child Node ID: " + childNodeId + "\n" + + "Child Binding Set:\n" + childBindingSet + "\n"); + // Figure out which join algorithm we are going to use. final IterativeJoin joinAlgorithm; switch(joinMetadata.getJoinType()) { - case NATURAL_JOIN: - joinAlgorithm = new NaturalJoin(); - break; - case LEFT_OUTER_JOIN: - joinAlgorithm = new LeftOuterJoin(); - break; - default: - throw new RuntimeException("Unsupported JoinType: " + joinMetadata.getJoinType()); + case NATURAL_JOIN: + joinAlgorithm = new NaturalJoin(); + break; + case LEFT_OUTER_JOIN: + joinAlgorithm = new LeftOuterJoin(); + break; + default: + throw new RuntimeException("Unsupported JoinType: " + joinMetadata.getJoinType()); } // Figure out which side of the join the new binding set appeared on. final Side emittingSide; final String siblingId; - if(childId.equals(joinMetadata.getLeftChildNodeId())) { + if(childNodeId.equals(joinMetadata.getLeftChildNodeId())) { emittingSide = Side.LEFT; siblingId = joinMetadata.getRightChildNodeId(); } else { @@ -112,7 +121,7 @@ public class JoinResultUpdater { } // Iterates over the sibling node's BindingSets that join with the new binding set. - final FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx); + final FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childNodeId, childBindingSet, siblingId, tx); // Iterates over the resulting BindingSets from the join. 
final Iterator<VisibilityBindingSet> newJoinResults; @@ -125,14 +134,22 @@ public class JoinResultUpdater { // Insert the new join binding sets to the Fluo table. final VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); while(newJoinResults.hasNext()) { - final BindingSet newJoinResult = newJoinResults.next(); - final String joinBindingSetStringId = idConverter.convert(newJoinResult, joinVarOrder); - final String joinBindingSetStringValue = valueConverter.convert(newJoinResult, joinVarOrder); - - final String row = joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetStringId; - final Column col = FluoQueryColumns.JOIN_BINDING_SET; - final String value = joinBindingSetStringValue; - tx.set(row, col, value); + final VisibilityBindingSet newJoinResult = newJoinResults.next(); + + // Create the Row Key for the emitted binding set. It does not contain visibilities. + final Bytes resultRow = RowKeyUtil.makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult); + + // Only insert the join Binding Set if it is new. + if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null) { + // Create the Node Value. It does contain visibilities. 
+ final Bytes nodeValueBytes = BS_SERDE.serialize(newJoinResult); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "New Join Result:\n" + newJoinResult + "\n"); + + tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, nodeValueBytes); + } } } @@ -150,8 +167,8 @@ public class JoinResultUpdater { final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder); // Get the Binding strings - final String childBindingSetString = valueConverter.convert(childBindingSet, childVarOrder); - String[] childBindingArray = childBindingSetString.split("\u0001"); + final String childBindingSetString = VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder); + final String[] childBindingArray = childBindingSetString.split("\u0001"); final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]); // Create the prefix that will be used to scan for binding sets of the sibling node. @@ -174,7 +191,7 @@ public class JoinResultUpdater { // earlier iterations of this algorithm. 
final RowScanner rs = tx.scanner().over(Span.prefix(siblingScanPrefix)).fetch(getScanColumnFamily(siblingId)).byRow().build(); - return new FluoTableIterator(rs, siblingVarOrder); + return new FluoTableIterator(rs); } @@ -270,7 +287,7 @@ public class JoinResultUpdater { default: throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, Left Join, or Filter."); } - + return column; } @@ -426,14 +443,8 @@ public class JoinResultUpdater { leftVisi = joinResult.getVisibility(); rightVisi = newResult.getVisibility(); } + final String visibility = VisibilitySimplifier.unionAndSimplify(leftVisi, rightVisi); - String visibility = ""; - final Joiner join = Joiner.on(")&("); - if(leftVisi.isEmpty() || rightVisi.isEmpty()) { - visibility = (leftVisi + rightVisi).trim(); - } else { - visibility = "(" + join.join(leftVisi, rightVisi) + ")"; - } return new VisibilityBindingSet(bs, visibility); } @@ -449,24 +460,22 @@ public class JoinResultUpdater { */ private static final class FluoTableIterator implements Iterator<VisibilityBindingSet> { + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet( FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, FluoQueryColumns.JOIN_BINDING_SET, FluoQueryColumns.FILTER_BINDING_SET); private final Iterator<ColumnScanner> rows; - private final VariableOrder varOrder; /** * Constructs an instance of {@link FluoTableIterator}. * * @param rows - Iterates over RowId values in a Fluo Table. (not null) - * @param varOrder - The Variable Order of binding sets that will be - * read from the Fluo Table. 
(not null) */ - public FluoTableIterator(final RowScanner rows, final VariableOrder varOrder) { + public FluoTableIterator(final RowScanner rows) { this.rows = checkNotNull(rows).iterator(); - this.varOrder = checkNotNull(varOrder); } @Override @@ -478,12 +487,16 @@ public class JoinResultUpdater { public VisibilityBindingSet next() { final ColumnScanner columns = rows.next(); - for (ColumnValue cv : columns) { - if(BINDING_SET_COLUMNS.contains(cv.getColumn())) { - final String bindingSetString = cv.getsValue(); - return (VisibilityBindingSet) valueConverter.convert(bindingSetString, varOrder); - } - } + for (final ColumnValue cv : columns) { + if(BINDING_SET_COLUMNS.contains(cv.getColumn())) { + final Bytes value = cv.getValue(); + try { + return BS_SERDE.deserialize(value); + } catch (final Exception e) { + throw new RuntimeException("Row did not containing a Binding Set.", e); + } + } + } throw new RuntimeException("Row did not containing a Binding Set."); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java index 0a5ecc1..5365e30 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app; import static java.util.Objects.requireNonNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; @@ -26,14 +27,13 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP import java.util.List; +import org.apache.fluo.api.data.Column; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QueryNodeMetadataColumns; import org.openrdf.query.BindingSet; import com.google.common.base.Optional; -import org.apache.fluo.api.data.Column; - /** * Represents the different types of nodes that a Query may have. */ @@ -41,7 +41,8 @@ public enum NodeType { FILTER (QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET), JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET), STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET), - QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET); + QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET), + AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET); //Metadata Columns associated with given NodeType private QueryNodeMetadataColumns metadataColumns; @@ -55,7 +56,7 @@ public enum NodeType { * @param metadataColumns - Metadata {@link Column}s associated with this {@link NodeType}. (not null) * @param bindingSetColumn - The {@link Column} used to store this {@link NodeType|'s {@link BindingSet}s. 
(not null) */ - private NodeType(QueryNodeMetadataColumns metadataColumns, Column bindingSetColumn) { + private NodeType(final QueryNodeMetadataColumns metadataColumns, final Column bindingSetColumn) { this.metadataColumns = requireNonNull(metadataColumns); this.bindingSetColumn = requireNonNull(bindingSetColumn); } @@ -95,6 +96,8 @@ public enum NodeType { type = JOIN; } else if(nodeId.startsWith(QUERY_PREFIX)) { type = QUERY; + } else if(nodeId.startsWith(AGGREGATION_PREFIX)) { + type = AGGREGATION; } return Optional.fromNullable(type); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java index 9cd2bd7..ba82726 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java @@ -19,22 +19,22 @@ package org.apache.rya.indexing.pcj.fluo.app; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import 
org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; -import org.openrdf.query.Binding; -import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.query.BindingSet; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Column; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Updates the results of a Query node when one of its children has added a @@ -42,9 +42,10 @@ import org.apache.fluo.api.data.Column; */ @DefaultAnnotation(NonNull.class) public class QueryResultUpdater { - - private final BindingSetStringConverter converter = new BindingSetStringConverter(); - private final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter(); + private static final Logger log = Logger.getLogger(QueryResultUpdater.class); + + private static final FluoQueryMetadataDAO METADATA_DA0 = new FluoQueryMetadataDAO(); + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); /** * Updates the results of a Query node when one of its children has added a @@ -53,32 +54,46 @@ public class QueryResultUpdater { * @param tx - The transaction all Fluo queries will use. (not null) * @param childBindingSet - A binding set that the query's child node has emmitted. (not null) * @param queryMetadata - The metadata of the Query whose results will be updated. (not null) + * @throws Exception A problem caused the update to fail. 
*/ public void updateQueryResults( final TransactionBase tx, final VisibilityBindingSet childBindingSet, - final QueryMetadata queryMetadata) { + final QueryMetadata queryMetadata) throws Exception { checkNotNull(tx); checkNotNull(childBindingSet); checkNotNull(queryMetadata); + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Join Node ID: " + queryMetadata.getNodeId() + "\n" + + "Child Node ID: " + queryMetadata.getChildNodeId() + "\n" + + "Child Binding Set:\n" + childBindingSet + "\n"); + // Create the query's Binding Set from the child node's binding set. final VariableOrder queryVarOrder = queryMetadata.getVariableOrder(); + final BindingSet queryBindingSet = BindingSetUtil.keepBindings(queryVarOrder, childBindingSet); - final MapBindingSet queryBindingSet = new MapBindingSet(); - for(final String bindingName : queryVarOrder) { - if(childBindingSet.hasBinding(bindingName)) { - final Binding binding = childBindingSet.getBinding(bindingName); - queryBindingSet.addBinding(binding); - } + // Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables. 
+ final Bytes resultRow; + + final String childNodeId = queryMetadata.getChildNodeId(); + final boolean isGrouped = childNodeId.startsWith( IncrementalUpdateConstants.AGGREGATION_PREFIX ); + if(isGrouped) { + final AggregationMetadata aggMetadata = METADATA_DA0.readAggregationMetadata(tx, childNodeId); + final VariableOrder groupByVars = aggMetadata.getGroupByVariableOrder(); + resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), groupByVars, queryBindingSet); + } else { + resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, queryBindingSet); } - final String queryBindingSetString = converter.convert(queryBindingSet, queryVarOrder); - final String queryBindingSetValueString = valueConverter.convert(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()), queryVarOrder); - // Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry. - final String row = queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString; - final Column col = FluoQueryColumns.QUERY_BINDING_SET; - final String value = queryBindingSetValueString; - tx.set(row, col, value); + // Create the Binding Set that goes in the Node Value. It does contain visibilities. 
+ final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "New Binding Set: " + childBindingSet + "\n"); + + tx.set(resultRow, FluoQueryColumns.QUERY_BINDING_SET, nodeValueBytes); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java new file mode 100644 index 0000000..34439e4 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Serializes and deserializes a {@link VisibilityBindingSet} to and from {@link Bytes} objects. + */ +@DefaultAnnotation(NonNull.class) +public class VisibilityBindingSetSerDe { + + /** + * Serializes a {@link VisibilityBindingSet} into a {@link Bytes} object. + * + * @param bindingSet - The binding set that will be serialized. (not null) + * @return The serialized object. + * @throws Exception A problem was encountered while serializing the object. + */ + public Bytes serialize(final VisibilityBindingSet bindingSet) throws Exception { + requireNonNull(bindingSet); + + final ByteArrayOutputStream boas = new ByteArrayOutputStream(); + try(final ObjectOutputStream oos = new ObjectOutputStream(boas)) { + oos.writeObject(bindingSet); + } + + return Bytes.of(boas.toByteArray()); + } + + /** + * Deserializes a {@link VisibilityBindingSet} from a {@link Bytes} object. + * + * @param bytes - The bytes that will be deserialized. (not null) + * @return The deserialized object. + * @throws Exception A problem was encountered while deserializing the object. + */ + public VisibilityBindingSet deserialize(final Bytes bytes) throws Exception { + requireNonNull(bytes); + + try(final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toArray()))) { + final Object o = ois.readObject(); + if(o instanceof VisibilityBindingSet) { + return (VisibilityBindingSet) o; + } else { + throw new Exception("Deserialized Object is not a VisibilityBindingSet. 
Was: " + o.getClass()); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java index a0edbda..02dced7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java @@ -18,27 +18,25 @@ */ package org.apache.rya.indexing.pcj.fluo.app.export; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.fluo.api.client.TransactionBase; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Exports a single Binding Set that is a new result for a SPARQL query to some * other location. */ @DefaultAnnotation(NonNull.class) -public interface IncrementalResultExporter { +public interface IncrementalResultExporter extends AutoCloseable { /** - * Export a Binding Set that is a result of a SPARQL query. + * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause. * * @param tx - The Fluo transaction this export is a part of. (not null) * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null) - * @param bindingSetString - The binding set as it was represented within the - * Fluo application. 
(not null) + * @param result - The Binding Set as it was represented within the Fluo application. (not null) * @throws ResultExportException The result could not be exported. */ public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java index c40c5da..72ec947 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java @@ -20,9 +20,13 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import org.apache.fluo.api.client.TransactionBase; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; @@ -32,17 +36,18 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; * Incrementally exports SPARQL query results to Kafka topics.
*/ public class KafkaResultExporter implements IncrementalResultExporter { - private final KafkaProducer<String, VisibilityBindingSet> producer; private static final Logger log = Logger.getLogger(KafkaResultExporter.class); + private final KafkaProducer<String, VisibilityBindingSet> producer; + /** * Constructs an instance given a Kafka producer. - * + * * @param producer * for sending result set alerts to a broker. (not null) * Can be created and configured by {@link KafkaResultExporterFactory} */ - public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) { + public KafkaResultExporter(final KafkaProducer<String, VisibilityBindingSet> producer) { super(); checkNotNull(producer, "Producer is required."); this.producer = producer; @@ -58,18 +63,25 @@ public class KafkaResultExporter implements IncrementalResultExporter { checkNotNull(result); try { final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); - String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; + final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; log.trace(msg); - // Send result on topic - ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* value= */ result); - // Can add a key if you need to: - // ProducerRecord(String topic, K key, V value) - producer.send(rec); + // Send the result to the topic whose name matches the PCJ ID. + final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result); + final Future<RecordMetadata> future = producer.send(rec); + + // Don't let the export return until the result has been written to the topic. Otherwise we may lose results. 
+ future.get(); + log.debug("producer.send(rec) completed"); } catch (final Throwable e) { throw new ResultExportException("A result could not be exported to Kafka.", e); } } -} + + @Override + public void close() throws Exception { + producer.close(5, TimeUnit.SECONDS); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java index a4b589f..b8b3c45 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java @@ -19,16 +19,16 @@ package org.apache.rya.indexing.pcj.fluo.app.export.rya; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import java.util.Collections; +import org.apache.fluo.api.client.TransactionBase; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; /** * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya. 
@@ -51,10 +51,11 @@ public class RyaResultExporter implements IncrementalResultExporter { final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { - checkNotNull(fluoTx); - checkNotNull(queryId); - checkNotNull(result); + requireNonNull(fluoTx); + requireNonNull(queryId); + requireNonNull(result); + // Look up the ID the PCJ represents within the PCJ Storage. final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); try { @@ -63,4 +64,9 @@ public class RyaResultExporter implements IncrementalResultExporter { throw new ResultExportException("A result could not be exported to Rya.", e); } } + + @Override + public void close() throws Exception { + pcjStorage.close(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java new file mode 100644 index 0000000..1cb1594 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static java.util.Objects.requireNonNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState; +import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationStateSerDe; +import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.ObjectSerializationAggregationStateSerDe; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.query.BindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Notified when the results of an Aggregation have been updated to include a new + * {@link BindingSet} value. This observer updates its parent if the new Binding Set + * affects the parent's results. 
+ */ +@DefaultAnnotation(NonNull.class) +public class AggregationObserver extends BindingSetUpdater { + + private static final AggregationStateSerDe STATE_SERDE = new ObjectSerializationAggregationStateSerDe(); + + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.AGGREGATION_BINDING_SET, NotificationType.STRONG); + } + + @Override + public Observation parseObservation(final TransactionBase tx, final Bytes row) { + requireNonNull(tx); + requireNonNull(row); + + // Fetch the Aggregation node's metadata. + final String nodeId = BindingSetRow.make(row).getNodeId(); + final AggregationMetadata metadata = queryDao.readAggregationMetadata(tx, nodeId); + + // Deserialize the Aggregation State stored in the value and build a Visibility Binding Set from it. + final Bytes stateBytes = tx.get(row, FluoQueryColumns.AGGREGATION_BINDING_SET); + final AggregationState state = STATE_SERDE.deserialize( stateBytes.toArray() ); + final VisibilityBindingSet aggBindingSet = new VisibilityBindingSet(state.getBindingSet(), state.getVisibility()); + + // Figure out which parent node needs to handle the new Binding Set. + final String parentNodeId = metadata.getParentNodeId(); + + return new Observation(nodeId, aggBindingSet, parentNodeId); + } +}
