http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java index bc558a5..700d0fb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java @@ -37,6 +37,7 @@ import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.MapBindingSet; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -50,10 +51,11 @@ import io.fluo.api.iterator.ColumnIterator; import io.fluo.api.iterator.RowIterator; import io.fluo.api.types.Encoder; import io.fluo.api.types.StringEncoder; -import mvm.rya.indexing.external.tupleSet.BindingSetConverter; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter; /** * Updates the results of a Join node when one of its children has added a @@ -62,7 +64,8 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; @ParametersAreNonnullByDefault public class JoinResultUpdater { - private static final BindingSetConverter<String> converter = new BindingSetStringConverter(); + private static final BindingSetStringConverter idConverter = new BindingSetStringConverter(); + private static final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter(); private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); private final Encoder encoder = new StringEncoder(); @@ -80,7 +83,7 @@ public class JoinResultUpdater { public void updateJoinResults( final TransactionBase tx, final String childId, - final BindingSet childBindingSet, + final VisibilityBindingSet childBindingSet, final JoinMetadata joinMetadata) throws BindingSetConversionException { checkNotNull(tx); checkNotNull(childId); @@ -113,10 +116,10 @@ public class JoinResultUpdater { } // Iterates over the sibling node's BindingSets that join with the new binding set. - FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx); + final FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx); // Iterates over the resulting BindingSets from the join. - final Iterator<BindingSet> newJoinResults; + final Iterator<VisibilityBindingSet> newJoinResults; if(emittingSide == Side.LEFT) { newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets); } else { @@ -124,14 +127,15 @@ public class JoinResultUpdater { } // Insert the new join binding sets to the Fluo table. - VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); + final VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); while(newJoinResults.hasNext()) { - BindingSet newJoinResult = newJoinResults.next(); - String joinBindingSetString = converter.convert(newJoinResult, joinVarOrder); + final BindingSet newJoinResult = newJoinResults.next(); + final String joinBindingSetStringId = idConverter.convert(newJoinResult, joinVarOrder); + final String joinBindingSetStringValue = valueConverter.convert(newJoinResult, joinVarOrder); - final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString); + final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetStringId); final Column col = FluoQueryColumns.JOIN_BINDING_SET; - final Bytes value = encoder.encode(joinBindingSetString); + final Bytes value = encoder.encode(joinBindingSetStringValue); tx.set(row, col, value); } } @@ -143,14 +147,14 @@ public class JoinResultUpdater { LEFT, RIGHT; } - private FluoTableIterator makeSiblingScanIterator(String childId, BindingSet childBindingSet, String siblingId, TransactionBase tx) throws BindingSetConversionException { + private FluoTableIterator makeSiblingScanIterator(final String childId, final BindingSet childBindingSet, final String siblingId, final TransactionBase tx) throws BindingSetConversionException { // Get the common variable orders. These are used to build the prefix. final VariableOrder childVarOrder = getVarOrder(tx, childId); final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); - List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder); + final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder); // Get the Binding strings - final String childBindingSetString = converter.convert(childBindingSet, childVarOrder); + final String childBindingSetString = valueConverter.convert(childBindingSet, childVarOrder); final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString); // Create the prefix that will be used to scan for binding sets of the sibling node. @@ -285,20 +289,20 @@ public class JoinResultUpdater { public static interface IterativeJoin { /** - * Invoked when a new {@link BindingSet} is emitted from the left child + * Invoked when a new {@link VisibilityBindingSet} is emitted from the left child * node of the join. The Fluo table is scanned for results on the right * side that will be joined with the new result. * - * @param newLeftResult - A new BindingSet that has been emitted from + * @param newLeftResult - A new VisibilityBindingSet that has been emitted from * the left child node. * @param rightResults - The right child node's binding sets that will * be joined with the new left result. (not null) * @return The new BindingSet results for the join. */ - public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults); + public Iterator<VisibilityBindingSet> newLeftResult(VisibilityBindingSet newLeftResult, Iterator<VisibilityBindingSet> rightResults); /** - * Invoked when a new {@link BindingSet} is emitted from the right child + * Invoked when a new {@link VisibilityBindingSet} is emitted from the right child * node of the join. The Fluo table is scanned for results on the left * side that will be joined with the new result. * @@ -308,7 +312,7 @@ public class JoinResultUpdater { * the right child node. * @return The new BindingSet results for the join. */ - public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult); + public Iterator<VisibilityBindingSet> newRightResult(Iterator<VisibilityBindingSet> leftResults, VisibilityBindingSet newRightResult); } /** @@ -321,7 +325,7 @@ public class JoinResultUpdater { */ public static final class NaturalJoin implements IterativeJoin { @Override - public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) { + public Iterator<VisibilityBindingSet> newLeftResult(final VisibilityBindingSet newLeftResult, final Iterator<VisibilityBindingSet> rightResults) { checkNotNull(newLeftResult); checkNotNull(rightResults); @@ -330,7 +334,7 @@ public class JoinResultUpdater { } @Override - public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult) { + public Iterator<VisibilityBindingSet> newRightResult(final Iterator<VisibilityBindingSet> leftResults, final VisibilityBindingSet newRightResult) { checkNotNull(leftResults); checkNotNull(newRightResult); @@ -349,14 +353,14 @@ public class JoinResultUpdater { */ public static final class LeftOuterJoin implements IterativeJoin { @Override - public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) { + public Iterator<VisibilityBindingSet> newLeftResult(final VisibilityBindingSet newLeftResult, final Iterator<VisibilityBindingSet> rightResults) { checkNotNull(newLeftResult); checkNotNull(rightResults); // If the required portion does not join with any optional portions, // then emit a BindingSet that matches the new left result. if(!rightResults.hasNext()) { - return Lists.<BindingSet>newArrayList(newLeftResult).iterator(); + return Lists.<VisibilityBindingSet>newArrayList(newLeftResult).iterator(); } // Otherwise, return an iterator that holds the new required result @@ -365,7 +369,7 @@ public class JoinResultUpdater { } @Override - public Iterator<BindingSet> newRightResult(final Iterator<BindingSet> leftResults, final BindingSet newRightResult) { + public Iterator<VisibilityBindingSet> newRightResult(final Iterator<VisibilityBindingSet> leftResults, final VisibilityBindingSet newRightResult) { checkNotNull(leftResults); checkNotNull(newRightResult); @@ -382,10 +386,10 @@ public class JoinResultUpdater { * This is done lazily so that you don't have to load all of the BindingSets * into memory at once. */ - private static final class LazyJoiningIterator implements Iterator<BindingSet> { + private static final class LazyJoiningIterator implements Iterator<VisibilityBindingSet> { - private final BindingSet newResult; - private final Iterator<BindingSet> joinedResults; + private final VisibilityBindingSet newResult; + private final Iterator<VisibilityBindingSet> joinedResults; /** * Constructs an instance of {@link LazyJoiningIterator}. @@ -393,9 +397,9 @@ public class JoinResultUpdater { * @param newResult - A binding set that will be joined with some other binding sets. (not null) * @param joinResults - The binding sets that will be joined with {@code newResult}. (not null) */ - public LazyJoiningIterator(BindingSet newResult, Iterator<BindingSet> joinResults) { + public LazyJoiningIterator(final VisibilityBindingSet newResult, final Iterator<VisibilityBindingSet> joinResults) { this.newResult = checkNotNull(newResult); - this.joinedResults = checkNotNull(joinResults); + joinedResults = checkNotNull(joinResults); } @Override @@ -404,18 +408,28 @@ public class JoinResultUpdater { } @Override - public BindingSet next() { + public VisibilityBindingSet next() { final MapBindingSet bs = new MapBindingSet(); - for(Binding binding : newResult) { + for(final Binding binding : newResult) { bs.addBinding(binding); } - for(Binding binding : joinedResults.next()) { + final VisibilityBindingSet joinResult = joinedResults.next(); + for(final Binding binding : joinResult) { bs.addBinding(binding); } - return bs; + String visibility = ""; + final Joiner join = Joiner.on(")&("); + final String leftVisi = newResult.getVisibility(); + final String rightVisi = joinResult.getVisibility(); + if(leftVisi.isEmpty() || rightVisi.isEmpty()) { + visibility = (leftVisi + rightVisi).trim(); + } else { + visibility = "(" + join.join(leftVisi, rightVisi) + ")"; + } + return new VisibilityBindingSet(bs, visibility); } @Override @@ -428,7 +442,7 @@ public class JoinResultUpdater { * Iterates over rows that have a Binding Set column and returns the unmarshalled * {@link BindingSet}s. */ - private static final class FluoTableIterator implements Iterator<BindingSet> { + private static final class FluoTableIterator implements Iterator<VisibilityBindingSet> { private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet( FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, @@ -445,7 +459,7 @@ public class JoinResultUpdater { * @param varOrder - The Variable Order of binding sets that will be * read from the Fluo Table. (not null) */ - public FluoTableIterator(RowIterator rows, VariableOrder varOrder) { + public FluoTableIterator(final RowIterator rows, final VariableOrder varOrder) { this.rows = checkNotNull(rows); this.varOrder = checkNotNull(varOrder); } @@ -456,7 +470,7 @@ public class JoinResultUpdater { } @Override - public BindingSet next() { + public VisibilityBindingSet next() { final ColumnIterator columns = rows.next().getValue(); while(columns.hasNext()) { @@ -464,11 +478,7 @@ public class JoinResultUpdater { final Entry<Column, Bytes> entry = columns.next(); if(BINDING_SET_COLUMNS.contains(entry.getKey())) { final String bindingSetString = entry.getValue().toString(); - try { - return converter.convert(bindingSetString, varOrder); - } catch (BindingSetConversionException e) { - throw new RuntimeException("Could not convert one of the stored BindingSets from a String: " + bindingSetString, e); - } + return (VisibilityBindingSet) valueConverter.convert(bindingSetString, varOrder); } }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java index f3ff089..8e0a6fe 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java @@ -26,7 +26,6 @@ import javax.annotation.ParametersAreNonnullByDefault; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; import org.openrdf.query.impl.MapBindingSet; import io.fluo.api.client.TransactionBase; @@ -34,8 +33,10 @@ import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; import io.fluo.api.types.Encoder; import io.fluo.api.types.StringEncoder; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter; /** * Updates the results of a Query node when one of its children has added a @@ -43,10 +44,10 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; */ @ParametersAreNonnullByDefault public class QueryResultUpdater { - private final Encoder encoder = new StringEncoder(); private final BindingSetStringConverter converter = new BindingSetStringConverter(); + private final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter(); /** * Updates the results of a Query node when one of its children has added a @@ -58,7 +59,7 @@ public class QueryResultUpdater { */ public void updateQueryResults( final TransactionBase tx, - final BindingSet childBindingSet, + final VisibilityBindingSet childBindingSet, final QueryMetadata queryMetadata) { checkNotNull(tx); checkNotNull(childBindingSet); @@ -75,11 +76,12 @@ public class QueryResultUpdater { } } final String queryBindingSetString = converter.convert(queryBindingSet, queryVarOrder); + final String queryBindingSetValueString = valueConverter.convert(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()), queryVarOrder); // Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry. final Bytes row = encoder.encode(queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString); final Column col = FluoQueryColumns.QUERY_BINDING_SET; - final Bytes value = encoder.encode(queryBindingSetString); + final Bytes value = encoder.encode(queryBindingSetValueString); tx.set(row, col, value); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java index fbbae33..c2c031c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java @@ -20,9 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.app.export; import javax.annotation.ParametersAreNonnullByDefault; -import org.openrdf.query.BindingSet; - import io.fluo.api.types.TypedTransactionBase; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; /** * Exports a single Binding Set that is a new result for a SPARQL query to some @@ -40,7 +39,7 @@ public interface IncrementalResultExporter { * Fluo application. (not null) * @throws ResultExportException The result could not be exported. */ - public void export(TypedTransactionBase tx, String queryId, BindingSet result) throws ResultExportException; + public void export(TypedTransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException; /** * A result could not be exported. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java index 4d51798..5c8c719 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java @@ -25,10 +25,10 @@ import java.util.Collections; import org.apache.accumulo.core.client.Connector; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.openrdf.query.BindingSet; import io.fluo.api.data.Bytes; import io.fluo.api.types.TypedTransactionBase; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; import mvm.rya.indexing.external.tupleSet.PcjTables; import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; @@ -55,7 +55,7 @@ public class RyaResultExporter implements IncrementalResultExporter { public void export( final TypedTransactionBase fluoTx, final String queryId, - final BindingSet result) throws ResultExportException { + final VisibilityBindingSet result) throws ResultExportException { checkNotNull(fluoTx); checkNotNull(queryId); checkNotNull(result); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index aa944e4..9bd0148 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -31,13 +31,15 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.openrdf.query.BindingSet; import io.fluo.api.client.TransactionBase; import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; +import io.fluo.api.types.Encoder; +import io.fluo.api.types.StringEncoder; import io.fluo.api.types.TypedObserver; import io.fluo.api.types.TypedTransactionBase; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; /** @@ -48,6 +50,7 @@ import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversi @ParametersAreNonnullByDefault public abstract class BindingSetUpdater extends TypedObserver { + private final Encoder encoder = new StringEncoder(); // DAO private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -74,9 +77,10 @@ public abstract class BindingSetUpdater extends TypedObserver { checkNotNull(row); checkNotNull(col); - final Observation observation = parseObservation( tx, BindingSetRow.make(row) ); + final String bindingSetString = encoder.decodeString(tx.get(row, col)); + final Observation observation = parseObservation( tx, new BindingSetRow(BindingSetRow.make(row).getNodeId(), bindingSetString) ); final String observedNodeId = observation.getObservedNodeId(); - final BindingSet observedBindingSet = observation.getObservedBindingSet(); + final VisibilityBindingSet observedBindingSet = observation.getObservedBindingSet(); final String parentNodeId = observation.getParentId(); // Figure out which node needs to handle the new metadata. @@ -100,7 +104,7 @@ public abstract class BindingSetUpdater extends TypedObserver { final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, parentNodeId); try { joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin); - } catch (BindingSetConversionException e) { + } catch (final BindingSetConversionException e) { throw new RuntimeException("Could not process a Join node.", e); } break; @@ -117,7 +121,7 @@ public abstract class BindingSetUpdater extends TypedObserver { public static final class Observation { private final String observedNodeId; - private final BindingSet observedBindingSet; + private final VisibilityBindingSet observedBindingSet; private final String parentNodeId; /** @@ -129,7 +133,7 @@ public abstract class BindingSetUpdater extends TypedObserver { */ public Observation( final String observedNodeId, - final BindingSet observedBindingSet, + final VisibilityBindingSet observedBindingSet, final String parentNodeId) { this.observedNodeId = checkNotNull(observedNodeId); this.observedBindingSet = checkNotNull(observedBindingSet); @@ -146,7 +150,7 @@ public abstract class BindingSetUpdater extends TypedObserver { /** * @return A Binding Set that was just emitted. */ - public BindingSet getObservedBindingSet() { + public VisibilityBindingSet getObservedBindingSet() { return observedBindingSet; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java index 2accde3..fb15934 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java @@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.openrdf.query.BindingSet; import io.fluo.api.client.TransactionBase; -import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter; /** * Notified when the results of a Filter have been updated to include a new @@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; */ public class FilterObserver extends BindingSetUpdater { - private final BindingSetStringConverter converter = new BindingSetStringConverter(); + private final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter(); private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -57,7 +58,7 @@ public class FilterObserver extends BindingSetUpdater { // Read the Binding Set that was just emmitted by the Filter. final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); - final BindingSet filterBindingSet = converter.convert(parsedRow.getBindingSetString(), filterVarOrder); + final VisibilityBindingSet filterBindingSet = (VisibilityBindingSet) converter.convert(parsedRow.getBindingSetString(), filterVarOrder); // Figure out which node needs to handle the new metadata. final String parentNodeId = filterMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java index 43b0a4e..a8cd0df 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java @@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.openrdf.query.BindingSet; import io.fluo.api.client.TransactionBase; -import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter; /** * Notified when the results of a Join have been updated to include a new @@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; */ public class JoinObserver extends BindingSetUpdater { - private final BindingSetStringConverter converter = new BindingSetStringConverter(); + private final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter(); private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -56,7 +57,7 @@ public class JoinObserver extends BindingSetUpdater { // Read the Binding Set that was just emmitted by the Join. final VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); - final BindingSet joinBindingSet = converter.convert(parsedRow.getBindingSetString(), joinVarOrder); + final VisibilityBindingSet joinBindingSet = (VisibilityBindingSet) converter.convert(parsedRow.getBindingSetString(), joinVarOrder); // Figure out which node needs to handle the new metadata. final String parentNodeId = joinMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index 7c1a588..fe4dc56 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -29,17 +29,19 @@ import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.openrdf.query.BindingSet; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; +import io.fluo.api.types.Encoder; +import io.fluo.api.types.StringEncoder; import io.fluo.api.types.TypedObserver; import io.fluo.api.types.TypedTransactionBase; -import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter; /** * Performs incremental result exporting to the configured destinations. @@ -47,9 +49,9 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; public class QueryResultObserver extends TypedObserver { private static final Logger log = Logger.getLogger(QueryResultObserver.class); - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); - - private final BindingSetStringConverter converter = new BindingSetStringConverter(); + private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO(); + private static final Encoder ENCODER = new StringEncoder(); + private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); /** * Builders for each type of result exporter we support. @@ -93,16 +95,16 @@ public class QueryResultObserver extends TypedObserver { @Override public void process(final TypedTransactionBase tx, final Bytes row, final Column col) { // Read the SPARQL query and it Binding Set from the row id. - final String[] queryAndBindingSet = row.toString().split(NODEID_BS_DELIM); + final String[] queryAndBindingSet = ENCODER.decodeString(row).split(NODEID_BS_DELIM); final String queryId = queryAndBindingSet[0]; - final String bindingSetString = queryAndBindingSet[1]; + final String bindingSetString = ENCODER.decodeString(tx.get(row, col)); // Fetch the query's Variable Order from the Fluo table. - final QueryMetadata queryMetadata = queryDao.readQueryMetadata(tx, queryId); + final QueryMetadata queryMetadata = QUERY_DAO.readQueryMetadata(tx, queryId); final VariableOrder varOrder = queryMetadata.getVariableOrder(); // Export the result using each of the provided exporters. - BindingSet result = converter.convert(bindingSetString, varOrder); + final VisibilityBindingSet result = (VisibilityBindingSet) CONVERTER.convert(bindingSetString, varOrder); for(final IncrementalResultExporter exporter : exporters) { try { exporter.export(tx, queryId, result); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java index ddba9a2..7b1e510 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java @@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.openrdf.query.BindingSet; import io.fluo.api.client.TransactionBase; -import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter; /** * Notified when the results of a Statement Pattern have been updated to include @@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; */ public class StatementPatternObserver extends BindingSetUpdater { - private final BindingSetStringConverter converter = new BindingSetStringConverter(); + private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); // DAO private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -54,10 +55,11 @@ public class StatementPatternObserver extends BindingSetUpdater { // Read the Statement Pattern metadata. final String spNodeId = parsedRow.getNodeId(); final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId); + final String bindingSetValue = parsedRow.getBindingSetString(); // Read the Binding Set that was just emmitted by the Statement Pattern. final VariableOrder spVarOrder = spMetadata.getVariableOrder(); - final BindingSet spBindingSet = converter.convert(parsedRow.getBindingSetString(), spVarOrder); + final VisibilityBindingSet spBindingSet = (VisibilityBindingSet) CONVERTER.convert(bindingSetValue, spVarOrder); // Figure out which node needs to handle the new metadata. final String parentNodeId = spMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java index d43ffc9..496c0ed 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java @@ -40,8 +40,13 @@ import io.fluo.api.data.Column; import io.fluo.api.data.Span; import io.fluo.api.iterator.ColumnIterator; import io.fluo.api.iterator.RowIterator; +import io.fluo.api.types.Encoder; +import io.fluo.api.types.StringEncoder; import io.fluo.api.types.TypedObserver; import io.fluo.api.types.TypedTransactionBase; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter; /** * An observer that matches new Triples to the Statement Patterns that are part @@ -50,7 +55,9 @@ import io.fluo.api.types.TypedTransactionBase; */ public class TripleObserver extends TypedObserver { - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + private static final Encoder ENCODER = new StringEncoder(); + private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO(); + private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); public TripleObserver() { super(new StringTypeLayer()); @@ -65,6 +72,11 @@ public class TripleObserver extends TypedObserver { public void process(final TypedTransactionBase tx, final Bytes row, final Column column) { //get string representation of triple final String triple = IncUpdateDAO.getTripleString(row); + final Bytes visiBytes = tx.get(row, FluoQueryColumns.TRIPLES); + String visibility = ""; + if(visiBytes != null) { + visibility = ENCODER.decodeString(visiBytes); + } //get variable metadata for all SP in table final ScannerConfiguration sc1 = new ScannerConfiguration(); @@ -75,19 +87,25 @@ public class TripleObserver extends TypedObserver { final RowIterator ri = tx.get(sc1); while(ri.hasNext()) { - final Entry<Bytes, ColumnIterator> next = ri.next(); final ColumnIterator ci = next.getValue(); final String spID = next.getKey().toString(); - final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spID); + final StatementPatternMetadata spMetadata = QUERY_DAO.readStatementPatternMetadata(tx, spID); final String pattern = spMetadata.getStatementPattern(); while(ci.hasNext()) { final String varOrders = ci.next().getValue().toString(); - final String bindingSet = getBindingSet(triple, pattern, varOrders); - if(bindingSet.length() != 0) { - tx.mutate().row(spID + NODEID_BS_DELIM + bindingSet).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(bindingSet); + final VariableOrder varOrder = new VariableOrder(varOrders); + final String bindingSetString = getBindingSet(triple, pattern, varOrders); + + //Statement matches to a binding set + if(bindingSetString.length() != 0) { + final VisibilityBindingSet bindingSet = new VisibilityBindingSet( + CONVERTER.convert(bindingSetString, varOrder), + visibility); + final String valueString = CONVERTER.convert(bindingSet, varOrder); + tx.mutate().row(spID + NODEID_BS_DELIM + bindingSetString).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(valueString); } } } @@ -96,8 +114,16 @@ public class TripleObserver extends TypedObserver { tx.delete(row, column); } - //determines whether triple matches SPID conditions and generates bindingset - //whose order is determined by varOrder + /** + * Determines whether triple matches Statement Pattern ID conditions if + * so, generates a string representation of a BindingSet whose order + * is determined by varOrder. + * @param triple - The triple to consider. + * @param spID - The statement pattern ID + * @param varOrder - The variable order + * @return The string representation of the BindingSet or an empty string, + * signifying the triple did not match the statement pattern ID. + */ private static String getBindingSet(final String triple, final String spID, final String varOrder) { final String[] spIdArray = spID.split(DELIM); final String[] tripleArray = triple.split(DELIM); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index fa25456..be24ac9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -88,7 +88,7 @@ public class FluoQueryColumns { * <p> * <table border="1" style="width:100%"> * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> - * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>empty</td> </tr> + * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>visibility</td> </tr> * </table> */ public static final Column TRIPLES = new Column("triples", "SPO"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index 265ca0f..83985da 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -205,8 +205,8 @@ public class FluoQueryMetadataDAO { final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER)); final VariableOrder varOrder = new VariableOrder(varOrderString); - String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) ); - JoinType joinType = JoinType.valueOf(joinTypeString); + final String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) ); + final JoinType joinType = JoinType.valueOf(joinTypeString); final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID) ); final String leftChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID) ); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java index 025c3e7..08d5cef 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java @@ -37,6 +37,8 @@ import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; + /** * Tests the methods of {@link LeftOuterJoin}. */ @@ -46,128 +48,134 @@ public class LeftOuterJoinTest { @Test public void newLeftResult_noRightMatches() { - IterativeJoin leftOuterJoin = new LeftOuterJoin(); + final IterativeJoin leftOuterJoin = new LeftOuterJoin(); // There is a new left result. - MapBindingSet newLeftResult = new MapBindingSet(); - newLeftResult.addBinding("name", vf.createLiteral("Bob")); + final MapBindingSet mapLeftResult = new MapBindingSet(); + mapLeftResult.addBinding("name", vf.createLiteral("Bob")); + final VisibilityBindingSet newLeftResult = new VisibilityBindingSet(mapLeftResult); // There are no right results that join with the left result. - Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator(); + final Iterator<VisibilityBindingSet> rightResults= new ArrayList<VisibilityBindingSet>().iterator(); // Therefore, the left result is a new join result. - Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults); + final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults); - Set<BindingSet> newJoinResults = new HashSet<>(); + final Set<BindingSet> newJoinResults = new HashSet<>(); while(newJoinResultsIt.hasNext()) { newJoinResults.add( newJoinResultsIt.next() ); } - Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult ); + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult ); assertEquals(expected, newJoinResults); } @Test public void newLeftResult_joinsWithRightResults() { - IterativeJoin leftOuterJoin = new LeftOuterJoin(); + final IterativeJoin leftOuterJoin = new LeftOuterJoin(); // There is a new left result. - MapBindingSet newLeftResult = new MapBindingSet(); - newLeftResult.addBinding("name", vf.createLiteral("Bob")); - newLeftResult.addBinding("height", vf.createLiteral("5'9\"")); + final MapBindingSet mapLeftResult = new MapBindingSet(); + mapLeftResult.addBinding("name", vf.createLiteral("Bob")); + mapLeftResult.addBinding("height", vf.createLiteral("5'9\"")); + final VisibilityBindingSet newLeftResult = new VisibilityBindingSet(mapLeftResult); // There are a few right results that join with the left result. - MapBindingSet nameAge = new MapBindingSet(); + final MapBindingSet nameAge = new MapBindingSet(); nameAge.addBinding("name", vf.createLiteral("Bob")); nameAge.addBinding("age", vf.createLiteral(56)); + final VisibilityBindingSet visiAge = new VisibilityBindingSet(nameAge); - MapBindingSet nameHair = new MapBindingSet(); + final MapBindingSet nameHair = new MapBindingSet(); nameHair.addBinding("name", vf.createLiteral("Bob")); nameHair.addBinding("hairColor", vf.createLiteral("Brown")); + final VisibilityBindingSet visiHair = new VisibilityBindingSet(nameHair); - Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator(); + final Iterator<VisibilityBindingSet> rightResults = Lists.<VisibilityBindingSet>newArrayList(visiAge, visiHair).iterator(); // Therefore, there are a few new join results that mix the two together. - Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults); + final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults); - Set<BindingSet> newJoinResults = new HashSet<>(); + final Set<BindingSet> newJoinResults = new HashSet<>(); while(newJoinResultsIt.hasNext()) { newJoinResults.add( newJoinResultsIt.next() ); } - Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); - MapBindingSet nameHeightAge = new MapBindingSet(); + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); + final MapBindingSet nameHeightAge = new MapBindingSet(); nameHeightAge.addBinding("name", vf.createLiteral("Bob")); nameHeightAge.addBinding("height", vf.createLiteral("5'9\"")); nameHeightAge.addBinding("age", vf.createLiteral(56)); - expected.add(nameHeightAge); + expected.add(new VisibilityBindingSet(nameHeightAge)); - MapBindingSet nameHeightHair = new MapBindingSet(); + final MapBindingSet nameHeightHair = new MapBindingSet(); nameHeightHair.addBinding("name", vf.createLiteral("Bob")); nameHeightHair.addBinding("height", vf.createLiteral("5'9\"")); nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown")); - expected.add(nameHeightHair); + expected.add(new VisibilityBindingSet(nameHeightHair)); assertEquals(expected, newJoinResults); } @Test public void newRightResult_noLeftMatches() { - IterativeJoin leftOuterJoin = new LeftOuterJoin(); + final IterativeJoin leftOuterJoin = new LeftOuterJoin(); // There are no left results. - Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator(); + final Iterator<VisibilityBindingSet> leftResults= new ArrayList<VisibilityBindingSet>().iterator(); // There is a new right result. - MapBindingSet newRightResult = new MapBindingSet(); + final MapBindingSet newRightResult = new MapBindingSet(); newRightResult.addBinding("name", vf.createLiteral("Bob")); // Therefore, there are no new join results. - Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult); + final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult)); assertFalse( newJoinResultsIt.hasNext() ); } @Test public void newRightResult_joinsWithLeftResults() { - IterativeJoin leftOuterJoin = new LeftOuterJoin(); + final IterativeJoin leftOuterJoin = new LeftOuterJoin(); // There are a few left results that join with the new right result. - MapBindingSet nameAge = new MapBindingSet(); + final MapBindingSet nameAge = new MapBindingSet(); nameAge.addBinding("name", vf.createLiteral("Bob")); nameAge.addBinding("age", vf.createLiteral(56)); - MapBindingSet nameHair = new MapBindingSet(); + final MapBindingSet nameHair = new MapBindingSet(); nameHair.addBinding("name", vf.createLiteral("Bob")); nameHair.addBinding("hairColor", vf.createLiteral("Brown")); - Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator(); + final Iterator<VisibilityBindingSet> leftResults = Lists.<VisibilityBindingSet>newArrayList( + new VisibilityBindingSet(nameAge), + new VisibilityBindingSet(nameHair)).iterator(); // There is a new right result. - MapBindingSet newRightResult = new MapBindingSet(); + final MapBindingSet newRightResult = new MapBindingSet(); newRightResult.addBinding("name", vf.createLiteral("Bob")); newRightResult.addBinding("height", vf.createLiteral("5'9\"")); // Therefore, there are a few new join results that mix the two together. - Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult); + final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult)); - Set<BindingSet> newJoinResults = new HashSet<>(); + final Set<BindingSet> newJoinResults = new HashSet<>(); while(newJoinResultsIt.hasNext()) { newJoinResults.add( newJoinResultsIt.next() ); } - Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); - MapBindingSet nameHeightAge = new MapBindingSet(); + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); + final MapBindingSet nameHeightAge = new MapBindingSet(); nameHeightAge.addBinding("name", vf.createLiteral("Bob")); nameHeightAge.addBinding("height", vf.createLiteral("5'9\"")); nameHeightAge.addBinding("age", vf.createLiteral(56)); - expected.add(nameHeightAge); + expected.add(new VisibilityBindingSet(nameHeightAge)); - MapBindingSet nameHeightHair = new MapBindingSet(); + final MapBindingSet nameHeightHair = new MapBindingSet(); nameHeightHair.addBinding("name", vf.createLiteral("Bob")); nameHeightHair.addBinding("height", vf.createLiteral("5'9\"")); nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown")); - expected.add(nameHeightHair); + expected.add(new VisibilityBindingSet(nameHeightHair)); assertEquals(expected, newJoinResults); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java index 15023c5..651ea11 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java @@ -37,6 +37,8 @@ import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import mvm.rya.indexing.accumulo.VisibilityBindingSet; + /** * Tests the methods of {@link NaturalJoin}. */ @@ -46,120 +48,124 @@ public class NaturalJoinTest { @Test public void newLeftResult_noRightMatches() { - IterativeJoin naturalJoin = new NaturalJoin(); + final IterativeJoin naturalJoin = new NaturalJoin(); // There is a new left result. - MapBindingSet newLeftResult = new MapBindingSet(); + final MapBindingSet newLeftResult = new MapBindingSet(); newLeftResult.addBinding("name", vf.createLiteral("Bob")); // There are no right results that join with the left result. - Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator(); + final Iterator<VisibilityBindingSet> rightResults= new ArrayList<VisibilityBindingSet>().iterator(); // Therefore, the left result is a new join result. - Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults); + final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newLeftResult(new VisibilityBindingSet(newLeftResult), rightResults); assertFalse( newJoinResultsIt.hasNext() ); } @Test public void newLeftResult_joinsWithRightResults() { - IterativeJoin naturalJoin = new NaturalJoin(); + final IterativeJoin naturalJoin = new NaturalJoin(); // There is a new left result. - MapBindingSet newLeftResult = new MapBindingSet(); + final MapBindingSet newLeftResult = new MapBindingSet(); newLeftResult.addBinding("name", vf.createLiteral("Bob")); newLeftResult.addBinding("height", vf.createLiteral("5'9\"")); // There are a few right results that join with the left result. - MapBindingSet nameAge = new MapBindingSet(); + final MapBindingSet nameAge = new MapBindingSet(); nameAge.addBinding("name", vf.createLiteral("Bob")); nameAge.addBinding("age", vf.createLiteral(56)); - MapBindingSet nameHair = new MapBindingSet(); + final MapBindingSet nameHair = new MapBindingSet(); nameHair.addBinding("name", vf.createLiteral("Bob")); nameHair.addBinding("hairColor", vf.createLiteral("Brown")); - Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator(); + final Iterator<VisibilityBindingSet> rightResults = Lists.<VisibilityBindingSet>newArrayList( + new VisibilityBindingSet(nameAge), + new VisibilityBindingSet(nameHair)).iterator(); // Therefore, there are a few new join results that mix the two together. - Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults); + final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newLeftResult(new VisibilityBindingSet(newLeftResult), rightResults); - Set<BindingSet> newJoinResults = new HashSet<>(); + final Set<BindingSet> newJoinResults = new HashSet<>(); while(newJoinResultsIt.hasNext()) { newJoinResults.add( newJoinResultsIt.next() ); } - Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); - MapBindingSet nameHeightAge = new MapBindingSet(); + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); + final MapBindingSet nameHeightAge = new MapBindingSet(); nameHeightAge.addBinding("name", vf.createLiteral("Bob")); nameHeightAge.addBinding("height", vf.createLiteral("5'9\"")); nameHeightAge.addBinding("age", vf.createLiteral(56)); - expected.add(nameHeightAge); + expected.add(new VisibilityBindingSet(nameHeightAge)); - MapBindingSet nameHeightHair = new MapBindingSet(); + final MapBindingSet nameHeightHair = new MapBindingSet(); nameHeightHair.addBinding("name", vf.createLiteral("Bob")); nameHeightHair.addBinding("height", vf.createLiteral("5'9\"")); nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown")); - expected.add(nameHeightHair); + expected.add(new VisibilityBindingSet(nameHeightHair)); assertEquals(expected, newJoinResults); } @Test public void newRightResult_noLeftMatches() { - IterativeJoin naturalJoin = new NaturalJoin(); + final IterativeJoin naturalJoin = new NaturalJoin(); // There are no left results. - Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator(); + final Iterator<VisibilityBindingSet> leftResults= new ArrayList<VisibilityBindingSet>().iterator(); // There is a new right result. - MapBindingSet newRightResult = new MapBindingSet(); + final MapBindingSet newRightResult = new MapBindingSet(); newRightResult.addBinding("name", vf.createLiteral("Bob")); // Therefore, there are no new join results. - Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult); + final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult)); assertFalse( newJoinResultsIt.hasNext() ); } @Test public void newRightResult_joinsWithLeftResults() { - IterativeJoin naturalJoin = new NaturalJoin(); + final IterativeJoin naturalJoin = new NaturalJoin(); // There are a few left results that join with the new right result. - MapBindingSet nameAge = new MapBindingSet(); + final MapBindingSet nameAge = new MapBindingSet(); nameAge.addBinding("name", vf.createLiteral("Bob")); nameAge.addBinding("age", vf.createLiteral(56)); - MapBindingSet nameHair = new MapBindingSet(); + final MapBindingSet nameHair = new MapBindingSet(); nameHair.addBinding("name", vf.createLiteral("Bob")); nameHair.addBinding("hairColor", vf.createLiteral("Brown")); - Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator(); + final Iterator<VisibilityBindingSet> leftResults = Lists.<VisibilityBindingSet>newArrayList( + new VisibilityBindingSet(nameAge), + new VisibilityBindingSet(nameHair)).iterator(); // There is a new right result. - MapBindingSet newRightResult = new MapBindingSet(); + final MapBindingSet newRightResult = new MapBindingSet(); newRightResult.addBinding("name", vf.createLiteral("Bob")); newRightResult.addBinding("height", vf.createLiteral("5'9\"")); // Therefore, there are a few new join results that mix the two together. - Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult); + final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult)); - Set<BindingSet> newJoinResults = new HashSet<>(); + final Set<BindingSet> newJoinResults = new HashSet<>(); while(newJoinResultsIt.hasNext()) { newJoinResults.add( newJoinResultsIt.next() ); } - Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); - MapBindingSet nameHeightAge = new MapBindingSet(); + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); + final MapBindingSet nameHeightAge = new MapBindingSet(); nameHeightAge.addBinding("name", vf.createLiteral("Bob")); nameHeightAge.addBinding("height", vf.createLiteral("5'9\"")); nameHeightAge.addBinding("age", vf.createLiteral(56)); - expected.add(nameHeightAge); + expected.add(new VisibilityBindingSet(nameHeightAge)); - MapBindingSet nameHeightHair = new MapBindingSet(); + final MapBindingSet nameHeightHair = new MapBindingSet(); nameHeightHair.addBinding("name", vf.createLiteral("Bob")); nameHeightHair.addBinding("height", vf.createLiteral("5'9\"")); nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown")); - expected.add(nameHeightHair); + expected.add(new VisibilityBindingSet(nameHeightHair)); assertEquals(expected, newJoinResults); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java index 3168a71..7be539a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java @@ -30,6 +30,8 @@ import org.openrdf.rio.RDFHandlerException; import org.openrdf.rio.RDFParser; import org.openrdf.rio.helpers.RDFHandlerBase; +import com.google.common.base.Optional; + import io.fluo.api.client.FluoClient; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.resolver.RdfToRyaConversions; @@ -68,7 +70,7 @@ public class FluoLoader extends RDFHandlerBase { // If the buffer is full, flush it to the Fluo table. if(buff.size() == FLUSH_SIZE) { log.trace("Flushing " + buff.size() + " Statements from the buffer to Fluo."); - insertTriples.insert(fluoClient, buff); + insertTriples.insert(fluoClient, buff, Optional.<String>absent()); buff.clear(); } @@ -83,7 +85,7 @@ public class FluoLoader extends RDFHandlerBase { if(!buff.isEmpty()) { log.trace("Flushing the last " + buff.size() + " Statements from the buffer to Fluo."); - insertTriples.insert(fluoClient, buff); + insertTriples.insert(fluoClient, buff, Optional.<String>absent()); buff.clear(); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java index 0cbfa9a..eb7fb17 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -48,6 +48,7 @@ import org.openrdf.repository.RepositoryConnection; import org.openrdf.repository.RepositoryException; import org.openrdf.sail.SailException; +import com.google.common.base.Optional; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -60,7 +61,6 @@ import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.domain.RyaType; import mvm.rya.api.domain.RyaURI; import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.api.resolver.RyaTypeResolverException; import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer; import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; import mvm.rya.indexing.external.tupleSet.PcjTables; @@ -77,7 +77,7 @@ public class FluoAndHistoricPcjsDemo implements Demo { private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class); private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - + // Employees private static final RyaURI alice = new RyaURI("http://Alice"); private static final RyaURI bob = new RyaURI("http://Bob"); @@ -290,7 +290,7 @@ public class FluoAndHistoricPcjsDemo implements Demo { private static void loadDataIntoFluo(final FluoClient fluoClient, final Set<RyaStatement> statements) { final InsertTriples insertTriples = new InsertTriples(); for(final RyaStatement statement : statements) { - insertTriples.insert(fluoClient, statement); + insertTriples.insert(fluoClient, statement, Optional.<String>absent()); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java index 41c4f08..566d2d2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.junit.Test; +import com.google.common.base.Optional; import com.google.common.io.Files; import io.fluo.api.client.FluoFactory; @@ -75,7 +76,7 @@ public class CountStatementsIT extends ITBase { triples.add( RyaStatement.builder().setSubject(new RyaURI("http://David")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Eve")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); - new InsertTriples().insert(fluoClient, triples); + new InsertTriples().insert(fluoClient, triples, Optional.<String>absent()); // Load some statements into the Fluo app. final BigInteger count = new CountStatements().countStatements(fluoClient); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java index 157412a..0e766b1 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -33,6 +33,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.junit.Test; +import com.google.common.base.Optional; import com.google.common.collect.Sets; import mvm.rya.api.domain.RyaStatement; @@ -77,7 +78,7 @@ public class GetQueryReportIT extends ITBase { new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Wait for the results to finish processing. fluo.waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java index 41c2d7d..6e633f6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java @@ -33,6 +33,7 @@ import org.openrdf.model.impl.URIImpl; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.BindingImpl; +import com.google.common.base.Optional; import com.google.common.collect.Sets; import mvm.rya.api.domain.RyaStatement; @@ -135,7 +136,7 @@ public class InputIT extends ITBase { assertTrue( results.isEmpty() ); // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Verify the end results of the query match the expected results. fluo.waitForObservers(); @@ -187,7 +188,7 @@ public class InputIT extends ITBase { assertEquals(expected, results); // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Verify the end results of the query also include Frank. fluo.waitForObservers(); @@ -244,7 +245,7 @@ public class InputIT extends ITBase { assertEquals(expected, results); // Stream the same Alice triple into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Verify the end results of the query is stiill only Alice. fluo.waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 46db8cd..f408a1c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -33,6 +33,7 @@ import org.openrdf.model.vocabulary.XMLSchema; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.BindingImpl; +import com.google.common.base.Optional; import com.google.common.collect.Sets; import mvm.rya.api.domain.RyaStatement; @@ -81,7 +82,7 @@ public class QueryIT extends ITBase { new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Verify the end results of the query match the expected results. fluo.waitForObservers(); @@ -163,7 +164,7 @@ public class QueryIT extends ITBase { new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Verify the end results of the query match the expected results. fluo.waitForObservers(); @@ -224,7 +225,7 @@ public class QueryIT extends ITBase { new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Verify the end results of the query match the expected results. fluo.waitForObservers(); @@ -268,7 +269,7 @@ public class QueryIT extends ITBase { new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Verify the end results of the query match the expected results. fluo.waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index ee3fffd..b75e624 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -43,6 +43,7 @@ import org.openrdf.model.impl.URIImpl; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.BindingImpl; +import com.google.common.base.Optional; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -50,7 +51,6 @@ import com.google.common.collect.Sets; import io.fluo.api.client.Snapshot; import io.fluo.api.data.Bytes; import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTypeResolverException; import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer; import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; import mvm.rya.indexing.external.tupleSet.PcjTables; @@ -66,7 +66,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; public class RyaExportIT extends ITBase { private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - + /** * Configure the export observer to use the Mini Accumulo instance as the * export destination for new PCJ results. @@ -138,7 +138,7 @@ public class RyaExportIT extends ITBase { new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples); + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); // Fetch the exported results from Accumulo once the observers finish working. fluo.waitForObservers();
