http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
index 859cd4b..85f1b1f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
@@ -19,7 +19,6 @@
 package org.apache.rya.indexing.pcj.fluo.app;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
 import javax.annotation.ParametersAreNonnullByDefault;
@@ -35,17 +34,17 @@ import io.fluo.api.data.Bytes;
 @ParametersAreNonnullByDefault
 public class BindingSetRow {
     private final String nodeId;
-    private final String[] bindingStrings;
+    private final String bindingSetString;
 
     /**
      * Constructs an instance of {@link BindingSetRow}.
      *
      * @param nodeId - The Node ID of a query node. (not null)
-     * @param bindingStrings - A Binding Set that is part of the node's results. (not null)
+     * @param bindingSetString - A Binding Set that is part of the node's results. (not null)
      */
-    public BindingSetRow(final String nodeId, final String[] bindingStrings) {
+    public BindingSetRow(final String nodeId, final String bindingSetString) {
         this.nodeId = checkNotNull(nodeId);
-        this.bindingStrings = checkNotNull(bindingStrings);
+        this.bindingSetString = checkNotNull(bindingSetString);
     }
 
     /**
@@ -56,11 +55,10 @@ public class BindingSetRow {
     }
 
     /**
-     * @return A Binding Set that is part of the node's results. It is formatted
-     *   in SPO order and each String requires further interpretation.
+     * @return A Binding Set that is part of the node's results.
      */
-    public String[] getBindingStrings() {
-        return bindingStrings;
+    public String getBindingSetString() {
+        return bindingSetString;
     }
 
     /**
@@ -77,11 +75,10 @@ public class BindingSetRow {
         if(rowArray.length != 2) {
             throw new IllegalArgumentException("A row must contain a single NODEID_BS_DELIM.");
         }
-        final String nodeId = rowArray[0];
 
-        // Read the row's Binding Set from the bytes.
-        final String[] bindingStrings = rowArray[1].split(DELIM);
+        final String nodeId = rowArray[0];
+        String bindingSetString = rowArray[1];
 
-        return new BindingSetRow(nodeId, bindingStrings);
+        return new BindingSetRow(nodeId, bindingSetString);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 5ff5acc..3af079f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -62,6 +62,8 @@ public class FilterResultUpdater {
 
     private final Encoder encoder = new StringEncoder();
 
+    private final BindingSetStringConverter converter = new BindingSetStringConverter();
+
     /**
      * A utility class used to search SPARQL queries for Filters.
      */
@@ -119,10 +121,12 @@ public class FilterResultUpdater {
 
             final MapBindingSet filterBindingSet = new MapBindingSet();
             for(final String bindingName : filterVarOrder) {
-                final Binding binding = childBindingSet.getBinding(bindingName);
-                filterBindingSet.addBinding(binding);
+                if(childBindingSet.hasBinding(bindingName)) {
+                    final Binding binding = childBindingSet.getBinding(bindingName);
+                    filterBindingSet.addBinding(binding);
+                }
             }
-            final String filterBindingSetString = BindingSetStringConverter.toString(filterBindingSet, filterVarOrder);
+            final String filterBindingSetString = converter.convert(filterBindingSet, filterVarOrder);
 
             final Bytes row = encoder.encode( filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetString );
             final Column col = FluoQueryColumns.FILTER_BINDING_SET;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
index 61e3d5f..aa7c959 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
@@ -18,32 +18,20 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
-
-import java.util.Collection;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.openrdf.model.Literal;
 import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.LiteralImpl;
 import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.Var;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-import com.google.common.base.Joiner;
 
 import mvm.rya.api.domain.RyaType;
 import mvm.rya.api.resolver.RdfToRyaConversions;
@@ -55,117 +43,6 @@ import mvm.rya.api.resolver.RdfToRyaConversions;
 @ParametersAreNonnullByDefault
 public class FluoStringConverter {
 
-    private static final ValueFactory valueFactory = new ValueFactoryImpl();
-
-    /**
-     * Converts an ordered collection of variables into the Variable Order
-     * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS}
-     * column of the Fluo application.
-     *
-     * @param varOrder - An ordered collection of variables. (not null)
-     * @return The string representation of the variable order.
-     */
-    public static String toVarOrderString(final Collection<String> varOrder) {
-        checkNotNull(varOrder);
-        return Joiner.on(VAR_DELIM).join(varOrder);
-    }
-
-    /**
-     * Converts an ordered array of variables into the Variable Order
-     * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS}
-     * column of the Fluo application.
-     *
-     * @param varOrder - An ordered array of variables. (not null)
-     * @return The string representation of the variable order.
-     */
-    public static String toVarOrderString(final String... varOrder) {
-        return Joiner.on(VAR_DELIM).join(varOrder);
-    }
-
-    /**
-     * Converts a String into an array holding the Variable Order of a Binding Set.
-     *
-     * @param varOrderString - The string representation of the variable order. (not null)
-     * @return An ordered array holding the variable order of a binding set.
-     */
-    public static String[] toVarOrder(final String varOrderString) {
-        checkNotNull(varOrderString);
-        return varOrderString.split(VAR_DELIM);
-    }
-
-    /**
-     * Converts a {@link BindingSet} to the String representation that the Fluo
-     * application serializes to the Binding Set columns.
-     *
-     * @param bindingSet - The binding set values. (not null)
-     * @param varOrder - The order the variables must appear in. (not null)
-     * @return A {@code String} version of {@code bindingSet} suitable for
-     *   serialization to one of the Fluo application's binding set columns.
-     */
-    public static String toBindingSetString(final BindingSet bindingSet, final String[] varOrder) {
-        checkNotNull(bindingSet);
-        checkNotNull(varOrder);
-
-        final StringBuilder bindingSetString = new StringBuilder();
-
-        for(int i = 0; i < varOrder.length; i++) {
-            // Add a value to the binding set.
-            final String varName = varOrder[i];
-            final Value value = bindingSet.getBinding(varName).getValue();
-            final RyaType ryaValue = RdfToRyaConversions.convertValue(value);
-            bindingSetString.append( ryaValue.getData() ).append(TYPE_DELIM).append( ryaValue.getDataType() );
-
-            // If there are more values to add, include a delimiter between them.
-            if(i != varOrder.length-1) {
-                bindingSetString.append(DELIM);
-            }
-        }
-
-        return bindingSetString.toString();
-    }
-
-    /**
-     * Converts the String representation of a {@link BindingSet} as is created
-     * by {@link #toBindingSetString(BindingSet, String[])} back into a
-     * BindingSet.
-     *
-     * @param bindingSetString - The binding set values as a String. (not null)
-     * @param varOrder - The order the variables appear in the String version of
-     *   the BindingSet. (not null)
-     * @return A {@link BindingSet} representation of the String.
-     */
-    public static BindingSet toBindingSet(final String bindingSetString, final String[] varOrder) {
-        checkNotNull(bindingSetString);
-        checkNotNull(varOrder);
-
-        final String[] bindingStrings = toBindingStrings(bindingSetString);
-        return toBindingSet(bindingStrings, varOrder);
-    }
-
-    /**
-     * Creates a {@link BindingSet} from an ordered array of Strings that represent
-     * {@link Binding}s and their variable names.
-     *
-     * @param bindingStrings - An ordered array of Strings representing {@link Binding}s. (not null)
-     * @param varOrder - An ordered array of variable names for the binding strings. (not null)
-     * @return The parameters converted into a {@link BindingSet}.
-     */
-    public static BindingSet toBindingSet(final String[] bindingStrings, final String[] varOrder) {
-        checkNotNull(varOrder);
-        checkNotNull(bindingStrings);
-        checkArgument(varOrder.length == bindingStrings.length);
-
-        final QueryBindingSet bindingSet = new QueryBindingSet();
-
-        for(int i = 0; i < bindingStrings.length; i++) {
-            final String name = varOrder[i];
-            final Value value = FluoStringConverter.toValue(bindingStrings[i]);
-            bindingSet.addBinding(name, value);
-        }
-
-        return bindingSet;
-    }
-
     /**
      * Extract the {@link Binding} strings from a {@link BindingSet}'s string form.
      *
@@ -178,35 +55,6 @@ public class FluoStringConverter {
     }
 
     /**
-     * Creates a {@link Value} from a String representation of it.
-     *
-     * @param valueString - The String representation of the value. (not null)
-     * @return The {@link Value} representation of the String.
-     */
-    public static Value toValue(final String valueString) {
-        checkNotNull(valueString);
-
-        // Split the String that was stored in Fluo into its Value and Type parts.
-        final String[] valueAndType = valueString.split(TYPE_DELIM);
-        if(valueAndType.length != 2) {
-            throw new IllegalArgumentException("Array must contain data and type info!");
-        }
-
-        final String dataString = valueAndType[0];
-        final String typeString = valueAndType[1];
-
-        // Convert the String Type into a URI that describes the type.
-        final URI typeURI = valueFactory.createURI(typeString);
-
-        // Convert the String Value into a Value.
-        final Value value = typeURI.equals(XMLSchema.ANYURI) ?
-                valueFactory.createURI(dataString) :
-                valueFactory.createLiteral(dataString, new URIImpl(typeString));
-
-        return value;
-    }
-
-    /**
      * Converts the String representation of a {@link StatementPattern} back
      * into the object version.
      *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index a25cb92..bc558a5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -25,18 +25,21 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NO
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import io.fluo.api.client.TransactionBase;
 import io.fluo.api.config.ScannerConfiguration;
@@ -47,6 +50,9 @@ import io.fluo.api.iterator.ColumnIterator;
 import io.fluo.api.iterator.RowIterator;
 import io.fluo.api.types.Encoder;
 import io.fluo.api.types.StringEncoder;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
 import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 
 /**
@@ -55,7 +61,8 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
  */
 @ParametersAreNonnullByDefault
 public class JoinResultUpdater {
-    private static final Logger log = Logger.getLogger(JoinResultUpdater.class);
+
+    private static final BindingSetConverter<String> converter = new BindingSetStringConverter();
 
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
     private final Encoder encoder = new StringEncoder();
@@ -67,93 +74,112 @@ public class JoinResultUpdater {
      * @param tx - The transaction all Fluo queries will use. (not null)
      * @param childId - The Node ID of the child whose results received a new Binding Set. (not null)
      * @param childBindingSet - The Binding Set that was just emitted by child node. (not null)
-     * @param joinMetadata - The metadatat for the Join that has been notified. (not null)
+     * @param joinMetadata - The metadata for the Join that has been notified. (not null)
+     * @throws BindingSetConversionException
      */
     public void updateJoinResults(
             final TransactionBase tx,
             final String childId,
             final BindingSet childBindingSet,
-            final JoinMetadata joinMetadata) {
+            final JoinMetadata joinMetadata) throws BindingSetConversionException {
         checkNotNull(tx);
         checkNotNull(childId);
+        checkNotNull(childBindingSet);
         checkNotNull(joinMetadata);
 
-        // Read the Join metadata from the Fluo table.
-        final String[] joinVarOrder = joinMetadata.getVariableOrder().toArray();
+        // Figure out which join algorithm we are going to use.
+        final IterativeJoin joinAlgorithm;
+        switch(joinMetadata.getJoinType()) {
+            case NATURAL_JOIN:
+                joinAlgorithm = new NaturalJoin();
+                break;
+            case LEFT_OUTER_JOIN:
+                joinAlgorithm = new LeftOuterJoin();
+                break;
+            default:
+                throw new RuntimeException("Unsupported JoinType: " + joinMetadata.getJoinType());
+        }
+
+        // Figure out which side of the join the new binding set appeared on.
+        final Side emittingSide;
+        final String siblingId;
 
-        // Transform the Child binding set and varaible order values to be easier to work with.
-        final VariableOrder childVarOrder = getVarOrder(tx, childId);
-        final String[] childVarOrderArray = childVarOrder.toArray();
-        final String childBindingSetString = FluoStringConverter.toBindingSetString(childBindingSet, childVarOrder.toArray());
-        final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString);
+        if(childId.equals(joinMetadata.getLeftChildNodeId())) {
+            emittingSide = Side.LEFT;
+            siblingId = joinMetadata.getRightChildNodeId();
+        } else {
+            emittingSide = Side.RIGHT;
+            siblingId = joinMetadata.getLeftChildNodeId();
+        }
 
-        // Transform the Sibling binding set and varaible order values to be easier to work with.
-        final String leftChildId = joinMetadata.getLeftChildNodeId();
-        final String rightChildId = joinMetadata.getRightChildNodeId();
-        final String siblingId = leftChildId.equals(childId) ? rightChildId : leftChildId;
+        // Iterates over the sibling node's BindingSets that join with the new binding set.
+        FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx);
 
-        final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
-        final String[] siblingVarOrderArray = siblingVarOrder.toArray();
-
-        // Create a map that will be used later in this algorithm to create new Join result
-        // Binding Sets. It is initialized with all of the values that are in childBindingSet.
-        // The common values and any that are added on by the sibling will be overwritten
-        // for each sibling scan result.
-        final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
-        final Map<String, String> joinBindingSet = Maps.newHashMap();
-        for(int i = 0; i < childVarOrderArray.length; i++) {
-            joinBindingSet.put(childVarOrderArray[i], childBindingStrings[i]);
+        // Iterates over the resulting BindingSets from the join.
+        final Iterator<BindingSet> newJoinResults;
+        if(emittingSide == Side.LEFT) {
+            newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets);
+        } else {
+            newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets, childBindingSet);
         }
 
+        // Insert the new join binding sets to the Fluo table.
+        VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
+        while(newJoinResults.hasNext()) {
+            BindingSet newJoinResult = newJoinResults.next();
+            String joinBindingSetString = converter.convert(newJoinResult, joinVarOrder);
+
+            final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString);
+            final Column col = FluoQueryColumns.JOIN_BINDING_SET;
+            final Bytes value = encoder.encode(joinBindingSetString);
+            tx.set(row, col, value);
+        }
+    }
+
+    /**
+     * The different sides a new binding set may appear on.
+     */
+    public static enum Side {
+        LEFT, RIGHT;
+    }
+
+    private FluoTableIterator makeSiblingScanIterator(String childId, BindingSet childBindingSet, String siblingId, TransactionBase tx) throws BindingSetConversionException {
+        // Get the common variable orders. These are used to build the prefix.
+        final VariableOrder childVarOrder = getVarOrder(tx, childId);
+        final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
+        List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
+
+        // Get the Binding strings
+        final String childBindingSetString = converter.convert(childBindingSet, childVarOrder);
+        final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString);
+
         // Create the prefix that will be used to scan for binding sets of the sibling node.
         // This prefix includes the sibling Node ID and the common variable values from
         // childBindingSet.
-        String bsPrefix = "";
+        String siblingScanPrefix = "";
         for(int i = 0; i < commonVars.size(); i++) {
-            if(bsPrefix.length() == 0) {
-                bsPrefix = childBindingStrings[i];
+            if(siblingScanPrefix.length() == 0) {
+                siblingScanPrefix = childBindingStrings[i];
             } else {
-                bsPrefix += DELIM + childBindingStrings[i];
+                siblingScanPrefix += DELIM + childBindingStrings[i];
             }
         }
-        bsPrefix = siblingId + NODEID_BS_DELIM + bsPrefix;
+        siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix;
 
         // Scan the sibling node's binding sets for those that have the same
         // common variable values as childBindingSet. These needs to be joined
         // and inserted into the Join's results. It's possible that none of these
         // results will be new Join results if they have already been created in
         // earlier iterations of this algorithm.
-        final ScannerConfiguration sc1 = new ScannerConfiguration();
-        sc1.setSpan(Span.prefix(bsPrefix));
-        setScanColumnFamily(sc1, siblingId);
-
-        try {
-            final RowIterator ri = tx.get(sc1);
-            while(ri.hasNext()) {
-                final ColumnIterator ci = ri.next().getValue();
-                while(ci.hasNext()){
-                    // Get a sibling binding set.
-                    final String siblingBindingSetString = ci.next().getValue().toString();
-                    final String[] siblingBindingStrings = FluoStringConverter.toBindingStrings(siblingBindingSetString);
-
-                    // Overwrite the previous sibling's values to create a new join binding set.
-                    for (int i = 0; i < siblingBindingStrings.length; i++) {
-                        joinBindingSet.put(siblingVarOrderArray[i], siblingBindingStrings[i]);
-                    }
-                    final String joinBindingSetString = makeBindingSetString(joinVarOrder, joinBindingSet);
+        final ScannerConfiguration scanConfig = new ScannerConfiguration();
+        scanConfig.setSpan(Span.prefix(siblingScanPrefix));
+        setScanColumnFamily(scanConfig, siblingId);
 
-                    // Write the join binding set to Fluo.
-                    final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString);
-                    final Column col = FluoQueryColumns.JOIN_BINDING_SET;
-                    final Bytes value = encoder.encode(joinBindingSetString);
-                    tx.set(row, col, value);
-                }
-            }
-        } catch (final Exception e) {
-            log.error("Error while scanning sibling binding sets to create new join results.", e);
-        }
+        final RowIterator ri = tx.get(scanConfig);
+        return new FluoTableIterator(ri, siblingVarOrder);
     }
 
+
     /**
      * Fetch the {@link VariableOrder} of a query node.
      *
@@ -216,40 +242,6 @@ public class JoinResultUpdater {
         return commonVars;
     }
 
-//    /**
-//     * Assuming that the common variables between two children are already
-//     * shifted to the left, find the common variables between them.
-//     * <p>
-//     * Refer to {@link FluoQueryInitializer} to see why this assumption is being made.
-//     *
-//     * @param vars1 - The first child's variable order. (not null)
-//     * @param vars2 - The second child's variable order. (not null)
-//     * @return An ordered List of the common variables between the two children.
-//     */
-//    private List<String> getCommonVars(final String[] vars1, final String[] vars2) {
-//        checkNotNull(vars1);
-//        checkNotNull(vars2);
-//
-//        final List<String> commonVars = new ArrayList<>();
-//
-//        // Only need to iteratre through the shorted order's length.
-//        final int shortestLen = Math.min(vars1.length, vars2.length);
-//        for(int i = 0; i < shortestLen; i++) {
-//            final String var1 = vars1[i];
-//            final String var2 = vars2[i];
-//
-//            if(var1.equals(var2)) {
-//                commonVars.add(var1);
-//            } else {
-//                // Because the common variables are left shifted, we can break once
-//                // we encounter a pair that does not match.
-//                break;
-//            }
-//        }
-//
-//        return commonVars;
-//    }
-
     /**
      * Update a {@link ScannerConfiguration} to use the sibling node's binding
      * set column for its scan. The column that will be used is determined by the
@@ -281,34 +273,211 @@ public class JoinResultUpdater {
                 column = FluoQueryColumns.JOIN_BINDING_SET;
                 break;
             default:
-                throw new IllegalArgumentException("The child node's sibling 
is not of type StatementPattern, Join, or Filter.");
+                throw new IllegalArgumentException("The child node's sibling 
is not of type StatementPattern, Join, Left Join, or Filter.");
         }
         sc.fetchColumn(column.getFamily(), column.getQualifier());
     }
 
     /**
-     * Create a Binding Set String from a variable order and a map of bindings.
-     *
-     * @param varOrder - The resulting binding set's variable order. (not null)
-     * @param bindingSetValues - A map holding the variables and their values 
that will be
-     *   included in the resulting binding set.
-     * @return A binding set string build from the map using the prescribed 
variable order.
+     * Defines each of the cases that may generate new join results when
+     * iteratively computing a query's join node.
      */
-    private static String makeBindingSetString(final String[] varOrder, final 
Map<String, String> bindingSetValues) {
-        checkNotNull(varOrder);
-        checkNotNull(bindingSetValues);
+    public static interface IterativeJoin {
+
+        /**
+         * Invoked when a new {@link BindingSet} is emitted from the left child
+         * node of the join. The Fluo table is scanned for results on the right
+         * side that will be joined with the new result.
+         *
+         * @param newLeftResult - A new BindingSet that has been emitted from
+         *   the left child node. (not null)
+         * @param rightResults - The right child node's binding sets that will
+         *   be joined with the new left result. (not null)
+         * @return The new BindingSet results for the join.
+         */
+        public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, 
Iterator<BindingSet> rightResults);
+
+        /**
+         * Invoked when a new {@link BindingSet} is emitted from the right 
child
+         * node of the join. The Fluo table is scanned for results on the left
+         * side that will be joined with the new result.
+         *
+         * @param leftResults - The left child node's binding sets that will be
+         *   joined with the new right result. (not null)
+         * @param newRightResult - A new BindingSet that has been emitted from
+         *   the right child node. (not null)
+         * @return The new BindingSet results for the join.
+         */
+        public Iterator<BindingSet> newRightResult(Iterator<BindingSet> 
leftResults, BindingSet newRightResult);
+    }
 
-        String bindingSetString = "";
+    /**
+     * Implements an {@link IterativeJoin} that uses the Natural Join algorithm
+     * defined by Relational Algebra.
+     * <p>
+     * This is how you combine {@code BindingSet}s that may have common Binding
+     * names. When two Binding Sets are joined, any bindings that appear in 
both
+     * binding sets are only included once.
+     */
+    public static final class NaturalJoin implements IterativeJoin {
+        @Override
+        public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, 
Iterator<BindingSet> rightResults) {
+            checkNotNull(newLeftResult);
+            checkNotNull(rightResults);
+
+            // Both sides are required, so if there are no right results, then 
do not emit anything.
+            return new LazyJoiningIterator(newLeftResult, rightResults);
+        }
 
-        for (final String joinVar : varOrder) {
-            if (bindingSetString.length() == 0) {
-                bindingSetString = bindingSetValues.get(joinVar);
-            } else {
-                bindingSetString = bindingSetString + DELIM + 
bindingSetValues.get(joinVar);
+        @Override
+        public Iterator<BindingSet> newRightResult(Iterator<BindingSet> 
leftResults, BindingSet newRightResult) {
+            checkNotNull(leftResults);
+            checkNotNull(newRightResult);
+
+            // Both sides are required, so if there are no left results, then 
do not emit anything.
+            return new LazyJoiningIterator(newRightResult, leftResults);
+        }
+    }
+
+    /**
+     * Implements an {@link IterativeJoin} that uses the Left Outer Join
+     * algorithm defined by Relational Algebra.
+     * <p>
+     * This is how you add optional information to a {@link BindingSet}. Left
+     * binding sets are emitted even if they do not join with anything on the 
right.
+     * However, right binding sets must be joined with a left binding set.
+     */
+    public static final class LeftOuterJoin implements IterativeJoin {
+        @Override
+        public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, 
Iterator<BindingSet> rightResults) {
+            checkNotNull(newLeftResult);
+            checkNotNull(rightResults);
+
+            // If the required portion does not join with any optional 
portions,
+            // then emit a BindingSet that matches the new left result.
+            if(!rightResults.hasNext()) {
+                return 
Lists.<BindingSet>newArrayList(newLeftResult).iterator();
+            }
+
+            // Otherwise, return an iterator that holds the new required result
+            // joined with the right results.
+            return new LazyJoiningIterator(newLeftResult, rightResults);
+        }
+
+        @Override
+        public Iterator<BindingSet> newRightResult(final Iterator<BindingSet> 
leftResults, final BindingSet newRightResult) {
+            checkNotNull(leftResults);
+            checkNotNull(newRightResult);
+
+            // The right result is optional, so if it does not join with 
anything
+            // on the left, then do not emit anything.
+            return new LazyJoiningIterator(newRightResult, leftResults);
+        }
+    }
+
+    /**
+     * Joins a {@link BindingSet} (which is new to the left or right side of a 
join)
+     * to all binding sets on the other side that join with it.
+     * <p>
+     * This is done lazily so that you don't have to load all of the 
BindingSets
+     * into memory at once.
+     */
+    private static final class LazyJoiningIterator implements 
Iterator<BindingSet> {
+
+        private final BindingSet newResult;
+        private final Iterator<BindingSet> joinedResults;
+
+        /**
+         * Constructs an instance of {@link LazyJoiningIterator}.
+         *
+         * @param newResult - A binding set that will be joined with some 
other binding sets. (not null)
+         * @param joinResults - The binding sets that will be joined with 
{@code newResult}. (not null)
+         */
+        public LazyJoiningIterator(BindingSet newResult, Iterator<BindingSet> 
joinResults) {
+            this.newResult = checkNotNull(newResult);
+            this.joinedResults = checkNotNull(joinResults);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return joinedResults.hasNext();
+        }
+
+        @Override
+        public BindingSet next() {
+            final MapBindingSet bs = new MapBindingSet();
+
+            for(Binding binding : newResult) {
+                bs.addBinding(binding);
+            }
+
+            for(Binding binding : joinedResults.next()) {
+                bs.addBinding(binding);
             }
+
+            return bs;
         }
 
-        return bindingSetString;
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove() is 
unsupported.");
+        }
     }
-}
 
