DRILL-712: Right side of Left join has zero values when should be null

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/86e565a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/86e565a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/86e565a0

Branch: refs/heads/master
Commit: 86e565a0b4fe4111ec328bb06933678fa0133393
Parents: 85a2e04
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Tue May 13 20:01:55 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Thu May 15 09:55:03 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/types/Types.java    |  4 ++
 .../exec/physical/impl/join/HashJoinBatch.java  | 29 ++++++++++--
 .../exec/physical/impl/join/MergeJoinBatch.java | 50 +++++++++++++++-----
 3 files changed, 66 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/86e565a0/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index a6ec29b..5d0812f 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -300,6 +300,10 @@ public class Types {
     }
   }
 
+  public static MajorType overrideMode(MajorType originalMajorType, DataMode overrideMode) {
+    return withScaleAndPrecision(originalMajorType.getMinorType(), overrideMode, originalMajorType.getScale(), originalMajorType.getPrecision());
+  }
+
   public static MajorType getMajorTypeFromName(String typeName) {
     return getMajorTypeFromName(typeName, DataMode.REQUIRED);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/86e565a0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 72d0462..5eec3bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.physical.impl.join;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.record.*;
@@ -343,13 +346,21 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         if (hyperContainer != null) {
             for(VectorWrapper<?> vv : hyperContainer) {
 
+                MajorType inputType = vv.getField().getType();
+                MajorType outputType;
+                if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+                } else {
+                  outputType = inputType;
+                }
+
                 // Add the vector to our output container
-                ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator());
+                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
                 container.add(v);
                 allocators.add(RemovingRecordBatch.getAllocator4(v));
 
                 JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
+                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
                 g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
                   .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
                   .arg(outIndex)
@@ -372,12 +383,20 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
             for (VectorWrapper<?> vv : left) {
 
-                ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator());
+                MajorType inputType = vv.getField().getType();
+                MajorType outputType;
+                if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+                } else {
+                  outputType = inputType;
+                }
+
+                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
                 container.add(v);
                 allocators.add(RemovingRecordBatch.getAllocator4(v));
 
-                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), false, fieldId));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, outputFieldId));
+                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
+                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
 
                 g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/86e565a0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 9a055bf..48b7fea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -27,6 +27,8 @@ import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.TypedNullConstant;
 import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.MappingSet;
@@ -43,12 +45,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
-import org.apache.drill.exec.record.AbstractRecordBatch;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.eigenbase.rel.JoinRelType;
@@ -336,10 +333,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     int vectorId = 0;
     if (status.isLeftPositionAllowed()) {
       for (VectorWrapper<?> vw : left) {
+        MajorType inputType = vw.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
+        }
         JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft",
-                                                        new TypedFieldId(vw.getField().getType(), vectorId));
+                                                        new TypedFieldId(inputType, vectorId));
         JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
-                                                         new TypedFieldId(vw.getField().getType(),vectorId));
+                                                         new TypedFieldId(outputType,vectorId));
         // todo: check result of copyFromSafe and grow allocation
         cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
                                      .arg(copyLeftMapping.getValueReadIndex())
@@ -359,10 +363,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     int rightVectorBase = vectorId;
     if (status.isRightPositionAllowed()) {
       for (VectorWrapper<?> vw : right) {
+        MajorType inputType = vw.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
+        }
         JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight",
-                                                        new TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase));
+                                                        new TypedFieldId(inputType, vectorId - rightVectorBase));
         JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
-                                                         new TypedFieldId(vw.getField().getType(),vectorId));
+                                                         new TypedFieldId(outputType,vectorId));
         // todo: check result of copyFromSafe and grow allocation
         cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
                                    .arg(copyRightMappping.getValueReadIndex())
@@ -391,8 +402,16 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     
     // add fields from both batches
     if (leftCount > 0) {
+
       for (VectorWrapper<?> w : left) {
-        ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), oContext.getAllocator());
+        MajorType inputType = w.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
+        }
+        ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
         VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize);
         container.add(outgoingVector);
       }
@@ -400,7 +419,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
     if (rightCount > 0) {
       for (VectorWrapper<?> w : right) {
-        ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), oContext.getAllocator());
+        MajorType inputType = w.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
+        }
+        ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
         VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize);
         container.add(outgoingVector);
       }

Reply via email to