DRILL-301: Joining two tables hits an IndexOutOfBoundsException
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/316ce8a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/316ce8a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/316ce8a6 Branch: refs/heads/master Commit: 316ce8a6f8f94c31574e7107be26addcfc92dc7f Parents: b12c0b1 Author: Ben Becker <[email protected]> Authored: Sun Dec 1 20:07:45 2013 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Sun Dec 1 20:07:45 2013 -0800 ---------------------------------------------------------------------- .../exec/physical/impl/join/JoinTemplate.java | 28 +++++++++++--------- .../exec/physical/impl/join/JoinWorker.java | 2 +- .../exec/physical/impl/join/MergeJoinBatch.java | 20 ++++++++------ 3 files changed, 29 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index b7fdbf3..aae1a3c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -83,8 +83,9 @@ public abstract class JoinTemplate implements JoinWorker { /** * Copy rows from the input record batches until the output record batch is full * @param status State of the join operation (persists across multiple record batches/schema changes) + * @return true of join succeeded; false if the worker needs to be regenerated */ - public final void doJoin(final JoinStatus status) { + public final boolean 
doJoin(final JoinStatus status) { while (true) { // for each record @@ -93,14 +94,15 @@ public abstract class JoinTemplate implements JoinWorker { if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) { // we've hit the end of the right record batch; copy any remaining values from the left batch while (status.isLeftPositionAllowed()) { - doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()); + if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) + return false; status.advanceLeft(); } } - return; + return true; } if (!status.isLeftPositionAllowed()) - return; + return true; int comparison = doCompare(status.getLeftPosition(), status.getRightPosition()); switch (comparison) { @@ -108,7 +110,9 @@ public abstract class JoinTemplate implements JoinWorker { case -1: // left key < right key if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) - doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()); + if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) { + return false; + } status.advanceLeft(); continue; @@ -133,10 +137,10 @@ public abstract class JoinTemplate implements JoinWorker { do { // copy all equal right keys to the output record batch if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) - return; + return false; if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos())) - return; + return false; // If the left key has duplicates and we're about to cross a boundary in the right batch, add the // right table's record batch to the sv4 builder before calling next. 
These records will need to be @@ -167,7 +171,7 @@ public abstract class JoinTemplate implements JoinWorker { status.ok = false; } // return to indicate recompile in right-sv4 mode - return; + return true; } continue; @@ -193,8 +197,8 @@ public abstract class JoinTemplate implements JoinWorker { /** * Copy the data to the new record batch (if it fits). * - * @param leftPosition position of batch (lower 16 bits) and record (upper 16 bits) in left SV4 - * @param outputPosition position of the output record batch + * @param leftIndex position of batch (lower 16 bits) and record (upper 16 bits) in left SV4 + * @param outIndex position of the output record batch * @return Whether or not the data was copied. */ public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex); @@ -205,8 +209,8 @@ public abstract class JoinTemplate implements JoinWorker { * Compare the values of the left and right join key to determine whether the left is less than, greater than * or equal to the right. 
* - * @param leftPosition - * @param rightPosition + * @param leftIndex + * @param rightIndex * @return 0 if both keys are equal * -1 if left is < right * 1 if left is > right http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java index 4374cef..8643d66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java @@ -30,7 +30,7 @@ public interface JoinWorker { } public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException; - public void doJoin(JoinStatus status); + public boolean doJoin(JoinStatus status); public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 1e20e91..f3a32cd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -166,7 +166,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } // join until we have a complete outgoing batch - 
worker.doJoin(status); + if (!worker.doJoin(status)) + worker = null; // get the outcome of the join. switch(status.getOutcome()){ @@ -353,10 +354,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vw.getField().getType(),vectorId)); // todo: check result of copyFromSafe and grow allocation - cg.getEvalBlock().add(vvOut.invoke("copyFromSafe") + cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe") .arg(COPY_LEFT_MAPPING.getValueReadIndex()) .arg(COPY_LEFT_MAPPING.getValueWriteIndex()) - .arg(vvIn)); + .arg(vvIn).eq(JExpr.FALSE)) + ._then() + ._return(JExpr.FALSE); ++vectorId; } cg.getEvalBlock()._return(JExpr.lit(true)); @@ -372,10 +375,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vw.getField().getType(),vectorId)); // todo: check result of copyFromSafe and grow allocation - cg.getEvalBlock().add(vvOut.invoke("copyFromSafe") - .arg(COPY_RIGHT_MAPPING.getValueReadIndex()) - .arg(COPY_RIGHT_MAPPING.getValueWriteIndex()) - .arg(vvIn)); + cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe") + .arg(COPY_RIGHT_MAPPING.getValueReadIndex()) + .arg(COPY_RIGHT_MAPPING.getValueWriteIndex()) + .arg(vvIn).eq(JExpr.FALSE)) + ._then() + ._return(JExpr.FALSE); ++vectorId; } cg.getEvalBlock()._return(JExpr.lit(true)); @@ -388,7 +393,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private void allocateBatch() { // allocate new batch space. container.clear(); - // add fields from both batches for (VectorWrapper<?> w : left) { ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