+    /**
+     * Iterates over rows that have a Binding Set column and returns the 
unmarshalled
+     * {@link BindingSet}s.
+     */
+    private static final class FluoTableIterator implements 
Iterator<BindingSet> {
+
+        private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet(
+                FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET,
+                FluoQueryColumns.JOIN_BINDING_SET,
+                FluoQueryColumns.FILTER_BINDING_SET);
+
+        private final RowIterator rows;
+        private final VariableOrder varOrder;
+
+        /**
+         * Constructs an instance of {@link FluoTableIterator}.
+         *
+         * @param rows - Iterates over RowId values in a Fluo Table. (not null)
+         * @param varOrder - The Variable Order of binding sets that will be
+         *   read from the Fluo Table. (not null)
+         */
+        public FluoTableIterator(RowIterator rows, VariableOrder varOrder) {
+            this.rows = checkNotNull(rows);
+            this.varOrder = checkNotNull(varOrder);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return rows.hasNext();
+        }
+
+        @Override
+        public BindingSet next() {
+            final ColumnIterator columns = rows.next().getValue();
+
+            while(columns.hasNext()) {
+                // If this is one of the BindingSet columns, handle it and 
return the BindingSet.
+                final Entry<Column, Bytes> entry = columns.next();
+                if(BINDING_SET_COLUMNS.contains(entry.getKey())) {
+                    final String bindingSetString = 
entry.getValue().toString();
+                    try {
+                        return converter.convert(bindingSetString, varOrder);
+                    } catch (BindingSetConversionException e) {
+                        throw new RuntimeException("Could not convert one of 
the stored BindingSets from a String: " + bindingSetString, e);
+                    }
+                }
+            }
+
+            throw new RuntimeException("Row did not containing a Binding 
Set.");
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove() is 
unsupported.");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index e4d0155..f3ff089 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -46,6 +46,8 @@ public class QueryResultUpdater {
 
     private final Encoder encoder = new StringEncoder();
 
+    private final BindingSetStringConverter converter = new 
BindingSetStringConverter();
+
     /**
      * Updates the results of a Query node when one of its children has added a
      * new Binding Set to its results.
@@ -67,10 +69,12 @@ public class QueryResultUpdater {
 
         final MapBindingSet queryBindingSet = new MapBindingSet();
         for(final String bindingName : queryVarOrder) {
-            final Binding binding = childBindingSet.getBinding(bindingName);
-            queryBindingSet.addBinding(binding);
+            if(childBindingSet.hasBinding(bindingName)) {
+                final Binding binding = 
childBindingSet.getBinding(bindingName);
+                queryBindingSet.addBinding(binding);
+            }
         }
-        final String queryBindingSetString = 
BindingSetStringConverter.toString(queryBindingSet, queryVarOrder);
+        final String queryBindingSetString = 
converter.convert(queryBindingSet, queryVarOrder);
 
         // Commit it to the Fluo table for the SPARQL query. This isn't 
guaranteed to be a new entry.
         final Bytes row = encoder.encode(queryMetadata.getNodeId() + 
NODEID_BS_DELIM + queryBindingSetString);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index d3c5bb5..aa944e4 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -38,6 +38,7 @@ import io.fluo.api.data.Bytes;
 import io.fluo.api.data.Column;
 import io.fluo.api.types.TypedObserver;
 import io.fluo.api.types.TypedTransactionBase;
+import 
mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
 
 /**
  * Notified when the results of a node have been updated to include a new 
Binding
@@ -97,7 +98,11 @@ public abstract class BindingSetUpdater extends 
TypedObserver {
 
             case JOIN:
                 final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, 
parentNodeId);
-                joinUpdater.updateJoinResults(tx, observedNodeId, 
observedBindingSet, parentJoin);
+                try {
+                    joinUpdater.updateJoinResults(tx, observedNodeId, 
observedBindingSet, parentJoin);
+                } catch (BindingSetConversionException e) {
+                    throw new RuntimeException("Could not process a Join 
node.", e);
+                }
                 break;
 
             default:

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index c6b6303..2accde3 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.openrdf.query.BindingSet;
 
 import io.fluo.api.client.TransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 
 /**
  * Notified when the results of a Filter have been updated to include a new
@@ -36,6 +37,8 @@ import io.fluo.api.client.TransactionBase;
  */
 public class FilterObserver extends BindingSetUpdater {
 
+    private final BindingSetStringConverter converter = new 
BindingSetStringConverter();
+
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
     @Override
@@ -53,9 +56,8 @@ public class FilterObserver extends BindingSetUpdater {
         final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, 
filterNodeId);
 
         // Read the Binding Set that was just emmitted by the Filter.
-        final String[] filterBindingStrings = parsedRow.getBindingStrings();
-        final String[] filterVarOrder = 
filterMetadata.getVariableOrder().toArray();
-        final BindingSet filterBindingSet = 
FluoStringConverter.toBindingSet(filterBindingStrings, filterVarOrder);
+        final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
+        final BindingSet filterBindingSet = 
converter.convert(parsedRow.getBindingSetString(), filterVarOrder);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = filterMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 980c7bc..43b0a4e 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.openrdf.query.BindingSet;
 
 import io.fluo.api.client.TransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 
 /**
  * Notified when the results of a Join have been updated to include a new
@@ -36,6 +37,8 @@ import io.fluo.api.client.TransactionBase;
  */
 public class JoinObserver extends BindingSetUpdater {
 
+    private final BindingSetStringConverter converter = new 
BindingSetStringConverter();
+
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
     @Override
@@ -52,9 +55,8 @@ public class JoinObserver extends BindingSetUpdater {
         final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, 
joinNodeId);
 
         // Read the Binding Set that was just emmitted by the Join.
-        final String[] joinBindingStrings = parsedRow.getBindingStrings();
-        final String[] joinVarOrder = 
joinMetadata.getVariableOrder().toArray();
-        final BindingSet joinBindingSet = 
FluoStringConverter.toBindingSet(joinBindingStrings, joinVarOrder);
+        final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
+        final BindingSet joinBindingSet = 
converter.convert(parsedRow.getBindingSetString(), joinVarOrder);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = joinMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 8a60039..7c1a588 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -21,7 +21,6 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
 import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
@@ -39,6 +38,8 @@ import io.fluo.api.data.Bytes;
 import io.fluo.api.data.Column;
 import io.fluo.api.types.TypedObserver;
 import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 
 /**
  * Performs incremental result exporting to the configured destinations.
@@ -48,6 +49,8 @@ public class QueryResultObserver extends TypedObserver {
 
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
+    private final BindingSetStringConverter converter = new 
BindingSetStringConverter();
+
     /**
      * Builders for each type of result exporter we support.
      */
@@ -96,10 +99,10 @@ public class QueryResultObserver extends TypedObserver {
 
         // Fetch the query's Variable Order from the Fluo table.
         final QueryMetadata queryMetadata = queryDao.readQueryMetadata(tx, 
queryId);
-        final String[] varOrder = queryMetadata.getVariableOrder().toArray();
+        final VariableOrder varOrder = queryMetadata.getVariableOrder();
 
         // Export the result using each of the provided exporters.
-        final BindingSet result = 
FluoStringConverter.toBindingSet(bindingSetString, varOrder);
+        BindingSet result = converter.convert(bindingSetString, varOrder);
         for(final IncrementalResultExporter exporter : exporters) {
             try {
                 exporter.export(tx, queryId, result);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index 00f9b13..ddba9a2 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.openrdf.query.BindingSet;
 
 import io.fluo.api.client.TransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 
 /**
  * Notified when the results of a Statement Pattern have been updated to 
include
@@ -36,6 +37,8 @@ import io.fluo.api.client.TransactionBase;
  */
 public class StatementPatternObserver extends BindingSetUpdater {
 
+    private final BindingSetStringConverter converter = new 
BindingSetStringConverter();
+
     // DAO
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
@@ -53,9 +56,8 @@ public class StatementPatternObserver extends 
BindingSetUpdater {
         final StatementPatternMetadata spMetadata = 
queryDao.readStatementPatternMetadata(tx, spNodeId);
 
         // Read the Binding Set that was just emmitted by the Statement 
Pattern.
-        final String[] spBindingStrings = parsedRow.getBindingStrings();
-        final String[] spVarOrder = spMetadata.getVariableOrder().toArray();
-        final BindingSet spBindingSet = 
FluoStringConverter.toBindingSet(spBindingStrings, spVarOrder);
+        final VariableOrder spVarOrder = spMetadata.getVariableOrder();
+        final BindingSet spBindingSet = 
converter.convert(parsedRow.getBindingSetString(), spVarOrder);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = spMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 3d766ed..fa25456 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -54,6 +54,7 @@ import io.fluo.api.data.Column;
  *     <tr> <th>Fluo Row</td>  <th>Fluo Column</td>  <th>Fluo Value</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:nodeId</td> <td>The Node ID of 
the Join.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:variableOrder</td> <td>The 
Variable Order binding sets are emitted with.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>joinMetadata:joinType</td> <td>The Join 
algorithm that will be used when computing join results.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node 
ID this join emits Binding Sets to.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node 
ID of the node that feeds this node Binding Sets.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node 
ID of the node that feeds this node Binding Sets.</td> </tr>
@@ -128,6 +129,7 @@ public class FluoQueryColumns {
     // Join Metadata columns.
     public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, 
"nodeId");
     public static final Column JOIN_VARIABLE_ORDER = new 
Column(JOIN_METADATA_CF, "variableOrder");
+    public static final Column JOIN_TYPE = new Column(JOIN_METADATA_CF, 
"joinType");
     public static final Column JOIN_PARENT_NODE_ID = new 
Column(JOIN_METADATA_CF, "parentNodeId");
     public static final Column JOIN_LEFT_CHILD_NODE_ID = new 
Column(JOIN_METADATA_CF, "leftChildNodeId");
     public static final Column JOIN_RIGHT_CHILD_NODE_ID = new 
Column(JOIN_METADATA_CF, "rightChildNodeId");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 0ab5f18..265ca0f 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 
 import com.google.common.collect.Sets;
 
@@ -170,6 +171,7 @@ public class FluoQueryMetadataDAO {
         final Bytes rowId = encoder.encode(metadata.getNodeId());
         tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId);
         tx.set(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, encoder.encode( 
metadata.getVariableOrder().toString() ));
+        tx.set(rowId, FluoQueryColumns.JOIN_TYPE, 
encoder.encode(metadata.getJoinType().toString()) );
         tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, encoder.encode( 
metadata.getParentNodeId() ));
         tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, 
encoder.encode( metadata.getLeftChildNodeId() ));
         tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, 
encoder.encode( metadata.getRightChildNodeId() ));
@@ -194,6 +196,7 @@ public class FluoQueryMetadataDAO {
         final Bytes rowId = encoder.encode(nodeId);
         final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
                 FluoQueryColumns.JOIN_VARIABLE_ORDER,
+                FluoQueryColumns.JOIN_TYPE,
                 FluoQueryColumns.JOIN_PARENT_NODE_ID,
                 FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID,
                 FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID));
@@ -202,12 +205,16 @@ public class FluoQueryMetadataDAO {
         final String varOrderString = encoder.decodeString( 
values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER));
         final VariableOrder varOrder = new VariableOrder(varOrderString);
 
+        String joinTypeString = encoder.decodeString( 
values.get(FluoQueryColumns.JOIN_TYPE) );
+        JoinType joinType = JoinType.valueOf(joinTypeString);
+
         final String parentNodeId = encoder.decodeString( 
values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID) );
         final String leftChildNodeId = encoder.decodeString( 
values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID) );
         final String rightChildNodeId = encoder.decodeString( 
values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID) );
 
         return JoinMetadata.builder(nodeId)
                 .setVariableOrder(varOrder)
+                .setJoinType(joinType)
                 .setParentNodeId(parentNodeId)
                 .setLeftChildNodeId(leftChildNodeId)
                 .setRightChildNodeId(rightChildNodeId);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
index 2546972..566ce61 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
@@ -37,6 +37,15 @@ import 
mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 @ParametersAreNonnullByDefault
 public class JoinMetadata extends CommonNodeMetadata {
 
+    /**
+     * The different types of Join algorithms that this join may perform.
+     */
+    public static enum JoinType {
+        NATURAL_JOIN,
+        LEFT_OUTER_JOIN;
+    }
+
+    private final JoinType joinType;
     private final String parentNodeId;
     private final String leftChildNodeId;
     private final String rightChildNodeId;
@@ -46,6 +55,7 @@ public class JoinMetadata extends CommonNodeMetadata {
      *
      * @param nodeId - The ID the Fluo app uses to reference this node. (not 
null)
      * @param varOrder - The variable order of binding sets that are emitted 
by this node. (not null)
+     * @param joinType - Defines which join algorithm the join will use.
      * @param parentNodeId - The node id of this node's parent. (not null)
      * @param leftChildNodeId - One of the nodes whose results are being 
joined. (not null)
      * @param rightChildNodeId - The other node whose results are being 
joined. (not null)
@@ -53,16 +63,25 @@ public class JoinMetadata extends CommonNodeMetadata {
     public JoinMetadata(
             final String nodeId,
             final VariableOrder varOrder,
+            final JoinType joinType,
             final String parentNodeId,
             final String leftChildNodeId,
             final String rightChildNodeId) {
         super(nodeId, varOrder);
+        this.joinType = checkNotNull(joinType);
         this.parentNodeId = checkNotNull(parentNodeId);
         this.leftChildNodeId = checkNotNull(leftChildNodeId);
         this.rightChildNodeId = checkNotNull(rightChildNodeId);
     }
 
     /**
+     * @return Defines which join algorithm the join will use.
+     */
+    public JoinType getJoinType() {
+        return joinType;
+    }
+
+    /**
      * @return The node id of this node's parent.
      */
     public String getParentNodeId() {
@@ -88,6 +107,7 @@ public class JoinMetadata extends CommonNodeMetadata {
         return Objects.hashCode(
                 super.getNodeId(),
                 super.getVariableOrder(),
+                joinType,
                 parentNodeId,
                 leftChildNodeId,
                 rightChildNodeId);
@@ -103,6 +123,7 @@ public class JoinMetadata extends CommonNodeMetadata {
             if(super.equals(o)) {
                 final JoinMetadata joinMetadata = (JoinMetadata)o;
                 return new EqualsBuilder()
+                        .append(joinType, joinMetadata.joinType)
                         .append(parentNodeId, joinMetadata.parentNodeId)
                         .append(leftChildNodeId, joinMetadata.leftChildNodeId)
                         .append(rightChildNodeId, 
joinMetadata.rightChildNodeId)
@@ -120,6 +141,7 @@ public class JoinMetadata extends CommonNodeMetadata {
                 .append("Join Metadata {\n")
                 .append("    Node ID: " + super.getNodeId() + "\n")
                 .append("    Variable Order: " + super.getVariableOrder() + 
"\n")
+                .append("    Join Type: " + joinType + "\n")
                 .append("    Parent Node ID: " + parentNodeId + "\n")
                 .append("    Left Child Node ID: " + leftChildNodeId + "\n")
                 .append("    Right Child Node ID: " + rightChildNodeId + "\n")
@@ -145,6 +167,7 @@ public class JoinMetadata extends CommonNodeMetadata {
 
         private final String nodeId;
         private VariableOrder varOrder;
+        private JoinType joinType;
         private String parentNodeId;
         private String leftChildNodeId;
         private String rightChildNodeId;
@@ -188,6 +211,17 @@ public class JoinMetadata extends CommonNodeMetadata {
         }
 
         /**
+         * Sets the type of join algorithm that will be used by this join.
+         *
+         * @param joinType - Defines which join algorithm the join will use.
+         * @return This builder so that method invocation could be chained.
+         */
+        public Builder setJoinType(@Nullable final JoinType joinType) {
+            this.joinType = joinType;
+            return this;
+        }
+
+        /**
          * Set one of the nodes whose results are being joined.
          *
          * @param leftChildNodeId - One of the nodes whose results are being 
joined.
@@ -216,6 +250,7 @@ public class JoinMetadata extends CommonNodeMetadata {
             return new JoinMetadata(
                     nodeId,
                     varOrder,
+                    joinType,
                     parentNodeId,
                     leftChildNodeId,
                     rightChildNodeId);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 844a7a4..25b4e4f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter.toVarOrderString;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -39,8 +38,10 @@ import javax.annotation.concurrent.Immutable;
 import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.openrdf.query.algebra.Filter;
 import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.LeftJoin;
 import org.openrdf.query.algebra.Projection;
 import org.openrdf.query.algebra.QueryModelNode;
 import org.openrdf.query.algebra.StatementPattern;
@@ -152,7 +153,7 @@ public class SparqlFluoQueryBuilder {
                 prefix = SP_PREFIX;
             } else if(node instanceof Filter) {
                 prefix = FILTER_PREFIX;
-            } else if(node instanceof Join) {
+            } else if(node instanceof Join || node instanceof LeftJoin) {
                 prefix = JOIN_PREFIX;
             } else if(node instanceof Projection) {
                 prefix = QUERY_PREFIX;
@@ -218,18 +219,36 @@ public class SparqlFluoQueryBuilder {
         }
 
         @Override
+        public void meet(LeftJoin node) {
+            // Extract the metadata that will be stored for the node.
+            String leftJoinNodeId = nodeIds.getOrMakeId(node);
+            final QueryModelNode left = node.getLeftArg();
+            final QueryModelNode right = node.getRightArg();
+
+            // Update the metadata for the JoinMetadata.Builder.
+            makeJoinMetadata(leftJoinNodeId, JoinType.LEFT_OUTER_JOIN, left, 
right);
+
+            // Walk to the next node.
+            super.meet(node);
+        }
+
+        @Override
         public void meet(final Join node) {
             // Extract the metadata that will be stored from the node.
             final String joinNodeId = nodeIds.getOrMakeId(node);
             final QueryModelNode left = node.getLeftArg();
             final QueryModelNode right = node.getRightArg();
 
-            if(left == null || right == null) {
-                throw new IllegalArgumentException("Join args connot be 
null.");
-            }
+            // Update the metadata for the JoinMetadata.Builder.
+            makeJoinMetadata(joinNodeId, JoinType.NATURAL_JOIN, left, right);
+
+            // Walk to the next node.
+            super.meet(node);
+        }
 
+        private void makeJoinMetadata(String joinNodeId, JoinType joinType, 
QueryModelNode left, QueryModelNode right) {
             final String leftChildNodeId = nodeIds.getOrMakeId(left);
-            final String rightChildNodeId = nodeIds.getOrMakeId( right );
+            final String rightChildNodeId = nodeIds.getOrMakeId(right);
 
             // Get or create a builder for this node populated with the known 
metadata.
             JoinMetadata.Builder joinBuilder = 
fluoQueryBuilder.getJoinBuilder(joinNodeId).orNull();
@@ -237,6 +256,7 @@ public class SparqlFluoQueryBuilder {
                 joinBuilder = JoinMetadata.builder(joinNodeId);
                 fluoQueryBuilder.addJoinMetadata(joinBuilder);
             }
+            joinBuilder.setJoinType(joinType);
             joinBuilder.setLeftChildNodeId( leftChildNodeId );
             joinBuilder.setRightChildNodeId( rightChildNodeId );
 
@@ -247,15 +267,12 @@ public class SparqlFluoQueryBuilder {
             final JoinVarOrders varOrders = getJoinArgVarOrders(leftVars, 
rightVars);
 
             // Create or update the left child's variable order and parent 
node id.
-            final VariableOrder leftVarOrder = new VariableOrder( 
varOrders.getLeftVarOrder() );
+            final VariableOrder leftVarOrder = varOrders.getLeftVarOrder();
             setChildMetadata(leftChildNodeId, leftVarOrder, joinNodeId);
 
             // Create or update the right child's variable order and parent 
node id.
-            final VariableOrder rightVarOrder = new VariableOrder( 
varOrders.getRightVarOrder() );
+            final VariableOrder rightVarOrder = varOrders.getRightVarOrder();
             setChildMetadata(rightChildNodeId, rightVarOrder, joinNodeId);
-
-            // Walk to the next node.
-            super.meet(node);
         }
 
         @Override
@@ -282,7 +299,7 @@ public class SparqlFluoQueryBuilder {
 
             // Update the child node's metadata.
             final Set<String> childVars = getVars((TupleExpr)child);
-            final VariableOrder childVarOrder = new VariableOrder( 
toVarOrderString(childVars) );
+            final VariableOrder childVarOrder = new VariableOrder(childVars);
             setChildMetadata(childNodeId, childVarOrder, filterId);
 
             // Walk to the next node.
@@ -293,7 +310,7 @@ public class SparqlFluoQueryBuilder {
         public void meet(final Projection node) {
             // Create a builder for this node populated with the metadata.
             final String queryId = nodeIds.getOrMakeId(node);
-            final VariableOrder queryVarOrder = new VariableOrder( 
toVarOrderString( node.getAssuredBindingNames() ) );
+            final VariableOrder queryVarOrder = new 
VariableOrder(node.getBindingNames());
 
             final QueryMetadata.Builder queryBuilder = 
QueryMetadata.builder(queryId);
             fluoQueryBuilder.setQueryMetadata(queryBuilder);
@@ -311,7 +328,7 @@ public class SparqlFluoQueryBuilder {
 
             // Update the child node's metadata.
             final Set<String> childVars = getVars((TupleExpr)child);
-            final VariableOrder childVarOrder = new VariableOrder( 
toVarOrderString(childVars) );
+            final VariableOrder childVarOrder = new VariableOrder(childVars);
 
             setChildMetadata(childNodeId, childVarOrder, queryId);
 
@@ -386,10 +403,9 @@ public class SparqlFluoQueryBuilder {
 
             final Set<String> vars = Sets.newHashSet();
 
-            final Set<String> abn = node.getAssuredBindingNames();
-            for(final String s: abn) {
-                if(!s.startsWith("-const-")) {
-                    vars.add(s);
+            for(final String bindingName : node.getBindingNames()) {
+                if(!bindingName.startsWith("-const-")) {
+                    vars.add(bindingName);
                 }
             }
 
@@ -402,8 +418,8 @@ public class SparqlFluoQueryBuilder {
         @Immutable
         @ParametersAreNonnullByDefault
         private static final class JoinVarOrders {
-            private final String leftVarOrder;
-            private final String rightVarOrder;
+            private final VariableOrder leftVarOrder;
+            private final VariableOrder rightVarOrder;
 
             /**
              * Constructs an instance of {@link }.
@@ -411,7 +427,7 @@ public class SparqlFluoQueryBuilder {
              * @param leftVarOrder - The left child's Variable Order. (not 
null)
              * @param rightVarOrder - The right child's Variable Order. (not 
null)
              */
-            public JoinVarOrders(final String leftVarOrder, final String 
rightVarOrder) {
+            public JoinVarOrders(final VariableOrder leftVarOrder, final 
VariableOrder rightVarOrder) {
                 this.leftVarOrder = checkNotNull(leftVarOrder);
                 this.rightVarOrder = checkNotNull(rightVarOrder);
             }
@@ -419,14 +435,14 @@ public class SparqlFluoQueryBuilder {
             /**
              * @return The left child's Variable Order.
              */
-            public String getLeftVarOrder() {
+            public VariableOrder getLeftVarOrder() {
                 return leftVarOrder;
             }
 
             /**
              * @return The right child's Variable Order.
              */
-            public String getRightVarOrder() {
+            public VariableOrder getRightVarOrder() {
                 return rightVarOrder;
             }
         }
@@ -450,7 +466,7 @@ public class SparqlFluoQueryBuilder {
             // Push all of the common variables to the left for each child's 
vars.
             final List<String> leftVarOrder = leftShiftCommonVars(commonVars, 
leftVars);
             final List<String> rightVarOrder = leftShiftCommonVars(commonVars, 
rightVars);
-            return new JoinVarOrders(toVarOrderString(leftVarOrder), 
toVarOrderString(rightVarOrder));
+            return new JoinVarOrders(new VariableOrder(leftVarOrder), new 
VariableOrder(rightVarOrder));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
index a672bd2..4ad5189 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
@@ -19,94 +19,20 @@
 package org.apache.rya.indexing.pcj.fluo.app;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collection;
 
 import org.junit.Test;
 import org.openrdf.model.impl.LiteralImpl;
 import org.openrdf.model.impl.URIImpl;
 import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.BindingSet;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.Var;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.beust.jcommander.internal.Lists;
 
 /**
  * Tests the methods of {@link FluoStringConverterTest}.
  */
 public class FluoStringConverterTest {
 
-    @Test
-    public void varOrderToString() {
-        // Setup the variable order that will be converted.
-        final Collection<String> varOrder = Lists.newArrayList("x", "y", "z");
-
-        // Convert it to a String.
-        final String varOrderString = 
FluoStringConverter.toVarOrderString(varOrder);
-
-        // Ensure it converted to the expected result.
-        final String expected = "x;y;z";
-        assertEquals(expected, varOrderString);
-    }
-
-    @Test
-    public void stringToVarOrder() {
-        // Setup the String that will be converted.
-        final String varOrderString = "x;y;z";
-
-        // Convert it to an array in variable order.
-        final String[] varOrder = 
FluoStringConverter.toVarOrder(varOrderString);
-
-        // Ensure it converted to the expected result.
-        final String[] expected = {"x", "y", "z"};
-        assertTrue( Arrays.equals(expected, varOrder) );
-    }
-
-       @Test
-       public void bindingSetToString() {
-               // Setup the binding set that will be converted.
-               final MapBindingSet originalBindingSet = new MapBindingSet();
-        originalBindingSet.addBinding("x", new URIImpl("http://a"));
-        originalBindingSet.addBinding("y", new URIImpl("http://b"));
-        originalBindingSet.addBinding("z", new URIImpl("http://c"));
-
-        // Convert it to a String.
-        final String[] varOrder = new String[] {"y", "z", "x" };
-        final String bindingSetString = 
FluoStringConverter.toBindingSetString(originalBindingSet, varOrder);
-
-        // Ensure it converted to the expected result.
-        final String expected = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
-                "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
-                "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
-
-        assertEquals(expected, bindingSetString);
-       }
-
-       @Test
-       public void stringToBindingSet() {
-           // Setup the String that will be converted.
-           final String bindingSetString = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
-                "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
-                "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
-
-           // Convert it to a BindingSet
-           final String[] varOrder = new String[] {"y", "z", "x" };
-        final BindingSet bindingSet = 
FluoStringConverter.toBindingSet(bindingSetString, varOrder);
-
-        // Ensure it converted to the expected result.
-        final MapBindingSet expected = new MapBindingSet();
-        expected.addBinding("x", new URIImpl("http://a"));
-        expected.addBinding("y", new URIImpl("http://b"));
-        expected.addBinding("z", new URIImpl("http://c"));
-
-        assertEquals(expected, bindingSet);
-       }
-
        @Test
        public void statementPatternToString() throws MalformedQueryException {
         // Setup a StatementPattern that represents "?x <http://worksAt> 
<http://Chipotle>."

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
new file mode 100644
index 0000000..025c3e7
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests the methods of {@link LeftOuterJoin}.
+ */
+public class LeftOuterJoinTest {
+
+    private final ValueFactory vf = new ValueFactoryImpl();
+
+    @Test
+    public void newLeftResult_noRightMatches() {
+        IterativeJoin leftOuterJoin = new LeftOuterJoin();
+
+        // There is a new left result.
+        MapBindingSet newLeftResult = new MapBindingSet();
+        newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+
+        // There are no right results that join with the left result.
+        Iterator<BindingSet> rightResults= new 
ArrayList<BindingSet>().iterator();
+
+        // Therefore, the left result is a new join result.
+        Iterator<BindingSet> newJoinResultsIt = 
leftOuterJoin.newLeftResult(newLeftResult, rightResults);
+
+        Set<BindingSet> newJoinResults = new HashSet<>();
+        while(newJoinResultsIt.hasNext()) {
+            newJoinResults.add( newJoinResultsIt.next() );
+        }
+
+        Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult 
);
+
+        assertEquals(expected, newJoinResults);
+    }
+
+    @Test
+    public void newLeftResult_joinsWithRightResults() {
+        IterativeJoin leftOuterJoin = new LeftOuterJoin();
+
+        // There is a new left result.
+        MapBindingSet newLeftResult = new MapBindingSet();
+        newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+        newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
+
+        // There are a few right results that join with the left result.
+        MapBindingSet nameAge = new MapBindingSet();
+        nameAge.addBinding("name", vf.createLiteral("Bob"));
+        nameAge.addBinding("age", vf.createLiteral(56));
+
+        MapBindingSet nameHair = new MapBindingSet();
+        nameHair.addBinding("name", vf.createLiteral("Bob"));
+        nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+
+        Iterator<BindingSet> rightResults = 
Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+
+        // Therefore, there are a few new join results that mix the two 
together.
+        Iterator<BindingSet> newJoinResultsIt = 
leftOuterJoin.newLeftResult(newLeftResult, rightResults);
+
+        Set<BindingSet> newJoinResults = new HashSet<>();
+        while(newJoinResultsIt.hasNext()) {
+            newJoinResults.add( newJoinResultsIt.next() );
+        }
+
+        Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+        MapBindingSet nameHeightAge = new MapBindingSet();
+        nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
+        nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
+        nameHeightAge.addBinding("age", vf.createLiteral(56));
+        expected.add(nameHeightAge);
+
+        MapBindingSet nameHeightHair = new MapBindingSet();
+        nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
+        nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
+        nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
+        expected.add(nameHeightHair);
+
+        assertEquals(expected, newJoinResults);
+    }
+
+    @Test
+    public void newRightResult_noLeftMatches() {
+        IterativeJoin leftOuterJoin = new LeftOuterJoin();
+
+        // There are no left results.
+        Iterator<BindingSet> leftResults= new 
ArrayList<BindingSet>().iterator();
+
+        // There is a new right result.
+        MapBindingSet newRightResult = new MapBindingSet();
+        newRightResult.addBinding("name", vf.createLiteral("Bob"));
+
+        // Therefore, there are no new join results.
+        Iterator<BindingSet> newJoinResultsIt = 
leftOuterJoin.newRightResult(leftResults, newRightResult);
+        assertFalse( newJoinResultsIt.hasNext() );
+    }
+
+    @Test
+    public void newRightResult_joinsWithLeftResults() {
+        IterativeJoin leftOuterJoin = new LeftOuterJoin();
+
+        // There are a few left results that join with the new right result.
+        MapBindingSet nameAge = new MapBindingSet();
+        nameAge.addBinding("name", vf.createLiteral("Bob"));
+        nameAge.addBinding("age", vf.createLiteral(56));
+
+        MapBindingSet nameHair = new MapBindingSet();
+        nameHair.addBinding("name", vf.createLiteral("Bob"));
+        nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+
+        Iterator<BindingSet> leftResults = 
Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+
+        // There is a new right result.
+        MapBindingSet newRightResult = new MapBindingSet();
+        newRightResult.addBinding("name", vf.createLiteral("Bob"));
+        newRightResult.addBinding("height", vf.createLiteral("5'9\""));
+
+        // Therefore, there are a few new join results that mix the two 
together.
+        Iterator<BindingSet> newJoinResultsIt = 
leftOuterJoin.newRightResult(leftResults, newRightResult);
+
+        Set<BindingSet> newJoinResults = new HashSet<>();
+        while(newJoinResultsIt.hasNext()) {
+            newJoinResults.add( newJoinResultsIt.next() );
+        }
+
+        Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+        MapBindingSet nameHeightAge = new MapBindingSet();
+        nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
+        nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
+        nameHeightAge.addBinding("age", vf.createLiteral(56));
+        expected.add(nameHeightAge);
+
+        MapBindingSet nameHeightHair = new MapBindingSet();
+        nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
+        nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
+        nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
+        expected.add(nameHeightHair);
+
+        assertEquals(expected, newJoinResults);
+    }
+}
\ No newline at end of file

Reply via email to