http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java index 859cd4b..85f1b1f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java @@ -19,7 +19,6 @@ package org.apache.rya.indexing.pcj.fluo.app; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; import javax.annotation.ParametersAreNonnullByDefault; @@ -35,17 +34,17 @@ import io.fluo.api.data.Bytes; @ParametersAreNonnullByDefault public class BindingSetRow { private final String nodeId; - private final String[] bindingStrings; + private final String bindingSetString; /** * Constructs an instance of {@link BindingSetRow}. * * @param nodeId - The Node ID of a query node. (not null) - * @param bindingStrings - A Binding Set that is part of the node's results. (not null) + * @param bindingSetString - A Binding Set that is part of the node's results. 
(not null) */ - public BindingSetRow(final String nodeId, final String[] bindingStrings) { + public BindingSetRow(final String nodeId, final String bindingSetString) { this.nodeId = checkNotNull(nodeId); - this.bindingStrings = checkNotNull(bindingStrings); + this.bindingSetString = checkNotNull(bindingSetString); } /** @@ -56,11 +55,10 @@ public class BindingSetRow { } /** - * @return A Binding Set that is part of the node's results. It is formatted - * in SPO order and each String requires further interpretation. + * @return A Binding Set that is part of the node's results. */ - public String[] getBindingStrings() { - return bindingStrings; + public String getBindingSetString() { + return bindingSetString; } /** @@ -77,11 +75,10 @@ public class BindingSetRow { if(rowArray.length != 2) { throw new IllegalArgumentException("A row must contain a single NODEID_BS_DELIM."); } - final String nodeId = rowArray[0]; - // Read the row's Binding Set from the bytes. - final String[] bindingStrings = rowArray[1].split(DELIM); + final String nodeId = rowArray[0]; + String bindingSetString = rowArray[1]; - return new BindingSetRow(nodeId, bindingStrings); + return new BindingSetRow(nodeId, bindingSetString); } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java index 5ff5acc..3af079f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java @@ -62,6 +62,8 @@ public class FilterResultUpdater { private final Encoder encoder = new StringEncoder(); + private final BindingSetStringConverter converter = new BindingSetStringConverter(); + /** * A utility class used to search SPARQL queries for Filters. */ @@ -119,10 +121,12 @@ public class FilterResultUpdater { final MapBindingSet filterBindingSet = new MapBindingSet(); for(final String bindingName : filterVarOrder) { - final Binding binding = childBindingSet.getBinding(bindingName); - filterBindingSet.addBinding(binding); + if(childBindingSet.hasBinding(bindingName)) { + final Binding binding = childBindingSet.getBinding(bindingName); + filterBindingSet.addBinding(binding); + } } - final String filterBindingSetString = BindingSetStringConverter.toString(filterBindingSet, filterVarOrder); + final String filterBindingSetString = converter.convert(filterBindingSet, filterVarOrder); final Bytes row = encoder.encode( filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetString ); final Column col = FluoQueryColumns.FILTER_BINDING_SET; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java 
---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java index 61e3d5f..aa7c959 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java @@ -18,32 +18,20 @@ */ package org.apache.rya.indexing.pcj.fluo.app; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; - -import java.util.Collection; import javax.annotation.ParametersAreNonnullByDefault; import org.openrdf.model.Literal; import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.LiteralImpl; import org.openrdf.model.impl.URIImpl; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.Var; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.base.Joiner; import mvm.rya.api.domain.RyaType; import mvm.rya.api.resolver.RdfToRyaConversions; @@ -55,117 +43,6 @@ import mvm.rya.api.resolver.RdfToRyaConversions; @ParametersAreNonnullByDefault public class 
FluoStringConverter { - private static final ValueFactory valueFactory = new ValueFactoryImpl(); - - /** - * Converts an ordered collection of variables into the Variable Order - * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS} - * column of the Fluo application. - * - * @param varOrder - An ordered collection of variables. (not null) - * @return The string representation of the variable order. - */ - public static String toVarOrderString(final Collection<String> varOrder) { - checkNotNull(varOrder); - return Joiner.on(VAR_DELIM).join(varOrder); - } - - /** - * Converts an ordered array of variables into the Variable Order - * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS} - * column of the Fluo application. - * - * @param varOrder - An ordered array of variables. (not null) - * @return The string representation of the variable order. - */ - public static String toVarOrderString(final String... varOrder) { - return Joiner.on(VAR_DELIM).join(varOrder); - } - - /** - * Converts a String into an array holding the Variable Order of a Binding Set. - * - * @param varOrderString - The string representation of the variable order. (not null) - * @return An ordered array holding the variable order of a binding set. - */ - public static String[] toVarOrder(final String varOrderString) { - checkNotNull(varOrderString); - return varOrderString.split(VAR_DELIM); - } - - /** - * Converts a {@link BindingSet} to the String representation that the Fluo - * application serializes to the Binding Set columns. - * - * @param bindingSet - The binding set values. (not null) - * @param varOrder - The order the variables must appear in. (not null) - * @return A {@code String} version of {@code bindingSet} suitable for - * serialization to one of the Fluo application's binding set columns. 
- */ - public static String toBindingSetString(final BindingSet bindingSet, final String[] varOrder) { - checkNotNull(bindingSet); - checkNotNull(varOrder); - - final StringBuilder bindingSetString = new StringBuilder(); - - for(int i = 0; i < varOrder.length; i++) { - // Add a value to the binding set. - final String varName = varOrder[i]; - final Value value = bindingSet.getBinding(varName).getValue(); - final RyaType ryaValue = RdfToRyaConversions.convertValue(value); - bindingSetString.append( ryaValue.getData() ).append(TYPE_DELIM).append( ryaValue.getDataType() ); - - // If there are more values to add, include a delimiter between them. - if(i != varOrder.length-1) { - bindingSetString.append(DELIM); - } - } - - return bindingSetString.toString(); - } - - /** - * Converts the String representation of a {@link BindingSet} as is created - * by {@link #toBindingSetString(BindingSet, String[])} back into a - * BindingSet. - * - * @param bindingSetString - The binding set values as a String. (not null) - * @param varOrder - The order the variables appear in the String version of - * the BindingSet. (not null) - * @return A {@link BindingSet} representation of the String. - */ - public static BindingSet toBindingSet(final String bindingSetString, final String[] varOrder) { - checkNotNull(bindingSetString); - checkNotNull(varOrder); - - final String[] bindingStrings = toBindingStrings(bindingSetString); - return toBindingSet(bindingStrings, varOrder); - } - - /** - * Creates a {@link BindingSet} from an ordered array of Strings that represent - * {@link Binding}s and their variable names. - * - * @param bindingStrings - An ordered array of Strings representing {@link Binding}s. (not null) - * @param varOrder - An ordered array of variable names for the binding strings. (not null) - * @return The parameters converted into a {@link BindingSet}. 
- */ - public static BindingSet toBindingSet(final String[] bindingStrings, final String[] varOrder) { - checkNotNull(varOrder); - checkNotNull(bindingStrings); - checkArgument(varOrder.length == bindingStrings.length); - - final QueryBindingSet bindingSet = new QueryBindingSet(); - - for(int i = 0; i < bindingStrings.length; i++) { - final String name = varOrder[i]; - final Value value = FluoStringConverter.toValue(bindingStrings[i]); - bindingSet.addBinding(name, value); - } - - return bindingSet; - } - /** * Extract the {@link Binding} strings from a {@link BindingSet}'s string form. * @@ -178,35 +55,6 @@ public class FluoStringConverter { } /** - * Creates a {@link Value} from a String representation of it. - * - * @param valueString - The String representation of the value. (not null) - * @return The {@link Value} representation of the String. - */ - public static Value toValue(final String valueString) { - checkNotNull(valueString); - - // Split the String that was stored in Fluo into its Value and Type parts. - final String[] valueAndType = valueString.split(TYPE_DELIM); - if(valueAndType.length != 2) { - throw new IllegalArgumentException("Array must contain data and type info!"); - } - - final String dataString = valueAndType[0]; - final String typeString = valueAndType[1]; - - // Convert the String Type into a URI that describes the type. - final URI typeURI = valueFactory.createURI(typeString); - - // Convert the String Value into a Value. - final Value value = typeURI.equals(XMLSchema.ANYURI) ? - valueFactory.createURI(dataString) : - valueFactory.createLiteral(dataString, new URIImpl(typeString)); - - return value; - } - - /** * Converts the String representation of a {@link StatementPattern} back * into the object version. 
* http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java index a25cb92..bc558a5 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java @@ -25,18 +25,21 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NO import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import javax.annotation.ParametersAreNonnullByDefault; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; import com.google.common.base.Optional; -import com.google.common.collect.Maps; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.fluo.api.client.TransactionBase; import io.fluo.api.config.ScannerConfiguration; @@ -47,6 +50,9 @@ import io.fluo.api.iterator.ColumnIterator; import io.fluo.api.iterator.RowIterator; import io.fluo.api.types.Encoder; import io.fluo.api.types.StringEncoder; +import mvm.rya.indexing.external.tupleSet.BindingSetConverter; +import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; +import 
mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; /** @@ -55,7 +61,8 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; */ @ParametersAreNonnullByDefault public class JoinResultUpdater { - private static final Logger log = Logger.getLogger(JoinResultUpdater.class); + + private static final BindingSetConverter<String> converter = new BindingSetStringConverter(); private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); private final Encoder encoder = new StringEncoder(); @@ -67,93 +74,112 @@ public class JoinResultUpdater { * @param tx - The transaction all Fluo queries will use. (not null) * @param childId - The Node ID of the child whose results received a new Binding Set. (not null) * @param childBindingSet - The Binding Set that was just emitted by child node. (not null) - * @param joinMetadata - The metadatat for the Join that has been notified. (not null) + * @param joinMetadata - The metadata for the Join that has been notified. (not null) + * @throws BindingSetConversionException */ public void updateJoinResults( final TransactionBase tx, final String childId, final BindingSet childBindingSet, - final JoinMetadata joinMetadata) { + final JoinMetadata joinMetadata) throws BindingSetConversionException { checkNotNull(tx); checkNotNull(childId); + checkNotNull(childBindingSet); checkNotNull(joinMetadata); - // Read the Join metadata from the Fluo table. - final String[] joinVarOrder = joinMetadata.getVariableOrder().toArray(); + // Figure out which join algorithm we are going to use. 
+ final IterativeJoin joinAlgorithm; + switch(joinMetadata.getJoinType()) { + case NATURAL_JOIN: + joinAlgorithm = new NaturalJoin(); + break; + case LEFT_OUTER_JOIN: + joinAlgorithm = new LeftOuterJoin(); + break; + default: + throw new RuntimeException("Unsupported JoinType: " + joinMetadata.getJoinType()); + } + + // Figure out which side of the join the new binding set appeared on. + final Side emittingSide; + final String siblingId; - // Transform the Child binding set and varaible order values to be easier to work with. - final VariableOrder childVarOrder = getVarOrder(tx, childId); - final String[] childVarOrderArray = childVarOrder.toArray(); - final String childBindingSetString = FluoStringConverter.toBindingSetString(childBindingSet, childVarOrder.toArray()); - final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString); + if(childId.equals(joinMetadata.getLeftChildNodeId())) { + emittingSide = Side.LEFT; + siblingId = joinMetadata.getRightChildNodeId(); + } else { + emittingSide = Side.RIGHT; + siblingId = joinMetadata.getLeftChildNodeId(); + } - // Transform the Sibling binding set and varaible order values to be easier to work with. - final String leftChildId = joinMetadata.getLeftChildNodeId(); - final String rightChildId = joinMetadata.getRightChildNodeId(); - final String siblingId = leftChildId.equals(childId) ? rightChildId : leftChildId; + // Iterates over the sibling node's BindingSets that join with the new binding set. + FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx); - final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); - final String[] siblingVarOrderArray = siblingVarOrder.toArray(); - - // Create a map that will be used later in this algorithm to create new Join result - // Binding Sets. It is initialized with all of the values that are in childBindingSet. 
- // The common values and any that are added on by the sibling will be overwritten - // for each sibling scan result. - final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder); - final Map<String, String> joinBindingSet = Maps.newHashMap(); - for(int i = 0; i < childVarOrderArray.length; i++) { - joinBindingSet.put(childVarOrderArray[i], childBindingStrings[i]); + // Iterates over the resulting BindingSets from the join. + final Iterator<BindingSet> newJoinResults; + if(emittingSide == Side.LEFT) { + newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets); + } else { + newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets, childBindingSet); } + // Insert the new join binding sets to the Fluo table. + VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); + while(newJoinResults.hasNext()) { + BindingSet newJoinResult = newJoinResults.next(); + String joinBindingSetString = converter.convert(newJoinResult, joinVarOrder); + + final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString); + final Column col = FluoQueryColumns.JOIN_BINDING_SET; + final Bytes value = encoder.encode(joinBindingSetString); + tx.set(row, col, value); + } + } + + /** + * The different sides a new binding set may appear on. + */ + public static enum Side { + LEFT, RIGHT; + } + + private FluoTableIterator makeSiblingScanIterator(String childId, BindingSet childBindingSet, String siblingId, TransactionBase tx) throws BindingSetConversionException { + // Get the common variable orders. These are used to build the prefix. 
+ final VariableOrder childVarOrder = getVarOrder(tx, childId); + final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); + List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder); + + // Get the Binding strings + final String childBindingSetString = converter.convert(childBindingSet, childVarOrder); + final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString); + // Create the prefix that will be used to scan for binding sets of the sibling node. // This prefix includes the sibling Node ID and the common variable values from // childBindingSet. - String bsPrefix = ""; + String siblingScanPrefix = ""; for(int i = 0; i < commonVars.size(); i++) { - if(bsPrefix.length() == 0) { - bsPrefix = childBindingStrings[i]; + if(siblingScanPrefix.length() == 0) { + siblingScanPrefix = childBindingStrings[i]; } else { - bsPrefix += DELIM + childBindingStrings[i]; + siblingScanPrefix += DELIM + childBindingStrings[i]; } } - bsPrefix = siblingId + NODEID_BS_DELIM + bsPrefix; + siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix; // Scan the sibling node's binding sets for those that have the same // common variable values as childBindingSet. These needs to be joined // and inserted into the Join's results. It's possible that none of these // results will be new Join results if they have already been created in // earlier iterations of this algorithm. - final ScannerConfiguration sc1 = new ScannerConfiguration(); - sc1.setSpan(Span.prefix(bsPrefix)); - setScanColumnFamily(sc1, siblingId); - - try { - final RowIterator ri = tx.get(sc1); - while(ri.hasNext()) { - final ColumnIterator ci = ri.next().getValue(); - while(ci.hasNext()){ - // Get a sibling binding set. 
- final String siblingBindingSetString = ci.next().getValue().toString(); - final String[] siblingBindingStrings = FluoStringConverter.toBindingStrings(siblingBindingSetString); - - // Overwrite the previous sibling's values to create a new join binding set. - for (int i = 0; i < siblingBindingStrings.length; i++) { - joinBindingSet.put(siblingVarOrderArray[i], siblingBindingStrings[i]); - } - final String joinBindingSetString = makeBindingSetString(joinVarOrder, joinBindingSet); + final ScannerConfiguration scanConfig = new ScannerConfiguration(); + scanConfig.setSpan(Span.prefix(siblingScanPrefix)); + setScanColumnFamily(scanConfig, siblingId); - // Write the join binding set to Fluo. - final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString); - final Column col = FluoQueryColumns.JOIN_BINDING_SET; - final Bytes value = encoder.encode(joinBindingSetString); - tx.set(row, col, value); - } - } - } catch (final Exception e) { - log.error("Error while scanning sibling binding sets to create new join results.", e); - } + final RowIterator ri = tx.get(scanConfig); + return new FluoTableIterator(ri, siblingVarOrder); } + /** * Fetch the {@link VariableOrder} of a query node. * @@ -216,40 +242,6 @@ public class JoinResultUpdater { return commonVars; } -// /** -// * Assuming that the common variables between two children are already -// * shifted to the left, find the common variables between them. -// * <p> -// * Refer to {@link FluoQueryInitializer} to see why this assumption is being made. -// * -// * @param vars1 - The first child's variable order. (not null) -// * @param vars2 - The second child's variable order. (not null) -// * @return An ordered List of the common variables between the two children. 
-// */ -// private List<String> getCommonVars(final String[] vars1, final String[] vars2) { -// checkNotNull(vars1); -// checkNotNull(vars2); -// -// final List<String> commonVars = new ArrayList<>(); -// -// // Only need to iteratre through the shorted order's length. -// final int shortestLen = Math.min(vars1.length, vars2.length); -// for(int i = 0; i < shortestLen; i++) { -// final String var1 = vars1[i]; -// final String var2 = vars2[i]; -// -// if(var1.equals(var2)) { -// commonVars.add(var1); -// } else { -// // Because the common variables are left shifted, we can break once -// // we encounter a pair that does not match. -// break; -// } -// } -// -// return commonVars; -// } - /** * Update a {@link ScannerConfiguration} to use the sibling node's binding * set column for its scan. The column that will be used is determined by the @@ -281,34 +273,211 @@ public class JoinResultUpdater { column = FluoQueryColumns.JOIN_BINDING_SET; break; default: - throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, or Filter."); + throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, Left Join, or Filter."); } sc.fetchColumn(column.getFamily(), column.getQualifier()); } /** - * Create a Binding Set String from a variable order and a map of bindings. - * - * @param varOrder - The resulting binding set's variable order. (not null) - * @param bindingSetValues - A map holding the variables and their values that will be - * included in the resulting binding set. - * @return A binding set string build from the map using the prescribed variable order. + * Defines each of the cases that may generate new join results when + * iteratively computing a query's join node. 
*/ - private static String makeBindingSetString(final String[] varOrder, final Map<String, String> bindingSetValues) { - checkNotNull(varOrder); - checkNotNull(bindingSetValues); + public static interface IterativeJoin { + + /** + * Invoked when a new {@link BindingSet} is emitted from the left child + * node of the join. The Fluo table is scanned for results on the right + * side that will be joined with the new result. + * + * @param newLeftResult - A new BindingSet that has been emitted from + * the left child node. + * @param rightResults - The right child node's binding sets that will + * be joined with the new left result. (not null) + * @return The new BindingSet results for the join. + */ + public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults); + + /** + * Invoked when a new {@link BindingSet} is emitted from the right child + * node of the join. The Fluo table is scanned for results on the left + * side that will be joined with the new result. + * + * @param leftResults - The left child node's binding sets that will be + * joined with the new right result. + * @param newRightResult - A new BindingSet that has been emitted from + * the right child node. + * @return The new BindingSet results for the join. + */ + public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult); + } - String bindingSetString = ""; + /** + * Implements an {@link IterativeJoin} that uses the Natural Join algorithm + * defined by Relational Algebra. + * <p> + * This is how you combine {@code BindingSet}s that may have common Binding + * names. When two Binding Sets are joined, any bindings that appear in both + * binding sets are only included once.
+ */ + public static final class NaturalJoin implements IterativeJoin { + @Override + public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) { + checkNotNull(newLeftResult); + checkNotNull(rightResults); + + // Both sides are required, so if there are no right results, then do not emit anything. + return new LazyJoiningIterator(newLeftResult, rightResults); + } - for (final String joinVar : varOrder) { - if (bindingSetString.length() == 0) { - bindingSetString = bindingSetValues.get(joinVar); - } else { - bindingSetString = bindingSetString + DELIM + bindingSetValues.get(joinVar); + @Override + public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult) { + checkNotNull(leftResults); + checkNotNull(newRightResult); + + // Both sides are required, so if there are no left results, then do not emit anything. + return new LazyJoiningIterator(newRightResult, leftResults); + } + } + + /** + * Implements an {@link IterativeJoin} that uses the Left Outer Join + * algorithm defined by Relational Algebra. + * <p> + * This is how you add optional information to a {@link BindingSet}. Left + * binding sets are emitted even if they do not join with anything on the right. + * However, right binding sets must be joined with a left binding set. + */ + public static final class LeftOuterJoin implements IterativeJoin { + @Override + public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) { + checkNotNull(newLeftResult); + checkNotNull(rightResults); + + // If the required portion does not join with any optional portions, + // then emit a BindingSet that matches the new left result. + if(!rightResults.hasNext()) { + return Lists.<BindingSet>newArrayList(newLeftResult).iterator(); + } + + // Otherwise, return an iterator that holds the new required result + // joined with the right results.
+ return new LazyJoiningIterator(newLeftResult, rightResults); + } + + @Override + public Iterator<BindingSet> newRightResult(final Iterator<BindingSet> leftResults, final BindingSet newRightResult) { + checkNotNull(leftResults); + checkNotNull(newRightResult); + + // The right result is optional, so if it does not join with anything + // on the left, then do not emit anything. + return new LazyJoiningIterator(newRightResult, leftResults); + } + } + + /** + * Joins a {@link BindingSet} (which is new to the left or right side of a join) + * to all binding sets on the other side that join with it. + * <p> + * This is done lazily so that you don't have to load all of the BindingSets + * into memory at once. + */ + private static final class LazyJoiningIterator implements Iterator<BindingSet> { + + private final BindingSet newResult; + private final Iterator<BindingSet> joinedResults; + + /** + * Constructs an instance of {@link LazyJoiningIterator}. + * + * @param newResult - A binding set that will be joined with some other binding sets. (not null) + * @param joinResults - The binding sets that will be joined with {@code newResult}. (not null) + */ + public LazyJoiningIterator(BindingSet newResult, Iterator<BindingSet> joinResults) { + this.newResult = checkNotNull(newResult); + this.joinedResults = checkNotNull(joinResults); + } + + @Override + public boolean hasNext() { + return joinedResults.hasNext(); + } + + @Override + public BindingSet next() { + final MapBindingSet bs = new MapBindingSet(); + + for(Binding binding : newResult) { + bs.addBinding(binding); + } + + for(Binding binding : joinedResults.next()) { + bs.addBinding(binding); } + + return bs; } - return bindingSetString; + @Override + public void remove() { + throw new UnsupportedOperationException("remove() is unsupported."); + } } -} + /** + * Iterates over rows that have a Binding Set column and returns the unmarshalled + * {@link BindingSet}s. 
+ */ + private static final class FluoTableIterator implements Iterator<BindingSet> { + + private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet( + FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, + FluoQueryColumns.JOIN_BINDING_SET, + FluoQueryColumns.FILTER_BINDING_SET); + + private final RowIterator rows; + private final VariableOrder varOrder; + + /** + * Constructs an instance of {@link FluoTableIterator}. + * + * @param rows - Iterates over RowId values in a Fluo Table. (not null) + * @param varOrder - The Variable Order of binding sets that will be + * read from the Fluo Table. (not null) + */ + public FluoTableIterator(RowIterator rows, VariableOrder varOrder) { + this.rows = checkNotNull(rows); + this.varOrder = checkNotNull(varOrder); + } + + @Override + public boolean hasNext() { + return rows.hasNext(); + } + + @Override + public BindingSet next() { + final ColumnIterator columns = rows.next().getValue(); + + while(columns.hasNext()) { + // If this is one of the BindingSet columns, handle it and return the BindingSet. 
+ final Entry<Column, Bytes> entry = columns.next(); + if(BINDING_SET_COLUMNS.contains(entry.getKey())) { + final String bindingSetString = entry.getValue().toString(); + try { + return converter.convert(bindingSetString, varOrder); + } catch (BindingSetConversionException e) { + throw new RuntimeException("Could not convert one of the stored BindingSets from a String: " + bindingSetString, e); + } + } + } + + throw new RuntimeException("Row did not containing a Binding Set."); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() is unsupported."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java index e4d0155..f3ff089 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java @@ -46,6 +46,8 @@ public class QueryResultUpdater { private final Encoder encoder = new StringEncoder(); + private final BindingSetStringConverter converter = new BindingSetStringConverter(); + /** * Updates the results of a Query node when one of its children has added a * new Binding Set to its results. 
@@ -67,10 +69,12 @@ public class QueryResultUpdater { final MapBindingSet queryBindingSet = new MapBindingSet(); for(final String bindingName : queryVarOrder) { - final Binding binding = childBindingSet.getBinding(bindingName); - queryBindingSet.addBinding(binding); + if(childBindingSet.hasBinding(bindingName)) { + final Binding binding = childBindingSet.getBinding(bindingName); + queryBindingSet.addBinding(binding); + } } - final String queryBindingSetString = BindingSetStringConverter.toString(queryBindingSet, queryVarOrder); + final String queryBindingSetString = converter.convert(queryBindingSet, queryVarOrder); // Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry. final Bytes row = encoder.encode(queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index d3c5bb5..aa944e4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -38,6 +38,7 @@ import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; import io.fluo.api.types.TypedObserver; import io.fluo.api.types.TypedTransactionBase; +import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; /** * Notified when the results of a node have been updated to include a new Binding @@ -97,7 +98,11 @@ public abstract class 
BindingSetUpdater extends TypedObserver { case JOIN: final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, parentNodeId); - joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin); + try { + joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin); + } catch (BindingSetConversionException e) { + throw new RuntimeException("Could not process a Join node.", e); + } break; default: http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java index c6b6303..2accde3 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java @@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static com.google.common.base.Preconditions.checkNotNull; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; -import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.openrdf.query.BindingSet; import io.fluo.api.client.TransactionBase; +import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; /** * Notified when the results of a Filter have been updated to include a new @@ -36,6 +37,8 @@ import 
io.fluo.api.client.TransactionBase; */ public class FilterObserver extends BindingSetUpdater { + private final BindingSetStringConverter converter = new BindingSetStringConverter(); + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @Override @@ -53,9 +56,8 @@ public class FilterObserver extends BindingSetUpdater { final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, filterNodeId); // Read the Binding Set that was just emmitted by the Filter. - final String[] filterBindingStrings = parsedRow.getBindingStrings(); - final String[] filterVarOrder = filterMetadata.getVariableOrder().toArray(); - final BindingSet filterBindingSet = FluoStringConverter.toBindingSet(filterBindingStrings, filterVarOrder); + final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); + final BindingSet filterBindingSet = converter.convert(parsedRow.getBindingSetString(), filterVarOrder); // Figure out which node needs to handle the new metadata. final String parentNodeId = filterMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java index 980c7bc..43b0a4e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java @@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static com.google.common.base.Preconditions.checkNotNull; import 
org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; -import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.openrdf.query.BindingSet; import io.fluo.api.client.TransactionBase; +import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; /** * Notified when the results of a Join have been updated to include a new @@ -36,6 +37,8 @@ import io.fluo.api.client.TransactionBase; */ public class JoinObserver extends BindingSetUpdater { + private final BindingSetStringConverter converter = new BindingSetStringConverter(); + private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @Override @@ -52,9 +55,8 @@ public class JoinObserver extends BindingSetUpdater { final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, joinNodeId); // Read the Binding Set that was just emmitted by the Join. - final String[] joinBindingStrings = parsedRow.getBindingStrings(); - final String[] joinVarOrder = joinMetadata.getVariableOrder().toArray(); - final BindingSet joinBindingSet = FluoStringConverter.toBindingSet(joinBindingStrings, joinVarOrder); + final VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); + final BindingSet joinBindingSet = converter.convert(parsedRow.getBindingSetString(), joinVarOrder); // Figure out which node needs to handle the new metadata. 
final String parentNodeId = joinMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index 8a60039..7c1a588 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -21,7 +21,6 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; @@ -39,6 +38,8 @@ import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; import io.fluo.api.types.TypedObserver; import io.fluo.api.types.TypedTransactionBase; +import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; /** * Performs incremental result exporting to the configured destinations. 
@@ -48,6 +49,8 @@ public class QueryResultObserver extends TypedObserver { private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + private final BindingSetStringConverter converter = new BindingSetStringConverter(); + /** * Builders for each type of result exporter we support. */ @@ -96,10 +99,10 @@ public class QueryResultObserver extends TypedObserver { // Fetch the query's Variable Order from the Fluo table. final QueryMetadata queryMetadata = queryDao.readQueryMetadata(tx, queryId); - final String[] varOrder = queryMetadata.getVariableOrder().toArray(); + final VariableOrder varOrder = queryMetadata.getVariableOrder(); // Export the result using each of the provided exporters. - final BindingSet result = FluoStringConverter.toBindingSet(bindingSetString, varOrder); + BindingSet result = converter.convert(bindingSetString, varOrder); for(final IncrementalResultExporter exporter : exporters) { try { exporter.export(tx, queryId, result); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java index 00f9b13..ddba9a2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java @@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static com.google.common.base.Preconditions.checkNotNull; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; -import 
org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.openrdf.query.BindingSet; import io.fluo.api.client.TransactionBase; +import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; /** * Notified when the results of a Statement Pattern have been updated to include @@ -36,6 +37,8 @@ import io.fluo.api.client.TransactionBase; */ public class StatementPatternObserver extends BindingSetUpdater { + private final BindingSetStringConverter converter = new BindingSetStringConverter(); + // DAO private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -53,9 +56,8 @@ public class StatementPatternObserver extends BindingSetUpdater { final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId); // Read the Binding Set that was just emmitted by the Statement Pattern. - final String[] spBindingStrings = parsedRow.getBindingStrings(); - final String[] spVarOrder = spMetadata.getVariableOrder().toArray(); - final BindingSet spBindingSet = FluoStringConverter.toBindingSet(spBindingStrings, spVarOrder); + final VariableOrder spVarOrder = spMetadata.getVariableOrder(); + final BindingSet spBindingSet = converter.convert(parsedRow.getBindingSetString(), spVarOrder); // Figure out which node needs to handle the new metadata. 
final String parentNodeId = spMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index 3d766ed..fa25456 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -54,6 +54,7 @@ import io.fluo.api.data.Column; * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:nodeId</td> <td>The Node ID of the Join.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> + * <tr> <td>Node ID</td> <td>joinMetadata:joinType</td> <td>The Join algorithm that will be used when computing join results.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> @@ -128,6 +129,7 @@ public class FluoQueryColumns { // Join Metadata columns. 
public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId"); public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder"); + public static final Column JOIN_TYPE = new Column(JOIN_METADATA_CF, "joinType"); public static final Column JOIN_PARENT_NODE_ID = new Column(JOIN_METADATA_CF, "parentNodeId"); public static final Column JOIN_LEFT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "leftChildNodeId"); public static final Column JOIN_RIGHT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "rightChildNodeId"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index 0ab5f18..265ca0f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -25,6 +25,7 @@ import java.util.Map; import javax.annotation.ParametersAreNonnullByDefault; import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import com.google.common.collect.Sets; @@ -170,6 +171,7 @@ public class FluoQueryMetadataDAO { final Bytes rowId = encoder.encode(metadata.getNodeId()); tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId); tx.set(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() )); + tx.set(rowId, FluoQueryColumns.JOIN_TYPE, encoder.encode(metadata.getJoinType().toString()) ); tx.set(rowId, 
FluoQueryColumns.JOIN_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() )); tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, encoder.encode( metadata.getLeftChildNodeId() )); tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, encoder.encode( metadata.getRightChildNodeId() )); @@ -194,6 +196,7 @@ public class FluoQueryMetadataDAO { final Bytes rowId = encoder.encode(nodeId); final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet( FluoQueryColumns.JOIN_VARIABLE_ORDER, + FluoQueryColumns.JOIN_TYPE, FluoQueryColumns.JOIN_PARENT_NODE_ID, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID)); @@ -202,12 +205,16 @@ public class FluoQueryMetadataDAO { final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER)); final VariableOrder varOrder = new VariableOrder(varOrderString); + String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) ); + JoinType joinType = JoinType.valueOf(joinTypeString); + final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID) ); final String leftChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID) ); final String rightChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID) ); return JoinMetadata.builder(nodeId) .setVariableOrder(varOrder) + .setJoinType(joinType) .setParentNodeId(parentNodeId) .setLeftChildNodeId(leftChildNodeId) .setRightChildNodeId(rightChildNodeId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java index 2546972..566ce61 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java @@ -37,6 +37,15 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; @ParametersAreNonnullByDefault public class JoinMetadata extends CommonNodeMetadata { + /** + * The different types of Join algorithms that this join may perform. + */ + public static enum JoinType { + NATURAL_JOIN, + LEFT_OUTER_JOIN; + } + + private final JoinType joinType; private final String parentNodeId; private final String leftChildNodeId; private final String rightChildNodeId; @@ -46,6 +55,7 @@ public class JoinMetadata extends CommonNodeMetadata { * * @param nodeId - The ID the Fluo app uses to reference this node. (not null) * @param varOrder - The variable order of binding sets that are emitted by this node. (not null) + * @param joinType - Defines which join algorithm the join will use. * @param parentNodeId - The node id of this node's parent. (not null) * @param leftChildNodeId - One of the nodes whose results are being joined. (not null) * @param rightChildNodeId - The other node whose results are being joined. (not null) @@ -53,16 +63,25 @@ public class JoinMetadata extends CommonNodeMetadata { public JoinMetadata( final String nodeId, final VariableOrder varOrder, + final JoinType joinType, final String parentNodeId, final String leftChildNodeId, final String rightChildNodeId) { super(nodeId, varOrder); + this.joinType = checkNotNull(joinType); this.parentNodeId = checkNotNull(parentNodeId); this.leftChildNodeId = checkNotNull(leftChildNodeId); this.rightChildNodeId = checkNotNull(rightChildNodeId); } /** + * @return Defines which join algorithm the join will use. 
+ */ + public JoinType getJoinType() { + return joinType; + } + + /** * @return The node id of this node's parent. */ public String getParentNodeId() { @@ -88,6 +107,7 @@ public class JoinMetadata extends CommonNodeMetadata { return Objects.hashCode( super.getNodeId(), super.getVariableOrder(), + joinType, parentNodeId, leftChildNodeId, rightChildNodeId); @@ -103,6 +123,7 @@ public class JoinMetadata extends CommonNodeMetadata { if(super.equals(o)) { final JoinMetadata joinMetadata = (JoinMetadata)o; return new EqualsBuilder() + .append(joinType, joinMetadata.joinType) .append(parentNodeId, joinMetadata.parentNodeId) .append(leftChildNodeId, joinMetadata.leftChildNodeId) .append(rightChildNodeId, joinMetadata.rightChildNodeId) @@ -120,6 +141,7 @@ public class JoinMetadata extends CommonNodeMetadata { .append("Join Metadata {\n") .append(" Node ID: " + super.getNodeId() + "\n") .append(" Variable Order: " + super.getVariableOrder() + "\n") + .append(" Join Type: " + joinType + "\n") .append(" Parent Node ID: " + parentNodeId + "\n") .append(" Left Child Node ID: " + leftChildNodeId + "\n") .append(" Right Child Node ID: " + rightChildNodeId + "\n") @@ -145,6 +167,7 @@ public class JoinMetadata extends CommonNodeMetadata { private final String nodeId; private VariableOrder varOrder; + private JoinType joinType; private String parentNodeId; private String leftChildNodeId; private String rightChildNodeId; @@ -188,6 +211,17 @@ public class JoinMetadata extends CommonNodeMetadata { } /** + * Sets the type of join algorithm that will be used by this join. + * + * @param joinType - Defines which join algorithm the join will use. + * @return This builder so that method invocation could be chained. + */ + public Builder setJoinType(@Nullable final JoinType joinType) { + this.joinType = joinType; + return this; + } + + /** * Set one of the nodes whose results are being joined. * * @param leftChildNodeId - One of the nodes whose results are being joined. 
@@ -216,6 +250,7 @@ public class JoinMetadata extends CommonNodeMetadata { return new JoinMetadata( nodeId, varOrder, + joinType, parentNodeId, leftChildNodeId, rightChildNodeId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java index 844a7a4..25b4e4f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java @@ -19,7 +19,6 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter.toVarOrderString; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; @@ -39,8 +38,10 @@ import javax.annotation.concurrent.Immutable; import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.openrdf.query.algebra.Filter; import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.LeftJoin; import org.openrdf.query.algebra.Projection; import 
org.openrdf.query.algebra.QueryModelNode; import org.openrdf.query.algebra.StatementPattern; @@ -152,7 +153,7 @@ public class SparqlFluoQueryBuilder { prefix = SP_PREFIX; } else if(node instanceof Filter) { prefix = FILTER_PREFIX; - } else if(node instanceof Join) { + } else if(node instanceof Join || node instanceof LeftJoin) { prefix = JOIN_PREFIX; } else if(node instanceof Projection) { prefix = QUERY_PREFIX; @@ -218,18 +219,36 @@ public class SparqlFluoQueryBuilder { } @Override + public void meet(LeftJoin node) { + // Extract the metadata that will be stored for the node. + String leftJoinNodeId = nodeIds.getOrMakeId(node); + final QueryModelNode left = node.getLeftArg(); + final QueryModelNode right = node.getRightArg(); + + // Update the metadata for the JoinMetadata.Builder. + makeJoinMetadata(leftJoinNodeId, JoinType.LEFT_OUTER_JOIN, left, right); + + // Walk to the next node. + super.meet(node); + } + + @Override public void meet(final Join node) { // Extract the metadata that will be stored from the node. final String joinNodeId = nodeIds.getOrMakeId(node); final QueryModelNode left = node.getLeftArg(); final QueryModelNode right = node.getRightArg(); - if(left == null || right == null) { - throw new IllegalArgumentException("Join args connot be null."); - } + // Update the metadata for the JoinMetadata.Builder. + makeJoinMetadata(joinNodeId, JoinType.NATURAL_JOIN, left, right); + + // Walk to the next node. + super.meet(node); + } + private void makeJoinMetadata(String joinNodeId, JoinType joinType, QueryModelNode left, QueryModelNode right) { final String leftChildNodeId = nodeIds.getOrMakeId(left); - final String rightChildNodeId = nodeIds.getOrMakeId( right ); + final String rightChildNodeId = nodeIds.getOrMakeId(right); // Get or create a builder for this node populated with the known metadata. 
JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(joinNodeId).orNull(); @@ -237,6 +256,7 @@ public class SparqlFluoQueryBuilder { joinBuilder = JoinMetadata.builder(joinNodeId); fluoQueryBuilder.addJoinMetadata(joinBuilder); } + joinBuilder.setJoinType(joinType); joinBuilder.setLeftChildNodeId( leftChildNodeId ); joinBuilder.setRightChildNodeId( rightChildNodeId ); @@ -247,15 +267,12 @@ public class SparqlFluoQueryBuilder { final JoinVarOrders varOrders = getJoinArgVarOrders(leftVars, rightVars); // Create or update the left child's variable order and parent node id. - final VariableOrder leftVarOrder = new VariableOrder( varOrders.getLeftVarOrder() ); + final VariableOrder leftVarOrder = varOrders.getLeftVarOrder(); setChildMetadata(leftChildNodeId, leftVarOrder, joinNodeId); // Create or update the right child's variable order and parent node id. - final VariableOrder rightVarOrder = new VariableOrder( varOrders.getRightVarOrder() ); + final VariableOrder rightVarOrder = varOrders.getRightVarOrder(); setChildMetadata(rightChildNodeId, rightVarOrder, joinNodeId); - - // Walk to the next node. - super.meet(node); } @Override @@ -282,7 +299,7 @@ public class SparqlFluoQueryBuilder { // Update the child node's metadata. final Set<String> childVars = getVars((TupleExpr)child); - final VariableOrder childVarOrder = new VariableOrder( toVarOrderString(childVars) ); + final VariableOrder childVarOrder = new VariableOrder(childVars); setChildMetadata(childNodeId, childVarOrder, filterId); // Walk to the next node. @@ -293,7 +310,7 @@ public class SparqlFluoQueryBuilder { public void meet(final Projection node) { // Create a builder for this node populated with the metadata. 
final String queryId = nodeIds.getOrMakeId(node); - final VariableOrder queryVarOrder = new VariableOrder( toVarOrderString( node.getAssuredBindingNames() ) ); + final VariableOrder queryVarOrder = new VariableOrder(node.getBindingNames()); final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId); fluoQueryBuilder.setQueryMetadata(queryBuilder); @@ -311,7 +328,7 @@ public class SparqlFluoQueryBuilder { // Update the child node's metadata. final Set<String> childVars = getVars((TupleExpr)child); - final VariableOrder childVarOrder = new VariableOrder( toVarOrderString(childVars) ); + final VariableOrder childVarOrder = new VariableOrder(childVars); setChildMetadata(childNodeId, childVarOrder, queryId); @@ -386,10 +403,9 @@ public class SparqlFluoQueryBuilder { final Set<String> vars = Sets.newHashSet(); - final Set<String> abn = node.getAssuredBindingNames(); - for(final String s: abn) { - if(!s.startsWith("-const-")) { - vars.add(s); + for(final String bindingName : node.getBindingNames()) { + if(!bindingName.startsWith("-const-")) { + vars.add(bindingName); } } @@ -402,8 +418,8 @@ public class SparqlFluoQueryBuilder { @Immutable @ParametersAreNonnullByDefault private static final class JoinVarOrders { - private final String leftVarOrder; - private final String rightVarOrder; + private final VariableOrder leftVarOrder; + private final VariableOrder rightVarOrder; /** * Constructs an instance of {@link }. @@ -411,7 +427,7 @@ public class SparqlFluoQueryBuilder { * @param leftVarOrder - The left child's Variable Order. (not null) * @param rightVarOrder - The right child's Variable Order. 
(not null) */ - public JoinVarOrders(final String leftVarOrder, final String rightVarOrder) { + public JoinVarOrders(final VariableOrder leftVarOrder, final VariableOrder rightVarOrder) { this.leftVarOrder = checkNotNull(leftVarOrder); this.rightVarOrder = checkNotNull(rightVarOrder); } @@ -419,14 +435,14 @@ public class SparqlFluoQueryBuilder { /** * @return The left child's Variable Order. */ - public String getLeftVarOrder() { + public VariableOrder getLeftVarOrder() { return leftVarOrder; } /** * @return The right child's Variable Order. */ - public String getRightVarOrder() { + public VariableOrder getRightVarOrder() { return rightVarOrder; } } @@ -450,7 +466,7 @@ public class SparqlFluoQueryBuilder { // Push all of the common variables to the left for each child's vars. final List<String> leftVarOrder = leftShiftCommonVars(commonVars, leftVars); final List<String> rightVarOrder = leftShiftCommonVars(commonVars, rightVars); - return new JoinVarOrders(toVarOrderString(leftVarOrder), toVarOrderString(rightVarOrder)); + return new JoinVarOrders(new VariableOrder(leftVarOrder), new VariableOrder(rightVarOrder)); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java index a672bd2..4ad5189 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java @@ -19,94 +19,20 @@ package org.apache.rya.indexing.pcj.fluo.app; import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.Collection; import org.junit.Test; import org.openrdf.model.impl.LiteralImpl; import org.openrdf.model.impl.URIImpl; import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.BindingSet; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.Var; -import org.openrdf.query.impl.MapBindingSet; - -import com.beust.jcommander.internal.Lists; /** * Tests the methods of {@link FluoStringConverterTest}. */ public class FluoStringConverterTest { - @Test - public void varOrderToString() { - // Setup the variable order that will be converted. - final Collection<String> varOrder = Lists.newArrayList("x", "y", "z"); - - // Convert it to a String. - final String varOrderString = FluoStringConverter.toVarOrderString(varOrder); - - // Ensure it converted to the expected result. - final String expected = "x;y;z"; - assertEquals(expected, varOrderString); - } - - @Test - public void stringToVarOrder() { - // Setup the String that will be converted. - final String varOrderString = "x;y;z"; - - // Convert it to an array in variable order. - final String[] varOrder = FluoStringConverter.toVarOrder(varOrderString); - - // Ensure it converted to the expected result. - final String[] expected = {"x", "y", "z"}; - assertTrue( Arrays.equals(expected, varOrder) ); - } - - @Test - public void bindingSetToString() { - // Setup the binding set that will be converted. - final MapBindingSet originalBindingSet = new MapBindingSet(); - originalBindingSet.addBinding("x", new URIImpl("http://a")); - originalBindingSet.addBinding("y", new URIImpl("http://b")); - originalBindingSet.addBinding("z", new URIImpl("http://c")); - - // Convert it to a String. 
- final String[] varOrder = new String[] {"y", "z", "x" }; - final String bindingSetString = FluoStringConverter.toBindingSetString(originalBindingSet, varOrder); - - // Ensure it converted to the expected result. - final String expected = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" + - "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" + - "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI"; - - assertEquals(expected, bindingSetString); - } - - @Test - public void stringToBindingSet() { - // Setup the String that will be converted. - final String bindingSetString = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" + - "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" + - "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI"; - - // Convert it to a BindingSet - final String[] varOrder = new String[] {"y", "z", "x" }; - final BindingSet bindingSet = FluoStringConverter.toBindingSet(bindingSetString, varOrder); - - // Ensure it converted to the expected result. - final MapBindingSet expected = new MapBindingSet(); - expected.addBinding("x", new URIImpl("http://a")); - expected.addBinding("y", new URIImpl("http://b")); - expected.addBinding("z", new URIImpl("http://c")); - - assertEquals(expected, bindingSet); - } - @Test public void statementPatternToString() throws MalformedQueryException { // Setup a StatementPattern that represents "?x <http://worksAt> <http://Chipotle>." 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java new file mode 100644 index 0000000..025c3e7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Tests the methods of {@link LeftOuterJoin}. + */ +public class LeftOuterJoinTest { + + private final ValueFactory vf = new ValueFactoryImpl(); + + @Test + public void newLeftResult_noRightMatches() { + IterativeJoin leftOuterJoin = new LeftOuterJoin(); + + // There is a new left result. + MapBindingSet newLeftResult = new MapBindingSet(); + newLeftResult.addBinding("name", vf.createLiteral("Bob")); + + // There are no right results that join with the left result. + Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator(); + + // Therefore, the left result is a new join result. + Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults); + + Set<BindingSet> newJoinResults = new HashSet<>(); + while(newJoinResultsIt.hasNext()) { + newJoinResults.add( newJoinResultsIt.next() ); + } + + Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult ); + + assertEquals(expected, newJoinResults); + } + + @Test + public void newLeftResult_joinsWithRightResults() { + IterativeJoin leftOuterJoin = new LeftOuterJoin(); + + // There is a new left result. 
+ MapBindingSet newLeftResult = new MapBindingSet(); + newLeftResult.addBinding("name", vf.createLiteral("Bob")); + newLeftResult.addBinding("height", vf.createLiteral("5'9\"")); + + // There are a few right results that join with the left result. + MapBindingSet nameAge = new MapBindingSet(); + nameAge.addBinding("name", vf.createLiteral("Bob")); + nameAge.addBinding("age", vf.createLiteral(56)); + + MapBindingSet nameHair = new MapBindingSet(); + nameHair.addBinding("name", vf.createLiteral("Bob")); + nameHair.addBinding("hairColor", vf.createLiteral("Brown")); + + Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator(); + + // Therefore, there are a few new join results that mix the two together. + Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults); + + Set<BindingSet> newJoinResults = new HashSet<>(); + while(newJoinResultsIt.hasNext()) { + newJoinResults.add( newJoinResultsIt.next() ); + } + + Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); + MapBindingSet nameHeightAge = new MapBindingSet(); + nameHeightAge.addBinding("name", vf.createLiteral("Bob")); + nameHeightAge.addBinding("height", vf.createLiteral("5'9\"")); + nameHeightAge.addBinding("age", vf.createLiteral(56)); + expected.add(nameHeightAge); + + MapBindingSet nameHeightHair = new MapBindingSet(); + nameHeightHair.addBinding("name", vf.createLiteral("Bob")); + nameHeightHair.addBinding("height", vf.createLiteral("5'9\"")); + nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown")); + expected.add(nameHeightHair); + + assertEquals(expected, newJoinResults); + } + + @Test + public void newRightResult_noLeftMatches() { + IterativeJoin leftOuterJoin = new LeftOuterJoin(); + + // There are no left results. + Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator(); + + // There is a new right result. 
+ MapBindingSet newRightResult = new MapBindingSet(); + newRightResult.addBinding("name", vf.createLiteral("Bob")); + + // Therefore, there are no new join results. + Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult); + assertFalse( newJoinResultsIt.hasNext() ); + } + + @Test + public void newRightResult_joinsWithLeftResults() { + IterativeJoin leftOuterJoin = new LeftOuterJoin(); + + // There are a few left results that join with the new right result. + MapBindingSet nameAge = new MapBindingSet(); + nameAge.addBinding("name", vf.createLiteral("Bob")); + nameAge.addBinding("age", vf.createLiteral(56)); + + MapBindingSet nameHair = new MapBindingSet(); + nameHair.addBinding("name", vf.createLiteral("Bob")); + nameHair.addBinding("hairColor", vf.createLiteral("Brown")); + + Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator(); + + // There is a new right result. + MapBindingSet newRightResult = new MapBindingSet(); + newRightResult.addBinding("name", vf.createLiteral("Bob")); + newRightResult.addBinding("height", vf.createLiteral("5'9\"")); + + // Therefore, there are a few new join results that mix the two together. 
+ Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult); + + Set<BindingSet> newJoinResults = new HashSet<>(); + while(newJoinResultsIt.hasNext()) { + newJoinResults.add( newJoinResultsIt.next() ); + } + + Set<BindingSet> expected = Sets.<BindingSet>newHashSet(); + MapBindingSet nameHeightAge = new MapBindingSet(); + nameHeightAge.addBinding("name", vf.createLiteral("Bob")); + nameHeightAge.addBinding("height", vf.createLiteral("5'9\"")); + nameHeightAge.addBinding("age", vf.createLiteral(56)); + expected.add(nameHeightAge); + + MapBindingSet nameHeightHair = new MapBindingSet(); + nameHeightHair.addBinding("name", vf.createLiteral("Bob")); + nameHeightHair.addBinding("height", vf.createLiteral("5'9\"")); + nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown")); + expected.add(nameHeightHair); + + assertEquals(expected, newJoinResults); + } +} \ No newline at end of file
