DRILL-190 (part1) First pass at join operator (includes work from JN).
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8ceee5db Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8ceee5db Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8ceee5db Branch: refs/heads/master Commit: 8ceee5dbd23c4039b3738b3de8d51039aae8e910 Parents: dddae74 Author: Ben Becker <benjamin.bec...@gmail.com> Authored: Thu Aug 15 21:16:27 2013 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Wed Aug 28 20:35:05 2013 -0700 ---------------------------------------------------------------------- .../physical/base/AbstractPhysicalVisitor.java | 19 ++- .../exec/physical/base/PhysicalVisitor.java | 15 +- .../exec/physical/config/MergeJoinPOP.java | 64 ++++++++ .../exec/physical/impl/join/JoinEvaluator.java | 9 ++ .../exec/physical/impl/join/JoinStatus.java | 154 ++++++++++++++++++ .../exec/physical/impl/join/JoinTemplate.java | 155 +++++++++++++++++++ .../exec/physical/impl/join/JoinWorker.java | 20 +++ .../exec/physical/impl/join/MergeJoinBatch.java | 135 ++++++++++++++++ .../impl/join/MergeJoinBatchBuilder.java | 128 +++++++++++++++ 9 files changed, 695 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index cf6cb47..ad41452 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -17,7 +17,17 @@ ******************************************************************************/ package org.apache.drill.exec.physical.base; -import org.apache.drill.exec.physical.config.*; +import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.config.HashToRandomExchange; +import org.apache.drill.exec.physical.config.MergeJoinPOP; +import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.config.RandomReceiver; +import org.apache.drill.exec.physical.config.RangeSender; +import org.apache.drill.exec.physical.config.Screen; +import org.apache.drill.exec.physical.config.SingleSender; +import org.apache.drill.exec.physical.config.Sort; +import org.apache.drill.exec.physical.config.UnionExchange; public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class); @@ -28,7 +38,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override - public T visitUnion(Union union, X value) throws E { + public T visitUnion(UnionExchange union, X value) throws E { // default: delegate a union exchange to the generic visitOp return visitOp(union, value); } @@ -87,6 +97,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitMergeJoin(MergeJoinPOP join, X value) throws E { // default: a merge join is handled as a generic operator via visitOp + return visitOp(join, value); + } + + @Override public T visitHashPartitionSender(HashPartitionSender op, X value) throws E { return visitSender(op, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 9f76693..5f50422 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -17,7 +17,17 @@ ******************************************************************************/ package org.apache.drill.exec.physical.base; -import org.apache.drill.exec.physical.config.*; +import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.config.HashToRandomExchange; +import org.apache.drill.exec.physical.config.MergeJoinPOP; +import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.config.RandomReceiver; +import org.apache.drill.exec.physical.config.RangeSender; +import org.apache.drill.exec.physical.config.Screen; +import org.apache.drill.exec.physical.config.SingleSender; +import org.apache.drill.exec.physical.config.Sort; +import org.apache.drill.exec.physical.config.UnionExchange; /** * Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization. 
@@ -35,9 +45,10 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitStore(Store store, EXTRA value) throws EXCEP; public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP; - public RETURN visitUnion(Union union, EXTRA value) throws EXCEP; + public RETURN visitUnion(UnionExchange union, EXTRA value) throws EXCEP; public RETURN visitProject(Project project, EXTRA value) throws EXCEP; public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP; + public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP; public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP; public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP; public RETURN visitStreamingAggregate(StreamingAggregate agg, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java new file mode 100644 index 0000000..05fee19 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java @@ -0,0 +1,64 @@ +package org.apache.drill.exec.physical.config; + +import java.util.Iterator; +import java.util.List; + +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.Size; + +import com.fasterxml.jackson.annotation.JsonCreator; +import
com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; + +@JsonTypeName("merge-join") +public class MergeJoinPOP extends AbstractBase{ // physical config for a merge join of a left and a right child on a list of join conditions + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinPOP.class); + + + private final PhysicalOperator left; + private final PhysicalOperator right; + private final List<JoinCondition> conditions; + + @Override + public OperatorCost getCost() { // NOTE(review): reports zero cost for now; confirm whether a real estimate is needed + return new OperatorCost(0,0,0,0); + } + + @JsonCreator + public MergeJoinPOP( + @JsonProperty("left") PhysicalOperator left, + @JsonProperty("right") PhysicalOperator right, + @JsonProperty("join-conditions") List<JoinCondition> conditions + ) { + this.left = left; + this.right = right; + this.conditions = conditions; + } + + @Override + public Size getSize() { // combined size of both inputs + return left.getSize().add(right.getSize()); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitMergeJoin(this, value); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { // children are (left, right), in the same order iterator() yields them + Preconditions.checkArgument(children.size() == 2, "A merge join requires exactly two children (left, right) but received %s.", children.size()); + return new MergeJoinPOP(children.get(0), children.get(1), conditions); + } + + @Override + public Iterator<PhysicalOperator> iterator() { + return Iterators.forArray(left, right); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java new file mode 100644 index 0000000..42ca604 --- /dev/null +++
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java @@ -0,0 +1,9 @@ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.drill.exec.record.RecordBatch; + +public interface JoinEvaluator { + void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing); + boolean copy(int leftPosition, int rightPosition, int outputPosition); + int compare(int leftPosition, int rightPosition); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java new file mode 100644 index 0000000..8831006 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -0,0 +1,154 @@ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; +import org.apache.drill.exec.record.selection.SelectionVector4; + +/** + * The status of the current join. Maintained outside the individually compiled join templates so that we can carry status across multiple schemas.
+ */ +public final class JoinStatus { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class); + + public static enum RightSourceMode { + INCOMING_BATCHES, QUEUED_BATCHES; + } + + public int leftPosition; + private final RecordBatch left; + private IterOutcome lastLeft; + + public int rightPosition; + public int svRightPosition; + private final RecordBatch right; + private IterOutcome lastRight; + + public int outputPosition; + public RightSourceMode rightSourceMode = RightSourceMode.INCOMING_BATCHES; + public MergeJoinBatch outputBatch; + public SelectionVector4 sv4; + + private boolean initialSet = false; + private boolean leftRepeating = false; + + public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) { + super(); + this.left = left; + this.right = right; + this.outputBatch = output; + } + + public final void ensureInitial(){ // pulls the first batch from each side exactly once + if(!initialSet){ + this.lastLeft = left.next(); + this.lastRight = right.next(); + initialSet = true; + } + } + + public final void advanceLeft(){ + leftPosition++; + } + + public final void advanceRight(){ + if (rightSourceMode == RightSourceMode.INCOMING_BATCHES) + rightPosition++; + else { + // TODO: advance through queued batches (svRightPosition is not advanced yet) + } + } + + public final int getLeftPosition() { + return leftPosition; + } + + public final int getRightPosition() { + return (rightSourceMode == RightSourceMode.INCOMING_BATCHES) ?
rightPosition : svRightPosition; + } + + public final void notifyLeftRepeating() { + leftRepeating = true; + outputBatch.resetBatchBuilder(); + } + + public final void notifyLeftStoppedRepeating() { + leftRepeating = false; + } + + public final boolean isLeftRepeating() { + return leftRepeating; + } + + public void setDefaultAdvanceMode() { + rightSourceMode = RightSourceMode.INCOMING_BATCHES; + rightPosition = 0; + } + + public void setRepeatedAdvanceMode() { + rightSourceMode = RightSourceMode.QUEUED_BATCHES; + svRightPosition = 0; + } + + /** + * Check whether the current left record position is usable, fetching the next left batch once the current one is exhausted. + * Side effect: on exhaustion, resets leftPosition to 0 and stores left.next()'s outcome; returns true only if that outcome is OK. + */ + public final boolean isLeftPositionAllowed(){ + if(!isNextLeftPositionInCurrentBatch()){ + leftPosition = 0; + lastLeft = left.next(); + return lastLeft == IterOutcome.OK; + }else{ + lastLeft = IterOutcome.OK; + return true; + } + } + + /** + * Check whether the current right record position is usable, fetching the next right batch once the current one is exhausted. + * Side effect: on exhaustion, resets rightPosition to 0 and stores right.next()'s outcome; returns true only if that outcome is OK. + */ + public final boolean isRightPositionAllowed(){ + if(!isNextRightPositionInCurrentBatch()){ // mirror isLeftPositionAllowed: fetch a new right batch only when the current one is exhausted + rightPosition = 0; + lastRight = right.next(); + return lastRight == IterOutcome.OK; + }else{ + lastRight = IterOutcome.OK; + return true; + } + + } + + /** + * Check if the left record position still lies within the current left batch. + */ + public final boolean isNextLeftPositionInCurrentBatch() { + return leftPosition < left.getRecordCount(); + } + + /** + * Check if the right record position still lies within the current right batch.
+ */ + public final boolean isNextRightPositionInCurrentBatch() { + return rightPosition < right.getRecordCount(); + } + + public JoinOutcome getOutcome(){ // derived from the last IterOutcome seen on each side + if (lastLeft == IterOutcome.OK && lastRight == IterOutcome.OK) + return JoinOutcome.BATCH_RETURNED; + if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) + return JoinOutcome.SCHEMA_CHANGED; + if (eitherMatches(IterOutcome.NOT_YET)) + return JoinOutcome.WAITING; + if (eitherMatches(IterOutcome.NONE)) + return JoinOutcome.NO_MORE_DATA; + return JoinOutcome.FAILURE; + } + + private boolean eitherMatches(IterOutcome outcome){ + return lastLeft == outcome || lastRight == outcome; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java new file mode 100644 index 0000000..51cc5e5 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -0,0 +1,155 @@ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; + +/** + * This join template uses a merge join to combine two ordered streams into a single larger batch. When joining + * single values on each side, the values can be copied to the outgoing batch immediately. The outgoing record batch + * should be sent as needed (e.g. schema change or outgoing batch full).
When joining multiple values on one or + * both sides, two passes over the vectors will be made; one to construct the selection vector, and another to + * generate the outgoing batches once the duplicate value is no longer encountered. + * + * Given two tables ordered by 'key': + * + * t1 t2 + * --------------- --------------- + * | key | col2 | | key | col2 | + * --------------- --------------- + * | 1 | 'ab' | | 1 | 'AB' | + * | 2 | 'cd' | | 2 | 'CD' | + * | 2 | 'ef' | | 4 | 'EF' | + * | 4 | 'gh' | | 4 | 'GH' | + * | 4 | 'ij' | | 5 | 'IJ' | + * --------------- --------------- + * + * 'SELECT * FROM t1 INNER JOIN t2 on (t1.key = t2.key)' should generate the following: + * + * --------------------------------------- + * | t1.key | t2.key | t1.col2 | t2.col2 | + * --------------------------------------- + * | 1 | 1 | 'ab' | 'AB' | + * | 2 | 2 | 'cd' | 'CD' | + * | 2 | 2 | 'ef' | 'CD' | + * | 4 | 4 | 'gh' | 'EF' | + * | 4 | 4 | 'gh' | 'GH' | + * | 4 | 4 | 'ij' | 'EF' | + * | 4 | 4 | 'ij' | 'GH' | + * --------------------------------------- + * + * In the simple match case, only one row from each table matches. Additional cases should be considered: + * - a left join key matches multiple right join keys + * - duplicate keys which may span multiple record batches (on the left and/or right side) + * - one or both incoming record batches change schemas + * + * In the case where a left join key matches multiple right join keys: + * - add a reference to all of the right table's matching values to the SV4. + * + * A RecordBatchData object should be used to hold onto all batches which have not been sent. + * + * JoinStatus: + * - all state related to the join operation is stored in the JoinStatus object. + * - this is required since code may be regenerated before completion of an outgoing record batch.
+ */ +public abstract class JoinTemplate implements JoinWorker { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinTemplate.class); + + @Override + public void setupJoin(JoinStatus status, VectorContainer outgoing){ // nothing to set up yet; doJoin receives the JoinStatus on every call + + } + + /** + * Copy rows from the input record batches until the output record batch is full + * @param status State of the join operation (persists across multiple record batches/schema changes) + */ + public final void doJoin(final JoinStatus status) { + while (true) { + // for each record + + // validate position and advance to the next record batch if necessary + if (!status.isLeftPositionAllowed()) return; + if (!status.isRightPositionAllowed()) return; + + int comparison = Integer.signum(compare(status.leftPosition, status.rightPosition)); // normalize so any negative/positive comparator result maps to -1/1 + switch (comparison) { + + case -1: + // left key < right key + status.advanceLeft(); + continue; + + case 0: + // left key == right key + if (!status.isLeftRepeating() && + status.isNextLeftPositionInCurrentBatch() && + compareNextLeftKey(status.leftPosition) == 0) { + // records in the left batch contain duplicate keys + // TODO: leftHasDups = true, if next left key matches but is in a new batch + status.notifyLeftRepeating(); + } + + do { + // copy all equal right keys to the output record batch + if (!copy(status.leftPosition, status.rightPosition, status.outputPosition)) return; // output batch is full: the record was not written, so it must not be counted + status.outputPosition++; + + // If the left key has duplicates and we're about to cross batch boundaries, queue the + // right table's record batch before calling next. These records will need to be copied + // again for each duplicate left key.
+ if (status.isLeftRepeating() && !status.isNextRightPositionInCurrentBatch()) { // NOTE(review): rightPosition was just copied, so it is still inside the batch here; confirm whether rightPosition + 1 was intended + // last record in right batch is a duplicate, and at the end of the batch + status.outputBatch.addRightToBatchBuilder(); + } + status.advanceRight(); + } while (status.isRightPositionAllowed() && compare(status.leftPosition, status.rightPosition) == 0); + + status.advanceLeft(); + + if (status.isLeftRepeating() && compareNextLeftKey(status.leftPosition) != 0) { + // left no longer has duplicates. switch back to incoming batch mode + status.setDefaultAdvanceMode(); + status.notifyLeftStoppedRepeating(); + } else if (status.isLeftRepeating()) { + // left is going to repeat; use sv4 for right batch + status.setRepeatedAdvanceMode(); + } + + continue; + + case 1: + // left key > right key + status.advanceRight(); + continue; + + default: + throw new IllegalStateException(); + } + } + } + + + /** + * Copy the data to the new record batch (if it fits). + * + * @param leftPosition position of batch (lower 16 bits) and record (upper 16 bits) in left SV4 (NOTE(review): verify this bit layout against SelectionVector4) + * @param rightPosition position of batch (lower 16 bits) and record (upper 16 bits) in right SV4 (NOTE(review): verify this bit layout against SelectionVector4) + * @param outputPosition position of the output record batch + * @return Whether or not the data was copied. + */ + protected abstract boolean copy(int leftPosition, int rightPosition, int outputPosition); + + /** + * Compare the values of the left and right join key to determine whether the left is less than, greater than + * or equal to the right.
+ * + * @param leftPosition current left record position + * @param rightPosition current right record position + * @return 0 if both keys are equal + * a negative value (typically -1) if left is < right + * a positive value (typically 1) if left is > right + */ + protected abstract int compare(int leftPosition, int rightPosition); + protected abstract int compareNextLeftKey(int position); // compares the left key at position with the following left key; 0 means a duplicate key + public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java new file mode 100644 index 0000000..54d2076 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java @@ -0,0 +1,20 @@ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.record.VectorContainer; + + +public interface JoinWorker { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinWorker.class); + + public static enum JoinOutcome { + NO_MORE_DATA, BATCH_RETURNED, MODE_CHANGED, SCHEMA_CHANGED, WAITING, FAILURE; + } + + public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<JoinWorker>( // + JoinWorker.class, "org.apache.drill.exec.physical.impl.join.JoinTemplate", JoinEvaluator.class, null); + + + public void setupJoin(JoinStatus status, VectorContainer outgoing); + public void doJoin(JoinStatus status); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java new file mode 100644 index 0000000..4d633bb --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -0,0 +1,135 @@ +package org.apache.drill.exec.physical.impl.join; + +import java.io.IOException; +import java.util.List; + +import com.google.common.collect.ArrayListMultimap; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.memory.BufferAllocator.PreAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.MergeJoinPOP; +import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.record.*; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.ValueVector; + +/** + * A merge join combining two incoming in-order batches.
+ */ +public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class); + + private final RecordBatch left; + private final RecordBatch right; + private final JoinStatus status; + private JoinWorker worker; + public MergeJoinBatchBuilder batchBuilder; + + protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) { + super(popConfig, context); + this.left = left; + this.right = right; + this.status = new JoinStatus(left, right, this); + this.batchBuilder = new MergeJoinBatchBuilder(context, status); + } + + @Override + public int getRecordCount() { + return status.outputPosition; + } + + @Override + public IterOutcome next() { + + // we do this here instead of in the constructor because we don't necessarily want to start consuming on construction. + status.ensureInitial(); + + // loop so we can start over again if we find a new batch was created. + while(true){ + + boolean first = false; + if(worker == null){ + try { + this.worker = getNewWorker(); + first = true; + } catch (ClassTransformationException | IOException e) { + context.fail(new SchemaChangeException(e)); + kill(); + return IterOutcome.STOP; + } + } + + // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. + if(status.getOutcome() == JoinOutcome.BATCH_RETURNED || status.getOutcome() == JoinOutcome.SCHEMA_CHANGED){ + allocateBatch(); + } + + // join until we have a complete outgoing batch + worker.doJoin(status); + + // get the outcome of the join. + switch(status.getOutcome()){ + case BATCH_RETURNED: + // only return new schema if new worker has been setup. + return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; + case FAILURE: + kill(); + return IterOutcome.STOP; + case NO_MORE_DATA: + return status.outputPosition > 0 ?
(first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK) : IterOutcome.NONE; // as in BATCH_RETURNED, a new worker's first batch must announce its schema + case MODE_CHANGED: + case SCHEMA_CHANGED: + worker = null; + if(status.outputPosition > 0){ + // if we have current data, let's return that. + return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; + }else{ + // loop again to rebuild worker. + continue; + } + case WAITING: + return IterOutcome.NOT_YET; + default: + throw new IllegalStateException(); + } + } + } + + public void resetBatchBuilder() { // discards any queued right batches by starting a fresh builder + batchBuilder = new MergeJoinBatchBuilder(context, status); + } + + public void addRightToBatchBuilder() { + batchBuilder.add(right); + } + + @Override + protected void killIncoming() { + left.kill(); + right.kill(); + } + + private JoinWorker getNewWorker() throws ClassTransformationException, IOException{ + CodeGenerator<JoinWorker> cg = new CodeGenerator<JoinWorker>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + + // if (status.rightSourceMode) + // generate copier which deref's SV4 + // else + // generate direct copier. + + // generate comparator. + // generate compareNextLeftKey. + + JoinWorker w = context.getImplementationClass(cg); + w.setupJoin(status, this.container); + return w; + } + + private void allocateBatch(){ + // TODO: allocate new batch space.
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java new file mode 100644 index 0000000..d75cfb9 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java @@ -0,0 +1,128 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ ******************************************************************************/ + +package org.apache.drill.exec.physical.impl.join; + +import com.google.common.collect.ArrayListMultimap; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator.PreAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.record.*; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.List; + +public class MergeJoinBatchBuilder { // queues right-side batches and builds an SV4 over them while a left key repeats + + private final ArrayListMultimap<BatchSchema, RecordBatchData> queuedRightBatches = ArrayListMultimap.create(); + private final VectorContainer container = new VectorContainer(); // must be initialized: build() starts with container.clear() + private int runningBytes; + private int runningBatches; + private int recordCount; + private final PreAllocator svAllocator; + private final JoinStatus status; + + public MergeJoinBatchBuilder(FragmentContext context, JoinStatus status) { + this.status = status; + this.svAllocator = context.getAllocator().getPreAllocator(); + } + + public boolean add(RecordBatch batch) { // returns false when a resource limit would be exceeded; the batch is then not queued + if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) + throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch."); + if (batch.getRecordCount() == 0) return true; // skip over empty record batches. + + // resource checks + long batchBytes = getSize(batch); + if (batchBytes + runningBytes > Integer.MAX_VALUE) return false; // TODO: 2GB is arbitrary + if (runningBatches + 1 > Character.MAX_VALUE) return false; // allowed in batch. + if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
+ // transfer VVs to a new RecordBatchData + RecordBatchData bd = new RecordBatchData(batch); + runningBytes += batchBytes; + runningBatches++; // keeps the Character.MAX_VALUE batch-count check above effective + queuedRightBatches.put(batch.getSchema(), bd); + recordCount += bd.getRecordCount(); + return true; + } + + private long getSize(RecordBatch batch){ // total buffer size of all value vectors in the batch + long bytes = 0; + for(VectorWrapper<?> v : batch){ + bytes += v.getValueVector().getBufferSize(); + } + return bytes; + } + + public void build() throws SchemaChangeException { + container.clear(); + if (queuedRightBatches.isEmpty()) throw new IllegalStateException("No right-side batches have been queued; nothing to build."); // keySet().iterator().next() below needs at least one batch + if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); + status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE); + BatchSchema schema = queuedRightBatches.keySet().iterator().next(); // NOTE(review): only this first schema's batches get sv4 entries, yet vectors are collected below from every queued schema; multiple schemas are not supported yet + List<RecordBatchData> data = queuedRightBatches.get(schema); + + // now we're going to generate the sv4 pointers + switch(schema.getSelectionVectorMode()){ + case NONE: { + int index = 0; + int recordBatchId = 0; + for(RecordBatchData d : data){ + for(int i =0; i < d.getRecordCount(); i++, index++){ + status.sv4.set(index, recordBatchId, i); + } + recordBatchId++; + } + break; + } + case TWO_BYTE: { + int index = 0; + int recordBatchId = 0; + for(RecordBatchData d : data){ + for(int i =0; i < d.getRecordCount(); i++, index++){ + status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i)); + } + // might as well drop the selection vector since we'll stop using it now. + d.getSv2().clear(); + recordBatchId++; + } + break; + } + default: + throw new UnsupportedOperationException(); + } + + // next, we'll create lists of each of the vector types.
+ ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create(); + for (RecordBatchData rbd : queuedRightBatches.values()) { + for (ValueVector v : rbd.getVectors()) { + vectors.put(v.getField(), v); + } + } + + for(MaterializedField f : vectors.keySet()){ + List<ValueVector> v = vectors.get(f); + container.addHyperList(v); + } + + container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); + } + +}