DRILL-190 (part1) First pass at join operator (includes work from JN).

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8ceee5db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8ceee5db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8ceee5db

Branch: refs/heads/master
Commit: 8ceee5dbd23c4039b3738b3de8d51039aae8e910
Parents: dddae74
Author: Ben Becker <benjamin.bec...@gmail.com>
Authored: Thu Aug 15 21:16:27 2013 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Aug 28 20:35:05 2013 -0700

----------------------------------------------------------------------
 .../physical/base/AbstractPhysicalVisitor.java  |  19 ++-
 .../exec/physical/base/PhysicalVisitor.java     |  15 +-
 .../exec/physical/config/MergeJoinPOP.java      |  64 ++++++++
 .../exec/physical/impl/join/JoinEvaluator.java  |   9 ++
 .../exec/physical/impl/join/JoinStatus.java     | 154 ++++++++++++++++++
 .../exec/physical/impl/join/JoinTemplate.java   | 155 +++++++++++++++++++
 .../exec/physical/impl/join/JoinWorker.java     |  20 +++
 .../exec/physical/impl/join/MergeJoinBatch.java | 135 ++++++++++++++++
 .../impl/join/MergeJoinBatchBuilder.java        | 128 +++++++++++++++
 9 files changed, 695 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index cf6cb47..ad41452 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -17,7 +17,17 @@
  
******************************************************************************/
 package org.apache.drill.exec.physical.base;
 
-import org.apache.drill.exec.physical.config.*;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.RangeSender;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.UnionExchange;
 
 public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> 
implements PhysicalVisitor<T, X, E> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
@@ -28,7 +38,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends 
Throwable> impleme
   }
 
   @Override
