DRILL-712: Right side of left join has zero values when it should be null
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/86e565a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/86e565a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/86e565a0 Branch: refs/heads/master Commit: 86e565a0b4fe4111ec328bb06933678fa0133393 Parents: 85a2e04 Author: Steven Phillips <sphill...@maprtech.com> Authored: Tue May 13 20:01:55 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Thu May 15 09:55:03 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/common/types/Types.java | 4 ++ .../exec/physical/impl/join/HashJoinBatch.java | 29 ++++++++++-- .../exec/physical/impl/join/MergeJoinBatch.java | 50 +++++++++++++++----- 3 files changed, 66 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/86e565a0/common/src/main/java/org/apache/drill/common/types/Types.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java index a6ec29b..5d0812f 100644 --- a/common/src/main/java/org/apache/drill/common/types/Types.java +++ b/common/src/main/java/org/apache/drill/common/types/Types.java @@ -300,6 +300,10 @@ public class Types { } } + public static MajorType overrideMode(MajorType originalMajorType, DataMode overrideMode) { + return withScaleAndPrecision(originalMajorType.getMinorType(), overrideMode, originalMajorType.getScale(), originalMajorType.getPrecision()); + } + public static MajorType getMajorTypeFromName(String typeName) { return getMajorTypeFromName(typeName, DataMode.REQUIRED); } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/86e565a0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 72d0462..5eec3bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -20,6 +20,9 @@ package org.apache.drill.exec.physical.impl.join; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.record.*; @@ -343,13 +346,21 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { if (hyperContainer != null) { for(VectorWrapper<?> vv : hyperContainer) { + MajorType inputType = vv.getField().getType(); + MajorType outputType; + if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); + } else { + outputType = inputType; + } + // Add the vector to our output container - ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator()); + ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator()); container.add(v); allocators.add(RemovingRecordBatch.getAllocator4(v)); JVar inVV = 
g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId)); g.getEvalBlock()._if(outVV.invoke("copyFromSafe") .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) .arg(outIndex) @@ -372,12 +383,20 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) { for (VectorWrapper<?> vv : left) { - ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator()); + MajorType inputType = vv.getField().getType(); + MajorType outputType; + if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); + } else { + outputType = inputType; + } + + ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator()); container.add(v); allocators.add(RemovingRecordBatch.getAllocator4(v)); - JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), false, fieldId)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, outputFieldId)); + JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId)); g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/86e565a0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 9a055bf..48b7fea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -27,6 +27,8 @@ import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.TypedNullConstant; import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.compile.sig.MappingSet; @@ -43,12 +45,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome; -import org.apache.drill.exec.record.AbstractRecordBatch; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.eigenbase.rel.JoinRelType; @@ -336,10 +333,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { int vectorId = 0; if (status.isLeftPositionAllowed()) { for (VectorWrapper<?> vw : left) { + MajorType inputType = 
vw.getField().getType(); + MajorType outputType; + if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); + } else { + outputType = inputType; + } JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft", - new TypedFieldId(vw.getField().getType(), vectorId)); + new TypedFieldId(inputType, vectorId)); JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", - new TypedFieldId(vw.getField().getType(),vectorId)); + new TypedFieldId(outputType,vectorId)); // todo: check result of copyFromSafe and grow allocation cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe") .arg(copyLeftMapping.getValueReadIndex()) @@ -359,10 +363,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { int rightVectorBase = vectorId; if (status.isRightPositionAllowed()) { for (VectorWrapper<?> vw : right) { + MajorType inputType = vw.getField().getType(); + MajorType outputType; + if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); + } else { + outputType = inputType; + } JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight", - new TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase)); + new TypedFieldId(inputType, vectorId - rightVectorBase)); JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", - new TypedFieldId(vw.getField().getType(),vectorId)); + new TypedFieldId(outputType,vectorId)); // todo: check result of copyFromSafe and grow allocation cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe") .arg(copyRightMappping.getValueReadIndex()) @@ -391,8 +402,16 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { // add fields from both batches if (leftCount > 0) { + for (VectorWrapper<?> w : left) { - ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), oContext.getAllocator()); + MajorType inputType = 
w.getField().getType(); + MajorType outputType; + if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); + } else { + outputType = inputType; + } + ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator()); VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize); container.add(outgoingVector); } @@ -400,7 +419,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { if (rightCount > 0) { for (VectorWrapper<?> w : right) { - ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), oContext.getAllocator()); + MajorType inputType = w.getField().getType(); + MajorType outputType; + if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); + } else { + outputType = inputType; + } + ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator()); VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize); container.add(outgoingVector); }