-  public T visitUnion(Union union, X value) throws E {
+  public T visitUnion(UnionExchange union, X value) throws E {
     return visitOp(union, value);
   }
 
@@ -87,6 +97,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
   
   @Override
+  public T visitMergeJoin(MergeJoinPOP join, X value) throws E {
+    // No merge-join-specific default behavior: treat it as a generic operator, as visitUnion does.
+    return visitOp(join, value);
+  }
+
+  @Override
   public T visitHashPartitionSender(HashPartitionSender op, X value) throws E {
     return visitSender(op, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 9f76693..5f50422 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -17,7 +17,17 @@
  
******************************************************************************/
 package org.apache.drill.exec.physical.base;
 
-import org.apache.drill.exec.physical.config.*;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.RangeSender;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.UnionExchange;
 
 /**
  * Visitor class designed to traversal of a operator tree.  Basis for a number 
of operator manipulations including fragmentation and materialization.
@@ -35,9 +45,10 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP 
extends Throwable> {
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
 
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
-  public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
+  public RETURN visitUnion(UnionExchange union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
+  /** Visits a {@link MergeJoinPOP} (merge join) operator. */
+  public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
   public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
   public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
   public RETURN visitStreamingAggregate(StreamingAggregate agg, EXTRA value) 
throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
new file mode 100644
index 0000000..05fee19
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -0,0 +1,64 @@
+package org.apache.drill.exec.physical.config;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+@JsonTypeName("merge-join")
+public class MergeJoinPOP extends AbstractBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinPOP.class);
+
+  private final PhysicalOperator left;
+  private final PhysicalOperator right;
+  private final List<JoinCondition> conditions;
+
+  /**
+   * @param left       operator producing the left input of the join
+   * @param right      operator producing the right input of the join
+   * @param conditions join conditions relating the left and right inputs
+   */
+  @JsonCreator
+  public MergeJoinPOP(
+      @JsonProperty("left") PhysicalOperator left,
+      @JsonProperty("right") PhysicalOperator right,
+      @JsonProperty("join-conditions") List<JoinCondition> conditions) {
+    this.left = left;
+    this.right = right;
+    this.conditions = conditions;
+  }
+
+  /** @return the left child operator (serialized as "left" so it round-trips through the creator). */
+  @JsonProperty("left")
+  public PhysicalOperator getLeft() {
+    return left;
+  }
+
+  /** @return the right child operator (serialized as "right" so it round-trips through the creator). */
+  @JsonProperty("right")
+  public PhysicalOperator getRight() {
+    return right;
+  }
+
+  /** @return the join conditions (serialized as "join-conditions" so they round-trip through the creator). */
+  @JsonProperty("join-conditions")
+  public List<JoinCondition> getConditions() {
+    return conditions;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    // TODO: no cost model yet; report zero cost.
+    return new OperatorCost(0,0,0,0);
+  }
+
+  /** The output size is estimated as the sum of both input sizes. */
+  @Override
+  public Size getSize() {
+    return left.getSize().add(right.getSize());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitMergeJoin(this, value);
+  }
+
+  /**
+   * @param children exactly two operators: the new left child followed by the new right child
+   * @return a copy of this join with the given children and the same join conditions
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.size() == 2,
+        "A merge join requires exactly two children, but %s were given.", children.size());
+    return new MergeJoinPOP(children.get(0), children.get(1), conditions);
+  }
+
+  /** Iterates over the children in (left, right) order. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.forArray(left, right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
new file mode 100644
index 0000000..42ca604
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
@@ -0,0 +1,9 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Methods supplied by generated join code.  The signatures mirror the abstract setup/copy/compare methods of
+ * JoinTemplate (NOTE(review): presumably this interface is what code generation implements -- confirm).
+ */
+public interface JoinEvaluator {
+
+  /** Binds the evaluator to the left and right inputs and the outgoing batch. */
+  void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
+
+  /**
+   * Copies the record pair at the given input positions into the outgoing batch.
+   *
+   * @return whether the data was copied
+   */
+  boolean copy(int leftPosition, int rightPosition, int outputPosition);
+
+  /**
+   * Compares the left join key with the right join key.
+   *
+   * @return 0 if equal, negative if left &lt; right, positive if left &gt; right
+   */
+  int compare(int leftPosition, int rightPosition);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
new file mode 100644
index 0000000..8831006
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -0,0 +1,154 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * The status of the current join.  Maintained outside the individually compiled join templates so that we can carry
+ * status across multiple schemas.
+ */
+public final class JoinStatus {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
+
+  /** Where right-side records are currently read from. */
+  public static enum RightSourceMode {
+    /** Read directly from the current incoming right batch; position is {@code rightPosition}. */
+    INCOMING_BATCHES,
+    /** Read from queued right batches; position is {@code svRightPosition}. */
+    QUEUED_BATCHES;
+  }
+
+  public int leftPosition;
+  private final RecordBatch left;
+  private IterOutcome lastLeft;
+
+  public int rightPosition;
+  public int svRightPosition;
+  private final RecordBatch right;
+  private IterOutcome lastRight;
+
+  public int outputPosition;
+  public RightSourceMode rightSourceMode = RightSourceMode.INCOMING_BATCHES;
+  public MergeJoinBatch outputBatch;
+  public SelectionVector4 sv4;
+
+  private boolean initialSet = false;
+  private boolean leftRepeating = false;
+
+  public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) {
+    this.left = left;
+    this.right = right;
+    this.outputBatch = output;
+  }
+
+  /** Pulls the first batch from each input exactly once; subsequent calls do nothing. */
+  public final void ensureInitial(){
+    if(!initialSet){
+      this.lastLeft = left.next();
+      this.lastRight = right.next();
+      initialSet = true;
+    }
+  }
+
+  public final void advanceLeft(){
+    leftPosition++;
+  }
+
+  public final void advanceRight(){
+    if (rightSourceMode == RightSourceMode.INCOMING_BATCHES)
+      rightPosition++;
+    else {
+      // advance through queued batches
+      // NOTE(review): not implemented yet -- svRightPosition is never advanced, so in QUEUED_BATCHES mode the
+      // right position does not move.
+    }
+  }
+
+  public final int getLeftPosition() {
+    return leftPosition;
+  }
+
+  /** @return the right position for the active {@link RightSourceMode}. */
+  public final int getRightPosition() {
+    return (rightSourceMode == RightSourceMode.INCOMING_BATCHES) ? rightPosition : svRightPosition;
+  }
+
+  /** Marks the left side as repeating a key and starts a fresh queue of right batches on the output batch. */
+  public final void notifyLeftRepeating() {
+    leftRepeating = true;
+    outputBatch.resetBatchBuilder();
+  }
+
+  public final void notifyLeftStoppedRepeating() {
+    leftRepeating = false;
+  }
+
+  public final boolean isLeftRepeating() {
+    return leftRepeating;
+  }
+
+  /** Switch to reading the right side from incoming batches, starting at position 0. */
+  public void setDefaultAdvanceMode() {
+    rightSourceMode = RightSourceMode.INCOMING_BATCHES;
+    rightPosition = 0;
+  }
+
+  /** Switch to reading the right side from queued batches, starting at position 0. */
+  public void setRepeatedAdvanceMode() {
+    rightSourceMode = RightSourceMode.QUEUED_BATCHES;
+    svRightPosition = 0;
+  }
+
+  /**
+   * Ensures leftPosition refers to a record in the current left batch.
+   * Side effect: when the current left batch is exhausted, fetches the next left batch and resets leftPosition to 0.
+   *
+   * @return true if a left record is available; false if fetching the next batch returned anything other than OK
+   *         (that outcome is recorded for {@link #getOutcome()}).
+   */
+  public final boolean isLeftPositionAllowed(){
+    if(!isNextLeftPositionInCurrentBatch()){
+      leftPosition = 0;
+      lastLeft = left.next();
+      return lastLeft == IterOutcome.OK;
+    }else{
+      lastLeft = IterOutcome.OK;
+      return true;
+    }
+  }
+
+  /**
+   * Ensures rightPosition refers to a record in the current right batch.
+   * Side effect: when the current right batch is exhausted, fetches the next right batch and resets rightPosition
+   * to 0.
+   *
+   * @return true if a right record is available; false if fetching the next batch returned anything other than OK
+   *         (that outcome is recorded for {@link #getOutcome()}).
+   */
+  public final boolean isRightPositionAllowed(){
+    // Only move to the next batch once the current one is exhausted (mirrors isLeftPositionAllowed).
+    if(!isNextRightPositionInCurrentBatch()){
+      rightPosition = 0;
+      lastRight = right.next();
+      return lastRight == IterOutcome.OK;
+    }else{
+      lastRight = IterOutcome.OK;
+      return true;
+    }
+  }
+
+  /**
+   * @return whether leftPosition is still within the current left batch.
+   */
+  public final boolean isNextLeftPositionInCurrentBatch() {
+    return leftPosition < left.getRecordCount();
+  }
+
+  /**
+   * @return whether rightPosition is still within the current right batch.
+   */
+  public final boolean isNextRightPositionInCurrentBatch() {
+    return rightPosition < right.getRecordCount();
+  }
+
+  /**
+   * Summarizes the most recent outcomes of both inputs.  Checked in priority order: both OK, then either
+   * OK_NEW_SCHEMA, NOT_YET, NONE; anything else is a failure.
+   */
+  public JoinOutcome getOutcome(){
+    if (lastLeft == IterOutcome.OK && lastRight == IterOutcome.OK)
+      return JoinOutcome.BATCH_RETURNED;
+    if (eitherMatches(IterOutcome.OK_NEW_SCHEMA))
+      return JoinOutcome.SCHEMA_CHANGED;
+    if (eitherMatches(IterOutcome.NOT_YET))
+      return JoinOutcome.WAITING;
+    if (eitherMatches(IterOutcome.NONE))
+      return JoinOutcome.NO_MORE_DATA;
+    return JoinOutcome.FAILURE;
+  }
+
+  private boolean eitherMatches(IterOutcome outcome){
+    return lastLeft == outcome || lastRight == outcome;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
new file mode 100644
index 0000000..51cc5e5
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -0,0 +1,155 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * This join template uses a merge join to combine two ordered streams into a single larger batch.  When joining
+ * single values on each side, the values can be copied to the outgoing batch immediately.  The outgoing record batch
+ * should be sent as needed (e.g. schema change or outgoing batch full).  When joining multiple values on one or
+ * both sides, two passes over the vectors will be made; one to construct the selection vector, and another to
+ * generate the outgoing batches once the duplicate value is no longer encountered.
+ *
+ * Given two tables ordered by 'key':
+ *
+ *        t1                t2
+ *  ---------------   ---------------
+ *  | key | col2 |    | key | col2 |
+ *  ---------------   ---------------
+ *  |  1  | 'ab' |    |  1  | 'AB' |
+ *  |  2  | 'cd' |    |  2  | 'CD' |
+ *  |  2  | 'ef' |    |  4  | 'EF' |
+ *  |  4  | 'gh' |    |  4  | 'GH' |
+ *  |  4  | 'ij' |    |  5  | 'IJ' |
+ *  ---------------   ---------------
+ *
+ * 'SELECT * FROM t1 INNER JOIN t2 ON (t1.key = t2.key)' should generate the following:
+ *
+ * ---------------------------------------
+ * | t1.key | t2.key | t1.col2 | t2.col2 |
+ * ---------------------------------------
+ * |   1    |   1    |  'ab'   |  'AB'   |
+ * |   2    |   2    |  'cd'   |  'CD'   |
+ * |   2    |   2    |  'ef'   |  'CD'   |
+ * |   4    |   4    |  'gh'   |  'EF'   |
+ * |   4    |   4    |  'gh'   |  'GH'   |
+ * |   4    |   4    |  'ij'   |  'EF'   |
+ * |   4    |   4    |  'ij'   |  'GH'   |
+ * ---------------------------------------
+ *
+ * In the simple match case, only one row from each table matches.  Additional cases should be considered:
+ *   - a left join key matches multiple right join keys
+ *   - duplicate keys which may span multiple record batches (on the left and/or right side)
+ *   - one or both incoming record batches change schemas
+ *
+ * In the case where a left join key matches multiple right join keys:
+ *   - add a reference to all of the right table's matching values to the SV4.
+ *
+ * A RecordBatchData object should be used to hold onto all batches which have not been sent.
+ *
+ * JoinStatus:
+ *   - all state related to the join operation is stored in the JoinStatus object.
+ *   - this is required since code may be regenerated before completion of an outgoing record batch.
+ */
+public abstract class JoinTemplate implements JoinWorker {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinTemplate.class);
+
+  @Override
+  public void setupJoin(JoinStatus status, VectorContainer outgoing){
+    // Intentionally empty in this first pass.
+  }
+
+  /**
+   * Copy rows from the input record batches until the output record batch is full.
+   * Returns when either input cannot supply another record or a copy into the outgoing batch fails; the reason is
+   * left in {@code status} for the caller to inspect.
+   *
+   * @param status  State of the join operation (persists across multiple record batches/schema changes)
+   */
+  public final void doJoin(final JoinStatus status) {
+    while (true) {
+      // for each record
+
+      // validate position and advance to the next record batch if necessary
+      if (!status.isLeftPositionAllowed()) return;
+      if (!status.isRightPositionAllowed()) return;
+
+      // Normalize to -1/0/1 so comparators returning any negative/positive value are handled.
+      int comparison = Integer.signum(compare(status.leftPosition, status.rightPosition));
+      switch (comparison) {
+
+      case -1:
+        // left key < right key
+        status.advanceLeft();
+        continue;
+
+      case 0:
+        // left key == right key
+        if (!status.isLeftRepeating() &&
+            status.isNextLeftPositionInCurrentBatch() &&
+            compareNextLeftKey(status.leftPosition) == 0) {
+          // records in the left batch contain duplicate keys
+          // TODO: leftHasDups = true, if next left key matches but is in a new batch
+          status.notifyLeftRepeating();
+        }
+
+        do {
+          // copy all equal right keys to the output record batch.  Only advance outputPosition after a successful
+          // copy: it is reported as the outgoing record count, so it must not count a record that did not fit.
+          if (!copy(status.leftPosition, status.rightPosition, status.outputPosition))
+            return;
+          status.outputPosition++;
+
+          // If the left key has duplicates and we're about to cross batch boundaries, queue the
+          // right table's record batch before calling next.  These records will need to be copied
+          // again for each duplicate left key.
+          if (status.isLeftRepeating() && !status.isNextRightPositionInCurrentBatch()) {
+            // last record in right batch is a duplicate, and at the end of the batch
+            status.outputBatch.addRightToBatchBuilder();
+          }
+          status.advanceRight();
+        } while (status.isRightPositionAllowed() && compare(status.leftPosition, status.rightPosition) == 0);
+
+        status.advanceLeft();
+
+        // NOTE(review): advanceLeft() is unconditional, so leftPosition may now be past the end of the current left
+        // batch when compareNextLeftKey is called below -- confirm generated code tolerates this.
+        if (status.isLeftRepeating() && compareNextLeftKey(status.leftPosition) != 0) {
+          // left no longer has duplicates.  switch back to incoming batch mode
+          status.setDefaultAdvanceMode();
+          status.notifyLeftStoppedRepeating();
+        } else if (status.isLeftRepeating()) {
+          // left is going to repeat; use sv4 for right batch
+          status.setRepeatedAdvanceMode();
+        }
+
+        continue;
+
+      case 1:
+        // left key > right key
+        status.advanceRight();
+        continue;
+
+      default:
+        throw new IllegalStateException();
+      }
+    }
+  }
+
+  /**
+   * Copy the data to the new record batch (if it fits).
+   *
+   * doJoin passes JoinStatus.leftPosition/rightPosition here, which are plain indexes incremented by one per record.
+   * NOTE(review): an earlier draft described these as SV4-encoded (batch, record) pairs -- confirm which encoding the
+   * generated implementation expects, particularly in queued right-batch mode.
+   *
+   * @param leftPosition   position of the left record
+   * @param rightPosition  position of the right record
+   * @param outputPosition position of the output record batch
+   * @return Whether or not the data was copied.
+   */
+  protected abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
+
+  /**
+   * Compare the values of the left and right join key to determine whether the left is less than, greater than
+   * or equal to the right.
+   *
+   * @param leftPosition  position of the left record
+   * @param rightPosition position of the right record
+   * @return  0 if both keys are equal
+   *         -1 (or any negative value) if left is < right
+   *          1 (or any positive value) if left is > right
+   */
+  protected abstract int compare(int leftPosition, int rightPosition);
+
+  /**
+   * Used by doJoin to detect duplicate left keys; a result of 0 is treated as "the left key repeats".
+   * NOTE(review): presumably compares the left key at {@code position} with the following left key -- confirm.
+   */
+  protected abstract int compareNextLeftKey(int position);
+
+  public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
new file mode 100644
index 0000000..54d2076
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -0,0 +1,20 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.record.VectorContainer;
+
+
+public interface JoinWorker {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinWorker.class);
+
+  /**
+   * Outcome of a join step, as derived by JoinStatus#getOutcome().
+   * NOTE(review): MODE_CHANGED is not currently produced by JoinStatus#getOutcome().
+   */
+  public static enum JoinOutcome {
+    NO_MORE_DATA, BATCH_RETURNED, MODE_CHANGED, SCHEMA_CHANGED, WAITING, FAILURE;
+  }
+
+  /**
+   * Code-generation definition for join workers.  The template class name must be the fully qualified name of
+   * JoinTemplate, which lives in package org.apache.drill.exec.physical.impl.join.
+   */
+  public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<JoinWorker>( //
+      JoinWorker.class, "org.apache.drill.exec.physical.impl.join.JoinTemplate", JoinEvaluator.class, null);
+
+  /** Prepares the worker with the shared join state and the outgoing vector container. */
+  public void setupJoin(JoinStatus status, VectorContainer outgoing);
+
+  /** Joins records from both inputs into the outgoing batch; results are reported through {@code status}. */
+  public void doJoin(JoinStatus status);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
new file mode 100644
index 0000000..4d633bb
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -0,0 +1,135 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * A merge join combining two incoming in-order batches.
+ */
+public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
+
+  private final RecordBatch left;
+  private final RecordBatch right;
+  private final JoinStatus status;
+  private JoinWorker worker;
+  public MergeJoinBatchBuilder batchBuilder;
+
+  protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
+    super(popConfig, context);
+    this.left = left;
+    this.right = right;
+    this.status = new JoinStatus(left, right, this);
+    this.batchBuilder = new MergeJoinBatchBuilder(context, status);
+  }
+
+  /** @return the number of records written into the outgoing batch so far (JoinStatus.outputPosition). */
+  @Override
+  public int getRecordCount() {
+    return status.outputPosition;
+  }
+
+  @Override
+  public IterOutcome next() {
+
+    // we do this here instead of the constructor because we don't necessarily want to start consuming on construction.
+    status.ensureInitial();
+
+    // loop so we can start over again if we find a new batch was created.
+    while(true){
+
+      boolean first = false;
+      if(worker == null){
+        try {
+          this.worker = getNewWorker();
+          first = true;
+        } catch (ClassTransformationException | IOException e) {
+          context.fail(new SchemaChangeException(e));
+          kill();
+          return IterOutcome.STOP;
+        }
+      }
+
+      // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
+      JoinOutcome previousOutcome = status.getOutcome();
+      if(previousOutcome == JoinOutcome.BATCH_RETURNED || previousOutcome == JoinOutcome.SCHEMA_CHANGED){
+        allocateBatch();
+      }
+
+      // join until we have a complete outgoing batch
+      worker.doJoin(status);
+
+      // get the outcome of the join.
+      switch(status.getOutcome()){
+      case BATCH_RETURNED:
+        // only return new schema if new worker has been setup.
+        return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+      case FAILURE:
+        kill();
+        return IterOutcome.STOP;
+      case NO_MORE_DATA:
+        return status.outputPosition > 0 ? IterOutcome.OK: IterOutcome.NONE;
+      case MODE_CHANGED:
+      case SCHEMA_CHANGED:
+        worker = null;
+        if(status.outputPosition > 0){
+          // if we have current data, let's return that.
+          return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+        }else{
+          // loop again to rebuild worker.
+          continue;
+        }
+      case WAITING:
+        return IterOutcome.NOT_YET;
+      default:
+        throw new IllegalStateException();
+      }
+    }
+  }
+
+  /** Discards any queued right batches by replacing the builder with a fresh one. */
+  public void resetBatchBuilder() {
+    batchBuilder = new MergeJoinBatchBuilder(context, status);
+  }
+
+  /** Queues the current right batch so it can be replayed for repeated left keys. */
+  public void addRightToBatchBuilder() {
+    batchBuilder.add(right);
+  }
+
+  @Override
+  protected void killIncoming() {
+    left.kill();
+    right.kill();
+  }
+
+  private JoinWorker getNewWorker() throws ClassTransformationException, IOException{
+    CodeGenerator<JoinWorker> cg = new CodeGenerator<JoinWorker>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+
+    // if (status.rightSourceMode)
+      // generate copier which deref's SV4
+    // else
+      // generate direct copier.
+
+    // generate comparator.
+    // generate compareNextLeftKey.
+
+    JoinWorker w = context.getImplementationClass(cg);
+    w.setupJoin(status, this.container);
+    return w;
+  }
+
+  private void allocateBatch(){
+    // allocate new batch space.
+    // NOTE(review): status.outputPosition is never reset to 0 for a new outgoing batch, so getRecordCount() keeps
+    // growing across batches -- presumably the reset belongs here once allocation is implemented.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
new file mode 100644
index 0000000..d75cfb9
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -0,0 +1,128 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.List;
+
+/**
+ * Queues right-side record batches (while the left side repeats a key) and builds an SV4-addressed hyper container
+ * over them.
+ */
+public class MergeJoinBatchBuilder {
+
+  private final ArrayListMultimap<BatchSchema, RecordBatchData> queuedRightBatches = ArrayListMultimap.create();
+  // Created eagerly: build() clears and repopulates it, and would otherwise dereference null.
+  private final VectorContainer container = new VectorContainer();
+  private long runningBytes;
+  private int runningBatches;
+  private int recordCount;
+  private final PreAllocator svAllocator;
+  private final JoinStatus status;
+
+  public MergeJoinBatchBuilder(FragmentContext context, JoinStatus status) {
+    this.status = status;
+    this.svAllocator = context.getAllocator().getPreAllocator();
+  }
+
+  /**
+   * Takes ownership of the batch's vectors and queues them.
+   *
+   * @return true if the batch was queued (or was empty and skipped); false if a resource limit would be exceeded
+   * @throws UnsupportedOperationException if the batch uses a four-byte selection vector
+   */
+  public boolean add(RecordBatch batch) {
+    if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE)
+      throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch.");
+    if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+
+    // resource checks
+    long batchBytes = getSize(batch);
+    if (batchBytes + runningBytes > Integer.MAX_VALUE) return false;      // TODO: 2GB is arbitrary
+    if (runningBatches + 1 > Character.MAX_VALUE) return false;           // build() rejects more batches than this.
+    if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // 4 bytes per SV4 entry must be available.
+
+    // transfer VVs to a new RecordBatchData
+    RecordBatchData bd = new RecordBatchData(batch);
+    runningBytes += batchBytes;
+    runningBatches++;
+    queuedRightBatches.put(batch.getSchema(), bd);
+    recordCount += bd.getRecordCount();
+    return true;
+  }
+
+  /** @return the summed buffer size of all vectors in the batch. */
+  private long getSize(RecordBatch batch){
+    long bytes = 0;
+    for(VectorWrapper<?> v : batch){
+      bytes += v.getValueVector().getBufferSize();
+    }
+    return bytes;
+  }
+
+  /**
+   * Builds status.sv4 over the queued batches and exposes their vectors as hyper lists in the container.
+   *
+   * @throws SchemaChangeException if more than Character.MAX_VALUE batches are queued
+   * @throws IllegalStateException if no batches have been queued
+   */
+  public void build() throws SchemaChangeException {
+    if (queuedRightBatches.isEmpty()) throw new IllegalStateException("No right batches have been queued.");
+    container.clear();
+//    if (queuedRightBatches.keySet().size() > 1) throw new SchemaChangeException("Join currently only supports a single schema.");
+    // NOTE(review): with the check above disabled, only the first schema's batches receive SV4 entries below, while
+    // vectors from all queued batches are added to the container.
+    if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+    status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+    BatchSchema schema = queuedRightBatches.keySet().iterator().next();
+    List<RecordBatchData> data = queuedRightBatches.get(schema);
+
+    // now we're going to generate the sv4 pointers
+    switch(schema.getSelectionVectorMode()){
+      case NONE: {
+        int index = 0;
+        int recordBatchId = 0;
+        for(RecordBatchData d : data){
+          for(int i =0; i < d.getRecordCount(); i++, index++){
+            status.sv4.set(index, recordBatchId, i);
+          }
+          recordBatchId++;
+        }
+        break;
+      }
+      case TWO_BYTE: {
+        int index = 0;
+        int recordBatchId = 0;
+        for(RecordBatchData d : data){
+          for(int i =0; i < d.getRecordCount(); i++, index++){
+            status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
+          }
+          // might as well drop the selection vector since we'll stop using it now.
+          d.getSv2().clear();
+          recordBatchId++;
+        }
+        break;
+      }
+      default:
+        throw new UnsupportedOperationException();
+    }
+
+    // next, we'll create lists of each of the vector types.
+    ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
+    for (RecordBatchData rbd : queuedRightBatches.values()) {
+      for (ValueVector v : rbd.getVectors()) {
+        vectors.put(v.getField(), v);
+      }
+    }
+
+    for(MaterializedField f : vectors.keySet()){
+      List<ValueVector> v = vectors.get(f);
+      container.addHyperList(v);
+    }
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+  }
+
+}

Reply via email to