[ https://issues.apache.org/jira/browse/DRILL-3991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498579#comment-16498579 ]

ASF GitHub Bot commented on DRILL-3991:
---------------------------------------

ilooner closed pull request #299: DRILL-3991 Initial patch to support schema 
changes in hash join.
URL: https://github.com/apache/drill/pull/299
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index daac31d43e..225fbc5100 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -101,9 +101,16 @@ public static LogicalExpression 
materialize(LogicalExpression expr, VectorAccess
     return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, 
functionLookupContext, false, false);
   }
 
-  public static LogicalExpression materializeAndCheckErrors(LogicalExpression 
expr, VectorAccessible batch, FunctionLookupContext functionLookupContext) 
throws SchemaChangeException {
+  public static LogicalExpression materializeAndCheckErrors(LogicalExpression 
expr, VectorAccessible batch,
+                                                            
FunctionLookupContext functionLookupContext) throws SchemaChangeException {
+    return materializeAndCheckErrors(expr, batch, functionLookupContext, 
false, false);
+  }
+
+  public static LogicalExpression materializeAndCheckErrors(LogicalExpression 
expr, VectorAccessible batch,
+                                                            
FunctionLookupContext functionLookupContext,
+                                                            boolean 
allowComplexWriterExpr, boolean unionTypeEnabled) throws SchemaChangeException {
     ErrorCollector collector = new ErrorCollectorImpl();
-    LogicalExpression e = ExpressionTreeMaterializer.materialize(expr, batch, 
collector, functionLookupContext, false, false);
+    LogicalExpression e = ExpressionTreeMaterializer.materialize(expr, batch, 
collector, functionLookupContext, allowComplexWriterExpr, unionTypeEnabled);
     if (collector.hasErrors()) {
       throw new SchemaChangeException(String.format("Failure while trying to 
materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }
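
For reference, the new overload simply threads the two flags through to materialize(); a minimal, illustrative call-site sketch (the variable names expr, batch, and registry are assumptions, not part of the patch):

    // Three-argument form keeps the old behavior (no complex writers, no union types).
    LogicalExpression e1 = ExpressionTreeMaterializer.materializeAndCheckErrors(expr, batch, registry);
    // New overload: opt into union-type handling while still disallowing complex writer expressions.
    LogicalExpression e2 = ExpressionTreeMaterializer.materializeAndCheckErrors(expr, batch, registry,
        false /* allowComplexWriterExpr */, true /* unionTypeEnabled */);
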
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 8af15082df..25a4b3f63e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -269,7 +269,7 @@ public void setup(HashAggregate hashAggrConfig, 
HashTableConfig htConfig, Fragme
 
     ChainedHashTable ht =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* 
no incoming probe */, outgoing,
-            true /* nulls are equal */);
+            true /* nulls are equal */, false);
     this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
 
     numGroupByOutFields = groupByOutFieldIds.length;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 08ccaa1d53..45321be096 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -119,15 +119,15 @@
   private HashTableConfig htConfig;
   private final FragmentContext context;
   private final BufferAllocator allocator;
-  private final RecordBatch incomingBuild;
-  private final RecordBatch incomingProbe;
+  private final VectorAccessible incomingBuild;
+  private final VectorAccessible incomingProbe;
   private final RecordBatch outgoing;
   private final boolean areNullsEqual;
+  private final boolean unionTypeEnabled;
 
   public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, 
BufferAllocator allocator,
-                          RecordBatch incomingBuild, RecordBatch 
incomingProbe, RecordBatch outgoing,
-                          boolean areNullsEqual) {
-
+                          VectorAccessible incomingBuild, VectorAccessible 
incomingProbe, RecordBatch outgoing,
+                          boolean areNullsEqual, boolean unionTypeEnabled) {
     this.htConfig = htConfig;
     this.context = context;
     this.allocator = allocator;
@@ -135,6 +135,7 @@ public ChainedHashTable(HashTableConfig htConfig, 
FragmentContext context, Buffe
     this.incomingProbe = incomingProbe;
     this.outgoing = outgoing;
     this.areNullsEqual = areNullsEqual;
+    this.unionTypeEnabled = unionTypeEnabled;
   }
 
   public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) 
throws ClassTransformationException,
@@ -157,7 +158,8 @@ public HashTable createAndSetupHashTable(TypedFieldId[] 
outKeyFieldIds) throws C
 
     int i = 0;
     for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
-      final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, 
context.getFunctionRegistry());
+      final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild,
+        collector, context.getFunctionRegistry(), /*allowComplexWriterExpr*/ 
false, unionTypeEnabled);
       if (collector.hasErrors()) {
         throw new SchemaChangeException("Failure while materializing 
expression. " + collector.toErrorString());
       }
@@ -171,7 +173,8 @@ public HashTable createAndSetupHashTable(TypedFieldId[] 
outKeyFieldIds) throws C
     if (isProbe) {
       i = 0;
       for (NamedExpression ne : htConfig.getKeyExprsProbe()) {
-        final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, 
context.getFunctionRegistry());
+        final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe,
+          collector, context.getFunctionRegistry(), /*allowComplexWriterExpr*/ 
false, unionTypeEnabled);
         if (collector.hasErrors()) {
           throw new SchemaChangeException("Failure while materializing 
expression. " + collector.toErrorString());
         }
@@ -181,7 +184,7 @@ public HashTable createAndSetupHashTable(TypedFieldId[] 
outKeyFieldIds) throws C
         keyExprsProbe[i] = expr;
         i++;
       }
-      JoinUtils.addLeastRestrictiveCasts(keyExprsProbe, incomingProbe, 
keyExprsBuild, incomingBuild, context);
+      JoinUtils.addLeastRestrictiveCasts(keyExprsProbe, incomingProbe, 
keyExprsBuild, incomingBuild, context, unionTypeEnabled);
     }
 
     i = 0;
@@ -316,10 +319,9 @@ private void setupGetHash(ClassGenerator<HashTable> cg, 
MappingSet incomingMappi
      */
     LogicalExpression hashExpression = 
HashPrelUtil.getHashExpression(Arrays.asList(keyExprs),
         incomingProbe != null ? true : false);
-    final LogicalExpression materializedExpr = 
ExpressionTreeMaterializer.materializeAndCheckErrors(hashExpression, batch, 
context.getFunctionRegistry());
+    final LogicalExpression materializedExpr = 
ExpressionTreeMaterializer.materializeAndCheckErrors(hashExpression, batch, 
context.getFunctionRegistry(),
+      /*allowComplexWriterExpr */ false, unionTypeEnabled);
     HoldingContainer hash = cg.addExpr(materializedExpr);
     cg.getEvalBlock()._return(hash.getValue());
-
-
   }
 }
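
The constructor change is visible at its call sites; a minimal, illustrative sketch of creating the table against VectorAccessible inputs (mirrors the HashAggTemplate and HashJoinBatch updates in this patch; the variable names are assumptions):

    // Build and probe inputs can now be any VectorAccessible, not only RecordBatch.
    ChainedHashTable ht = new ChainedHashTable(htConfig, context, allocator,
        buildBatch, probeBatch, outgoing,
        true /* nulls are equal */, unionTypeEnabled);
    HashTable hashTable = ht.createAndSetupHashTable(null);
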
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index ef7dadfc72..c3c9681eb9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -21,6 +21,7 @@
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 
 public interface HashTable {
@@ -55,7 +56,7 @@
   public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE;
 
   public void setup(HashTableConfig htConfig, FragmentContext context, 
BufferAllocator allocator,
-      RecordBatch incomingBuild, RecordBatch incomingProbe,
+      VectorAccessible incomingBuild, VectorAccessible incomingProbe,
       RecordBatch outgoing, VectorContainer htContainerOrig);
 
   public void updateBatches();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index e0876af792..6b9b5cdf43 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -33,6 +33,7 @@
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
@@ -78,10 +79,10 @@
   private BufferAllocator allocator;
 
   // The incoming build side record batch
-  private RecordBatch incomingBuild;
+  private VectorAccessible incomingBuild;
 
   // The incoming probe side record batch (may be null)
-  private RecordBatch incomingProbe;
+  private VectorAccessible incomingProbe;
 
   // The outgoing record batch
   private RecordBatch outgoing;
@@ -401,8 +402,8 @@ private ValueVector getValueVector(int index) {
 
     @RuntimeOverridden
     protected void setupInterior(
-        @Named("incomingBuild") RecordBatch incomingBuild,
-        @Named("incomingProbe") RecordBatch incomingProbe,
+        @Named("incomingBuild") VectorAccessible incomingBuild,
+        @Named("incomingProbe") VectorAccessible incomingProbe,
         @Named("outgoing") RecordBatch outgoing,
         @Named("htContainer") VectorContainer htContainer) {
     }
@@ -432,7 +433,7 @@ protected void outputRecordKeys(@Named("htRowIdx") int 
htRowIdx, @Named("outRowI
 
   @Override
   public void setup(HashTableConfig htConfig, FragmentContext context, 
BufferAllocator allocator,
-      RecordBatch incomingBuild, RecordBatch incomingProbe,
+      VectorAccessible incomingBuild, VectorAccessible incomingProbe,
       RecordBatch outgoing, VectorContainer htContainerOrig) {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
@@ -769,7 +770,7 @@ public void addNewKeyBatch() {
   }
 
   // These methods will be code-generated in the context of the outer class
-  protected abstract void doSetup(@Named("incomingBuild") RecordBatch 
incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
+  protected abstract void doSetup(@Named("incomingBuild") VectorAccessible 
incomingBuild, @Named("incomingProbe") VectorAccessible incomingProbe);
 
   protected abstract int getHashBuild(@Named("incomingRowIdx") int 
incomingRowIdx);
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 3ea97c6581..9ef8e92273 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.common.expression.FieldReference;
@@ -51,7 +52,9 @@
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
@@ -106,43 +109,49 @@
   // Schema of the build side
   private BatchSchema rightSchema = null;
 
+  // Schema of probe side
+  private BatchSchema leftSchema = null;
 
   // Generator mapping for the build side
-  // Generator mapping for the build side : scalar
   private static final GeneratorMapping PROJECT_BUILD =
-      GeneratorMapping.create("doSetup"/* setup method */, 
"projectBuildRecord" /* eval method */, null /* reset */,
-          null /* cleanup */);
+    GeneratorMapping.create("doSetup"/* setup method */, "projectBuildRecord" 
/* eval method */, null /* reset */,
+      null /* cleanup */);
   // Generator mapping for the build side : constant
   private static final GeneratorMapping PROJECT_BUILD_CONSTANT = 
GeneratorMapping.create("doSetup"/* setup method */,
-      "doSetup" /* eval method */,
-      null /* reset */, null /* cleanup */);
+    "doSetup" /* eval method */,
+    null /* reset */, null /* cleanup */);
 
   // Generator mapping for the probe side : scalar
   private static final GeneratorMapping PROJECT_PROBE =
-      GeneratorMapping.create("doSetup" /* setup method */, 
"projectProbeRecord" /* eval method */, null /* reset */,
-          null /* cleanup */);
+    GeneratorMapping.create("doSetup" /* setup method */, "projectProbeRecord" 
/* eval method */, null /* reset */,
+      null /* cleanup */);
   // Generator mapping for the probe side : constant
-  private static final GeneratorMapping PROJECT_PROBE_CONSTANT = 
GeneratorMapping.create("doSetup" /* setup method */,
-      "doSetup" /* eval method */,
-      null /* reset */, null /* cleanup */);
 
+  private static final GeneratorMapping PROJECT_PROBE_CONSTANT = 
GeneratorMapping.create("doSetup" /* setup method */,
+    "doSetup" /* eval method */,
+    null /* reset */, null /* cleanup */);
 
   // Mapping set for the build side
   private final MappingSet projectBuildMapping =
-      new MappingSet("buildIndex" /* read index */, "outIndex" /* write index 
*/, "buildBatch" /* read container */,
-          "outgoing" /* write container */, PROJECT_BUILD_CONSTANT, 
PROJECT_BUILD);
+    new MappingSet("buildIndex" /* read index */, "outIndex" /* write index 
*/, "buildBatch" /* read container */,
+      "outgoing" /* write container */, PROJECT_BUILD_CONSTANT, PROJECT_BUILD);
 
   // Mapping set for the probe side
   private final MappingSet projectProbeMapping = new MappingSet("probeIndex" 
/* read index */, "outIndex" /* write index */,
-      "probeBatch" /* read container */,
-      "outgoing" /* write container */,
-      PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
+    "probeBatch" /* read container */,
+    "outgoing" /* write container */,
+    PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
+
+  private IterOutcome leftUpstream = IterOutcome.NONE;
+  private IterOutcome rightUpstream = IterOutcome.NONE;
 
-  // indicates if we have previously returned an output batch
-  boolean firstOutputBatch = true;
+  private boolean probeSideSchemaChanged = false;
+  private boolean outputSchemaChanged = false;
 
-  IterOutcome leftUpstream = IterOutcome.NONE;
-  IterOutcome rightUpstream = IterOutcome.NONE;
+  private VectorContainer finalBuildSideVectorContainer;
+  private List<VectorContainer> buildBatchContainers = new ArrayList<>();
+
+  final private HashJoinProbeStatus probeStatus = new HashJoinProbeStatus();
 
   private final HashTableStats htStats = new HashTableStats();
 
@@ -161,6 +170,15 @@ public int metricId() {
     }
   }
 
+  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, 
RecordBatch left,
+                       RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context, true);
+    this.left = left;
+    this.right = right;
+    joinType = popConfig.getJoinType();
+    conditions = popConfig.getConditions();
+  }
+
   @Override
   public int getRecordCount() {
     return outputRecords;
@@ -180,8 +198,6 @@ protected void buildSchema() throws SchemaChangeException {
       state = BatchState.OUT_OF_MEMORY;
       return;
     }
-
-    // Initialize the hash join helper context
     hjHelper = new HashJoinHelper(context, oContext.getAllocator());
     try {
       rightSchema = right.getSchema();
@@ -191,11 +207,12 @@ protected void buildSchema() throws SchemaChangeException 
{
       }
       vectors.buildSchema(SelectionVectorMode.NONE);
       vectors.setRecordCount(0);
+      // Initialize hyper container here in case build side is empty.
       hyperContainer = new ExpandableHyperContainer(vectors);
       hjHelper.addNewBatch(0);
       buildBatchIndex++;
-      setupHashTable();
-      hashJoinProbe = setupHashJoinProbe();
+      hashTable = setupHashTable(this.left, this.right);
+      hashJoinProbe = setupHashJoinProbe(left);
       // Build the container schema and set the counts
       for (final VectorWrapper<?> w : container) {
         w.getValueVector().allocateNew();
@@ -215,38 +232,59 @@ public IterOutcome innerNext() {
        */
       if (state == BatchState.FIRST) {
         // Build the hash table, using the build side record batches.
-        executeBuildPhase();
-        //                IterOutcome next = next(HashJoinHelper.LEFT_INPUT, 
left);
-        hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 
left.getRecordCount(), this, hashTable,
-            hjHelper, joinType);
+        if (!unionTypeEnabled) {
+          executeBuildPhase();
+          hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 
left.getRecordCount(), this, hashTable,
+            hjHelper, joinType, probeStatus, false, oContext, null, null);
+        } else {
+          executeBuildPhaseWithSchemaChanges();
+          container.clear();
+          leftSchema = left.getSchema();
+          hashJoinProbe = setupHashJoinProbe(left);
+          hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 
left.getRecordCount(), this, hashTable,
+            hjHelper, joinType, probeStatus, true, oContext, null, null);
 
+        }
         // Update the hash table related stats for the operator
         updateStats(this.hashTable);
       }
 
-      // Store the number of records projected
       if (!hashTable.isEmpty() || joinType != JoinRelType.INNER) {
-
-        // Allocate the memory for the vectors in the output container
-        allocateVectors();
-
-        outputRecords = hashJoinProbe.probeAndProject();
-
-        /* We are here because of one the following
-         * 1. Completed processing of all the records and we are done
-         * 2. We've filled up the outgoing batch to the maximum and we need to 
return upstream
-         * Either case build the output container's schema and return
-         */
-        if (outputRecords > 0 || state == BatchState.FIRST) {
-          if (state == BatchState.FIRST) {
-            state = BatchState.NOT_FIRST;
+        while (true) {
+          if (probeSideSchemaChanged) {
+            resetHashJoinProbe();
+            probeSideSchemaChanged = false;
+            outputSchemaChanged = true;
           }
-
-          for (final VectorWrapper<?> v : container) {
-            v.getValueVector().getMutator().setValueCount(outputRecords);
+          // Allocate the memory for the vectors in the output container
+          allocateVectors();
+          outputRecords = hashJoinProbe.probeAndProject();
+          if (outputRecords == 0) {
+            if (hashJoinProbe.schemaChanged()) {
+              probeSideSchemaChanged = true;
+              continue;
+            } else {
+              // No more record to output, return NONE.
+              break;
+            }
+          } else {
+            boolean first = false;
+            if (state == BatchState.FIRST) {
+              state = BatchState.NOT_FIRST;
+              first = true;
+            }
+            for (final VectorWrapper<?> v : container) {
+              v.getValueVector().getMutator().setValueCount(outputRecords);
+            }
+            container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+            container.setRecordCount(outputRecords);
+            // Remember to reset hash probe and table on next call.
+            probeSideSchemaChanged = hashJoinProbe.schemaChanged();
+            final IterOutcome result = outputSchemaChanged ? 
IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+            // This is the first batch after schema change, reset 
outputSchemaChanged until next schema change.
+            outputSchemaChanged = false;
+            return result;
           }
-
-          return IterOutcome.OK;
         }
       } else {
         // Our build side is empty, we won't have any matches, clear the probe 
side
@@ -264,12 +302,8 @@ public IterOutcome innerNext() {
           }
         }
       }
-
       // No more output records, clean up and return
       state = BatchState.DONE;
-      //            if (first) {
-      //              return IterOutcome.OK_NEW_SCHEMA;
-      //            }
       return IterOutcome.NONE;
     } catch (ClassTransformationException | SchemaChangeException | 
IOException e) {
       context.fail(e);
@@ -278,7 +312,7 @@ public IterOutcome innerNext() {
     }
   }
 
-  public void setupHashTable() throws IOException, SchemaChangeException, 
ClassTransformationException {
+  public HashTable setupHashTable(VectorAccessible probeBatch, 
VectorAccessible buildBatch) throws IOException, SchemaChangeException, 
ClassTransformationException {
     // Setup the hash table configuration object
     int conditionsSize = conditions.size();
     final NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
@@ -301,20 +335,152 @@ public void setupHashTable() throws IOException, 
SchemaChangeException, ClassTra
     if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != 
IterOutcome.OK) {
       leftExpr = null;
     } else {
-      if (left.getSchema().getSelectionVectorMode() != 
BatchSchema.SelectionVectorMode.NONE) {
+      if (probeBatch.getSchema().getSelectionVectorMode() != 
BatchSchema.SelectionVectorMode.NONE) {
         throw new SchemaChangeException("Hash join does not support probe 
batch with selection vectors");
       }
     }
 
     final HashTableConfig htConfig =
-        new 
HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
-            HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
+      new 
HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
+        HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
     // Create the chained hash table
     final ChainedHashTable ht =
-        new ChainedHashTable(htConfig, context, oContext.getAllocator(), 
this.right, this.left, null,
-            areNullsEqual);
-    hashTable = ht.createAndSetupHashTable(null);
+      new ChainedHashTable(htConfig, context, oContext.getAllocator(), 
buildBatch, probeBatch, null,
+        areNullsEqual, unionTypeEnabled);
+    return ht.createAndSetupHashTable(null);
+  }
+
+  /**
+   * The probe side schema has changed.
+   * 1) From now on, always coerce incoming vectors to the new schema (until the next schema change).
+   * 2) Recreate the probe side and the hash table for the new probe side schema.
+   * 3) Add the build side keys to the new hash table.
+   * 4) Carry over the build side keys already marked as matched by previous operations.
+   * 5) The hyper container does not need to be rebuilt.
+   * @throws IOException
+   * @throws ClassTransformationException
+   */
+  private void resetHashJoinProbe() throws IOException, 
ClassTransformationException, SchemaChangeException {
+    container.clear();
+    assert leftSchema != null;
+    assert unionTypeEnabled;
+    assert hjHelper != null;
+
+    leftSchema = SchemaUtil.mergeSchemas(leftSchema, left.getSchema());
+
+    final VectorContainer probeContainer = SchemaUtil.coerceContainer(left, 
leftSchema, oContext);
+
+    // Throw away old hash table.
+    hashTable.clear();
+    final HashJoinHelper newHjHelper = new HashJoinHelper(context, 
oContext.getAllocator());
+    hashTable = setupHashTable(probeContainer, finalBuildSideVectorContainer);
+
+    buildBatchIndex = 0;
+    for (VectorContainer batchVectorContainer: buildBatchContainers) {
+      final int currentRecordCount = batchVectorContainer.getRecordCount();
+      batchVectorContainer.transferOut(finalBuildSideVectorContainer);
+      
finalBuildSideVectorContainer.setRecordCount(batchVectorContainer.getRecordCount());
+      newHjHelper.addNewBatch(currentRecordCount);
+      final IndexPointer htIndex = new IndexPointer();
+      // For every record in the build batch , hash the key columns
+      for (int i = 0; i < currentRecordCount; i++) {
+        hashTable.put(i, htIndex, 1 /* retry count */);
+        newHjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+        if (hjHelper.isMatched(buildBatchIndex, i)) {
+          newHjHelper.setRecordMatched(buildBatchIndex, i);
+        }
+      }
+      finalBuildSideVectorContainer.transferOut(batchVectorContainer);
+      buildBatchIndex++;
+    }
+    hjHelper.clear();
+    hjHelper = newHjHelper;
+
+    hashJoinProbe = setupHashJoinProbe(probeContainer);
+    hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 
left.getRecordCount(), this, hashTable,
+      hjHelper, joinType, probeStatus, true, oContext, leftSchema, 
probeContainer);
+  }
+
+  public void executeBuildPhaseWithSchemaChanges() throws 
ClassTransformationException, IOException, SchemaChangeException {
+    boolean moreData = true;
+    final List<RecordBatchData> buildBatches = new ArrayList<>();
+    while (moreData) {
+      switch (rightUpstream) {
+        case OUT_OF_MEMORY:
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          moreData = false;
+          continue;
+
+        case OK:
+        case OK_NEW_SCHEMA:
+          rightSchema = SchemaUtil.mergeSchemas(rightSchema, 
right.getSchema());
+          // Skip empty batch.
+          if (right.getRecordCount() == 0) {
+            for (final VectorWrapper<?> w : right) {
+              w.clear();
+            }
+          } else {
+            // Copy the record batch for future use and remember the schema in this batch.
+            buildBatches.add(new RecordBatchData(right, 
oContext.getAllocator()));
+          }
+          rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
+          continue;
+      }
+    }
+    if (buildBatches.size() == 0) {
+      return;
+    }
+    // Ignore first batch from buildSchema().
+    buildBatchIndex = 0;
+    // Setup finalBuildSideVectorContainer with empty vectors and wrappers.
+    finalBuildSideVectorContainer = SchemaUtil.coerceContainer(null, 
rightSchema, oContext);
+    // Create hash table with current left and finalBuildSideVectorContainer.
+    rightSchema = finalBuildSideVectorContainer.getSchema();
+    if (hashTable != null) {
+      hashTable.clear();
+    }
+    hashTable = setupHashTable(this.left, finalBuildSideVectorContainer);
+    if (hyperContainer != null) {
+      hyperContainer.clear();
+      hyperContainer = null;
+    }
+    if (hjHelper != null) {
+      hjHelper.clear();
+    }
+    hjHelper = new HashJoinHelper(context, oContext.getAllocator());
+    buildBatchIndex = 0;
+    for (RecordBatchData batchData : buildBatches) {
+      final int currentRecordCount = batchData.getRecordCount();
+      final VectorContainer batchVectorContainer = 
SchemaUtil.coerceContainer(batchData.getContainer(), rightSchema, oContext);
+      // transfer from batch container to container accessed by hash table.
+      batchVectorContainer.transferOut(finalBuildSideVectorContainer);
+      
finalBuildSideVectorContainer.setRecordCount(batchVectorContainer.getRecordCount());
+      hjHelper.addNewBatch(currentRecordCount);
+      // Holder contains the global index where the key is hashed into using 
the hash table
+      final IndexPointer htIndex = new IndexPointer();
+      // For every record in the build batch , hash the key columns
+      for (int i = 0; i < currentRecordCount; i++) {
+        hashTable.put(i, htIndex, 1 /* retry count */);
+        /* Use the global index returned by the hash table, to store
+         * the current record index and batch index. This will be used
+         * later when we probe and find a match.
+         */
+        hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+      }
+      // Transfer back after hashing.
+      finalBuildSideVectorContainer.transferOut(batchVectorContainer);
+      if (hyperContainer == null) {
+        hyperContainer = new ExpandableHyperContainer(batchVectorContainer);
+      } else {
+        hyperContainer.addBatch(batchVectorContainer);
+      }
+      // Remember coerced container for this batch.
+      this.buildBatchContainers.add(buildBatchIndex, batchVectorContainer);
+      buildBatchIndex++;
+    }
   }
 
   public void executeBuildPhase() throws SchemaChangeException, 
ClassTransformationException, IOException {
@@ -332,79 +498,77 @@ public void executeBuildPhase() throws 
SchemaChangeException, ClassTransformatio
 
     while (moreData) {
       switch (rightUpstream) {
-      case OUT_OF_MEMORY:
-      case NONE:
-      case NOT_YET:
-      case STOP:
-        moreData = false;
-        continue;
-
-      case OK_NEW_SCHEMA:
-        if (rightSchema == null) {
-          rightSchema = right.getSchema();
-
-          if (rightSchema.getSelectionVectorMode() != 
BatchSchema.SelectionVectorMode.NONE) {
-            throw new SchemaChangeException("Hash join does not support build 
batch with selection vectors");
-          }
-          setupHashTable();
-        } else {
-          if (!rightSchema.equals(right.getSchema())) {
-            throw new SchemaChangeException("Hash join does not support schema 
changes");
+        case OUT_OF_MEMORY:
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          moreData = false;
+          continue;
+
+        case OK_NEW_SCHEMA:
+          if (rightSchema == null) {
+            rightSchema = right.getSchema();
+
+            if (rightSchema.getSelectionVectorMode() != 
BatchSchema.SelectionVectorMode.NONE) {
+              throw new SchemaChangeException("Hash join does not support 
build batch with selection vectors");
+            }
+            hashTable = setupHashTable(this.left, this.right);
+          } else {
+            if (!rightSchema.equals(right.getSchema())) {
+              throw new SchemaChangeException("Hash join does not support 
schema changes");
+            }
+            hashTable.updateBatches();
           }
-          hashTable.updateBatches();
-        }
-        // Fall through
-      case OK:
-        final int currentRecordCount = right.getRecordCount();
+          // Fall through
+        case OK:
+          final int currentRecordCount = right.getRecordCount();
 
                     /* For every new build batch, we store some state in the 
helper context
                      * Add new state to the helper context
                      */
-        hjHelper.addNewBatch(currentRecordCount);
-
-        // Holder contains the global index where the key is hashed into using 
the hash table
-        final IndexPointer htIndex = new IndexPointer();
-
-        // For every record in the build batch , hash the key columns
-        for (int i = 0; i < currentRecordCount; i++) {
-          hashTable.put(i, htIndex, 1 /* retry count */);
-
-                        /* Use the global index returned by the hash table, to 
store
-                         * the current record index and batch index. This will 
be used
-                         * later when we probe and find a match.
-                         */
-          hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
-        }
-
-                    /* Completed hashing all records in this batch. Transfer 
the batch
-                     * to the hyper vector container. Will be used when we 
want to retrieve
-                     * records that have matching keys on the probe side.
-                     */
-        final RecordBatchData nextBatch = new RecordBatchData(right, 
oContext.getAllocator());
-        boolean success = false;
-        try {
-          if (hyperContainer == null) {
-            hyperContainer = new 
ExpandableHyperContainer(nextBatch.getContainer());
-          } else {
-            hyperContainer.addBatch(nextBatch.getContainer());
+          hjHelper.addNewBatch(currentRecordCount);
+
+          // Holder contains the global index where the key is hashed into 
using the hash table
+          final IndexPointer htIndex = new IndexPointer();
+
+          // For every record in the build batch , hash the key columns
+          for (int i = 0; i < currentRecordCount; i++) {
+            hashTable.put(i, htIndex, 1 /* retry count */);
+            /* Use the global index returned by the hash table, to store
+             * the current record index and batch index. This will be used
+             * later when we probe and find a match.
+             */
+            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
           }
+          /* Completed hashing all records in this batch. Transfer the batch
+           * to the hyper vector container. Will be used when we want to 
retrieve
+           * records that have matching keys on the probe side.
+           */
+          final RecordBatchData nextBatch = new RecordBatchData(right, 
oContext.getAllocator());
+          boolean success = false;
+          try {
+            if (hyperContainer == null) {
+              hyperContainer = new 
ExpandableHyperContainer(nextBatch.getContainer());
+            } else {
+              hyperContainer.addBatch(nextBatch.getContainer());
+            }
 
-          // completed processing a batch, increment batch index
-          buildBatchIndex++;
-          success = true;
-        } finally {
-          if (!success) {
-            nextBatch.clear();
+            // completed processing a batch, increment batch index
+            buildBatchIndex++;
+            success = true;
+          } finally {
+            if (!success) {
+              nextBatch.clear();
+            }
           }
-        }
-        break;
+          break;
       }
       // Get the next record batch
       rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
     }
   }
 
-  public HashJoinProbe setupHashJoinProbe() throws 
ClassTransformationException, IOException {
+  public HashJoinProbe setupHashJoinProbe(VectorAccessible probe) throws 
ClassTransformationException, IOException {
     final CodeGenerator<HashJoinProbe> cg = 
CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
     final ClassGenerator<HashJoinProbe> g = cg.getRoot();
 
@@ -423,7 +587,7 @@ public HashJoinProbe setupHashJoinProbe() throws 
ClassTransformationException, I
         // If left or full outer join, then the output type must be nullable. 
However, map types are
         // not nullable so we must exclude them from the check below (see 
DRILL-2197).
         if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && 
inputType.getMode() == DataMode.REQUIRED
-            && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
+          && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
           outputType = inputType;
@@ -437,9 +601,9 @@ public HashJoinProbe setupHashJoinProbe() throws 
ClassTransformationException, I
         final JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new 
TypedFieldId(field.getType(), true, fieldId));
         final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new 
TypedFieldId(outputType, false, fieldId));
         g.getEvalBlock().add(outVV.invoke("copyFromSafe")
-            .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
-            .arg(outIndex)
-            .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
+          .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+          .arg(outIndex)
+          .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
 
         fieldId++;
       }
@@ -452,15 +616,15 @@ public HashJoinProbe setupHashJoinProbe() throws 
ClassTransformationException, I
     fieldId = 0;
     final JExpression probeIndex = JExpr.direct("probeIndex");
 
-    if (leftUpstream == IterOutcome.OK || leftUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
-      for (final VectorWrapper<?> vv : left) {
+    if (leftUpstream == IterOutcome.OK || leftUpstream == 
IterOutcome.OK_NEW_SCHEMA || probeSideSchemaChanged) {
+      for (final VectorWrapper<?> vv : probe) {
         final MajorType inputType = vv.getField().getType();
         final MajorType outputType;
 
         // If right or full outer join then the output type should be 
optional. However, map types are
         // not nullable so we must exclude them from the check below (see 
DRILL-2771, DRILL-2197).
         if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && 
inputType.getMode() == DataMode.REQUIRED
-            && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
+          && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
           outputType = inputType;
@@ -492,15 +656,6 @@ private void allocateVectors() {
     }
   }
 
-  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, 
RecordBatch left,
-      RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context, true);
-    this.left = left;
-    this.right = right;
-    joinType = popConfig.getJoinType();
-    conditions = popConfig.getConditions();
-  }
-
   private void updateStats(HashTable htable) {
     if (htable == null) {
       return;
@@ -523,12 +678,18 @@ public void close() {
     if (hjHelper != null) {
       hjHelper.clear();
     }
-
+    if (finalBuildSideVectorContainer != null) {
+      finalBuildSideVectorContainer.clear();
+    }
     // If we didn't receive any data, hyperContainer may be null, check before 
clearing
     if (hyperContainer != null) {
       hyperContainer.clear();
     }
 
+    for (VectorContainer vc : buildBatchContainers) {
+      vc.clear();
+    }
+
     if (hashTable != null) {
       hashTable.clear();
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index a3c33ed241..b427ab30e1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -165,13 +165,25 @@ public int getNextIndex(int currentIdx) {
   }
 
   public void setRecordMatched(int index) {
-    int batchIdx  = index >>> SHIFT_SIZE;
-    int recordIdx = index & HashTable.BATCH_MASK;
+    final int batchIdx  = index >>> SHIFT_SIZE;
+    final int recordIdx = index & HashTable.BATCH_MASK;
 
     // Get the BitVector for the appropriate batch and set the bit to indicate 
the record matched
-    BuildInfo info = buildInfoList.get(batchIdx);
-    BitSet bitVector = info.getKeyMatchBitVector();
+    final BuildInfo info = buildInfoList.get(batchIdx);
+    final BitSet bitVector = info.getKeyMatchBitVector();
+
+    bitVector.set(recordIdx);
+  }
+
+  public boolean isMatched(int batchIdx, int recordIdx) {
+    final BuildInfo info = buildInfoList.get(batchIdx);
+    final BitSet bitVector = info.getKeyMatchBitVector();
+    return bitVector.get(recordIdx);
+  }
 
+  public void setRecordMatched(int batchIdx, int recordIdx) {
+    final BuildInfo info = buildInfoList.get(batchIdx);
+    final BitSet bitVector = info.getKeyMatchBitVector();
     bitVector.set(recordIdx);
   }
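
The new per-batch accessors exist so that match state can be copied when the helper is rebuilt after a probe-side schema change; a minimal sketch of that carry-over, following the resetHashJoinProbe() loop above (variable names as in that method):

    // Preserve the "already matched" bits from the old helper in the new one.
    for (int i = 0; i < currentRecordCount; i++) {
      if (hjHelper.isMatched(buildBatchIndex, i)) {
        newHjHelper.setRecordMatched(buildBatchIndex, i);
      }
    }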
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index cc6bd5578a..e7755b3c5f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -24,8 +24,11 @@
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.calcite.rel.core.JoinRelType;
 
@@ -46,9 +49,11 @@
 
   public abstract void setupHashJoinProbe(FragmentContext context, 
VectorContainer buildBatch, RecordBatch probeBatch,
                                           int probeRecordCount, HashJoinBatch 
outgoing, HashTable hashTable, HashJoinHelper hjHelper,
-                                          JoinRelType joinRelType);
-  public abstract void doSetup(FragmentContext context, VectorContainer 
buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
+                                          JoinRelType joinRelType, 
HashJoinProbeStatus probeStatus, boolean unionTypeEnabled,
+                                          OperatorContext oContext, 
BatchSchema coercedSchema, VectorContainer coercedContainer);
+  public abstract void doSetup(FragmentContext context, VectorAccessible 
buildBatch, VectorAccessible probeBatch, RecordBatch outgoing);
   public abstract int  probeAndProject() throws SchemaChangeException, 
ClassTransformationException, IOException;
   public abstract void projectBuildRecord(int buildIndex, int outIndex);
   public abstract void projectProbeRecord(int probeIndex, int outIndex);
+  public abstract boolean schemaChanged();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeStatus.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeStatus.java
new file mode 100644
index 0000000000..9629eef461
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeStatus.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import java.util.List;
+
+/*
+ * Keeps the status of the hash join probe phase.
+ */
+public class HashJoinProbeStatus {
+  // Number of records to process on the probe side
+  public int recordsToProcess = 0;
+  // Number of records processed on the probe side
+  public int recordsProcessed = 0;
+
+  // Indicate if we should drain the next record from the probe side
+  public boolean getNextRecord = true;
+
+  // Contains both batch idx and record idx of the matching record in the 
build side
+  public int currentCompositeIdx = -1;
+
+  // Current state the hash join algorithm is in
+  public HashJoinProbe.ProbeState probeState = 
HashJoinProbe.ProbeState.PROBE_PROJECT;
+
+  // For outer or right joins, this is a list of unmatched records that need to be projected
+  public List<Integer> unmatchedBuildIndexes = null;
+
+  public String toString() {
+    return String.format("recordsToProcess: %d, recordsProcessed: %d, 
getNextRecord: %s, currentCompositeIdx: %d, probeState: %s",
+      recordsToProcess, recordsProcessed, getNextRecord, currentCompositeIdx, 
probeState);
+  }
+}
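
The status object exists so that probe-side progress survives when HashJoinBatch regenerates the probe implementation after a schema change; a minimal, illustrative sketch of how one instance is shared across successive setups (argument names follow the HashJoinBatch changes above):

    // One status object per HashJoinBatch; every regenerated HashJoinProbe mutates the same
    // instance, so recordsProcessed, probeState, etc. carry over across schema changes.
    final HashJoinProbeStatus probeStatus = new HashJoinProbeStatus();
    hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, left.getRecordCount(), this, hashTable,
        hjHelper, joinType, probeStatus, true /* unionTypeEnabled */, oContext, leftSchema, probeContainer);
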
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 5531bc7e29..8be8498b08 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -18,17 +18,19 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
-import java.util.List;
 
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -59,76 +61,86 @@
   // Underlying hashtable used by the hash join
   private HashTable hashTable = null;
 
-  // Number of records to process on the probe side
-  private int recordsToProcess = 0;
-
-  // Number of records processed on the probe side
-  private int recordsProcessed = 0;
-
   // Number of records in the output container
   private int outputRecords;
 
-  // Indicate if we should drain the next record from the probe side
-  private boolean getNextRecord = true;
+  private boolean unionTypeEnabled;
 
-  // Contains both batch idx and record idx of the matching record in the 
build side
-  private int currentCompositeIdx = -1;
+  // If the schema changes during the probe phase, stop the probe phase.
+  private boolean schemaChanged = false;
 
-  // Current state the hash join algorithm is in
-  private ProbeState probeState = ProbeState.PROBE_PROJECT;
+  private OperatorContext oContext;
 
-  // For outer or right joins, this is a list of unmatched records that needs 
to be projected
-  private List<Integer> unmatchedBuildIndexes = null;
+  // to hold coerced left side schema and container.
+  private VectorContainer coercedContainer = null;
+  private BatchSchema coercedSchema = null;
+
+  private HashJoinProbeStatus probeStatus;
 
   @Override
   public void setupHashJoinProbe(FragmentContext context, VectorContainer 
buildBatch, RecordBatch probeBatch,
                                  int probeRecordCount, HashJoinBatch outgoing, 
HashTable hashTable,
-                                 HashJoinHelper hjHelper, JoinRelType 
joinRelType) {
+                                 HashJoinHelper hjHelper, JoinRelType 
joinRelType,
+                                 HashJoinProbeStatus probeStatus,
+                                 boolean unionTypeEnabled, OperatorContext 
oContext,
+                                 BatchSchema coercedSchema, VectorContainer 
coercedContainer) {
 
     this.probeBatch = probeBatch;
     this.probeSchema = probeBatch.getSchema();
     this.buildBatch = buildBatch;
     this.joinType = joinRelType;
-    this.recordsToProcess = probeRecordCount;
     this.hashTable = hashTable;
     this.hjHelper = hjHelper;
     this.outgoingJoinBatch = outgoing;
-
-    doSetup(context, buildBatch, probeBatch, outgoing);
+    this.unionTypeEnabled = unionTypeEnabled;
+    this.coercedSchema = coercedSchema;
+    this.coercedContainer = coercedContainer;
+    this.oContext = oContext;
+    this.probeStatus = probeStatus;
+    this.probeStatus.recordsToProcess = probeRecordCount;
+    this.probeStatus.recordsProcessed = 0;
+    if (coercedContainer == null) {
+      doSetup(context, (VectorAccessible)buildBatch, 
(VectorAccessible)probeBatch, outgoing);
+    } else {
+      doSetup(context, (VectorAccessible)buildBatch, 
(VectorAccessible)coercedContainer, outgoing);
+    }
   }
 
   public void executeProjectRightPhase() {
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < 
recordsToProcess) {
-      projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), 
outputRecords);
-      recordsProcessed++;
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && 
probeStatus.recordsProcessed < probeStatus.recordsToProcess) {
+      
projectBuildRecord(probeStatus.unmatchedBuildIndexes.get(probeStatus.recordsProcessed),
 outputRecords);
+      probeStatus.recordsProcessed++;
       outputRecords++;
     }
   }
 
-  public void executeProbePhase() throws SchemaChangeException {
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != 
ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
+  public boolean schemaChanged() {
+    return schemaChanged;
+  }
 
+  public void executeProbePhase() throws SchemaChangeException {
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeStatus.probeState 
!= ProbeState.DONE && probeStatus.probeState != ProbeState.PROJECT_RIGHT) {
       // Check if we have processed all records in this batch we need to 
invoke next
-      if (recordsProcessed == recordsToProcess) {
-
+      if (probeStatus.recordsProcessed == probeStatus.recordsToProcess) {
         // Done processing all records in the previous batch, clean up!
         for (VectorWrapper<?> wrapper : probeBatch) {
           wrapper.getValueVector().clear();
         }
-
-        IterOutcome leftUpstream = 
outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
-
+        if (coercedContainer != null) {
+          coercedContainer.zeroVectors();
+        }
+        final IterOutcome leftUpstream = 
outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
         switch (leftUpstream) {
           case NONE:
           case NOT_YET:
           case STOP:
-            recordsProcessed = 0;
-            recordsToProcess = 0;
-            probeState = ProbeState.DONE;
+            probeStatus.recordsProcessed = 0;
+            probeStatus.recordsToProcess = 0;
+            probeStatus.probeState = ProbeState.DONE;
 
             // We are done with the probe phase. If its a RIGHT or a FULL join 
get the unmatched indexes from the build side
             if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) 
{
-                probeState = ProbeState.PROJECT_RIGHT;
+                probeStatus.probeState = ProbeState.PROJECT_RIGHT;
             }
 
             continue;
@@ -138,78 +150,83 @@ public void executeProbePhase() throws 
SchemaChangeException {
               doSetup(outgoingJoinBatch.getContext(), buildBatch, probeBatch, 
outgoingJoinBatch);
               hashTable.updateBatches();
             } else {
-              throw new SchemaChangeException("Hash join does not support 
schema changes");
+              if (!unionTypeEnabled) {
+                throw new SchemaChangeException("Hash join does not support 
schema changes");
+              } else {
+                schemaChanged = true;
+                return;
+              }
             }
           case OK:
-            recordsToProcess = probeBatch.getRecordCount();
-            recordsProcessed = 0;
+            probeStatus.recordsToProcess = probeBatch.getRecordCount();
+            probeStatus.recordsProcessed = 0;
             // If we received an empty batch do nothing
-            if (recordsToProcess == 0) {
+            if (probeStatus.recordsToProcess == 0) {
               continue;
             }
+            if (coercedSchema != null) {
+              coercedContainer.zeroVectors();
+              final VectorContainer promotedContainer = 
SchemaUtil.coerceContainer(probeBatch, coercedSchema, oContext);
+              promotedContainer.transferOut(coercedContainer);
+            }
         }
       }
       int probeIndex = -1;
-
       // Check if we need to drain the next row in the probe side
-      if (getNextRecord) {
+      if (probeStatus.getNextRecord) {
         if (hashTable != null) {
-          probeIndex = hashTable.containsKey(recordsProcessed, true);
+          probeIndex = hashTable.containsKey(probeStatus.recordsProcessed, 
true);
         }
 
-          if (probeIndex != -1) {
-
-            /* The current probe record has a key that matches. Get the index
-             * of the first row in the build side that matches the current key
-             */
-            currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
-
-            /* Record in the build side at currentCompositeIdx has a matching 
record in the probe
-             * side. Set the bit corresponding to this index so if we are 
doing a FULL or RIGHT
-             * join we keep track of which records we need to project at the 
end
-             */
-            hjHelper.setRecordMatched(currentCompositeIdx);
-
-            projectBuildRecord(currentCompositeIdx, outputRecords);
-            projectProbeRecord(recordsProcessed, outputRecords);
-            outputRecords++;
+        if (probeIndex != -1) {
+          /* The current probe record has a key that matches. Get the index
+           * of the first row in the build side that matches the current key
+           */
+          probeStatus.currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
+          /* Record in the build side at probeStatus.currentCompositeIdx has a 
matching record in the probe
+           * side. Set the bit corresponding to this index so if we are doing 
a FULL or RIGHT
+           * join we keep track of which records we need to project at the end
+           */
+          hjHelper.setRecordMatched(probeStatus.currentCompositeIdx);
+          projectBuildRecord(probeStatus.currentCompositeIdx, outputRecords);
+          projectProbeRecord(probeStatus.recordsProcessed, outputRecords);
+          outputRecords++;
             /* Projected single row from the build side with matching key but 
there
              * may be more rows with the same key. Check if that's the case
              */
-            currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-            if (currentCompositeIdx == -1) {
+          probeStatus.currentCompositeIdx = 
hjHelper.getNextIndex(probeStatus.currentCompositeIdx);
+          if (probeStatus.currentCompositeIdx == -1) {
               /* We only had one row in the build side that matched the 
current key
                * from the probe side. Drain the next row in the probe side.
                */
-              recordsProcessed++;
-            } else {
+            probeStatus.recordsProcessed++;
+          } else {
               /* There is more than one row with the same key on the build side
                * don't drain more records from the probe side till we have 
projected
                * all the rows with this key
                */
-              getNextRecord = false;
-            }
+            probeStatus.getNextRecord = false;
+          }
         } else { // No matching key
-
-            // If we have a left outer join, project the keys
-            if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-              projectProbeRecord(recordsProcessed, outputRecords);
-              outputRecords++;
-            }
-            recordsProcessed++;
+          // If we have a left outer join, project the keys
+          if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+            projectProbeRecord(probeStatus.recordsProcessed, outputRecords);
+            outputRecords++;
           }
+          probeStatus.recordsProcessed++;
+        }
       } else {
-        hjHelper.setRecordMatched(currentCompositeIdx);
-        projectBuildRecord(currentCompositeIdx, outputRecords);
-        projectProbeRecord(recordsProcessed, outputRecords);
+        hjHelper.setRecordMatched(probeStatus.currentCompositeIdx);
+        projectBuildRecord(probeStatus.currentCompositeIdx, outputRecords);
+        projectProbeRecord(probeStatus.recordsProcessed, outputRecords);
         outputRecords++;
 
-        currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-
-        if (currentCompositeIdx == -1) {
+        probeStatus.currentCompositeIdx = 
hjHelper.getNextIndex(probeStatus.currentCompositeIdx);
+        if (probeStatus.currentCompositeIdx == -1) {
           // We don't have any more rows matching the current key on the build 
side, move on to the next probe row
-          getNextRecord = true;
-          recordsProcessed++;
+          probeStatus.getNextRecord = true;
+          probeStatus.recordsProcessed++;
         }
       }
     }
@@ -219,18 +236,18 @@ public int probeAndProject() throws 
SchemaChangeException, ClassTransformationEx
 
     outputRecords = 0;
 
-    if (probeState == ProbeState.PROBE_PROJECT) {
+    if (probeStatus.probeState == ProbeState.PROBE_PROJECT) {
       executeProbePhase();
     }
 
-    if (probeState == ProbeState.PROJECT_RIGHT) {
+    if (probeStatus.probeState == ProbeState.PROJECT_RIGHT) {
 
       // We are here because we have a RIGHT OUTER or a FULL join
-      if (unmatchedBuildIndexes == null) {
+      if (probeStatus.unmatchedBuildIndexes == null) {
         // Initialize list of build indexes that didn't match a record on the 
probe side
-        unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
-        recordsToProcess = unmatchedBuildIndexes.size();
-        recordsProcessed = 0;
+        probeStatus.unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
+        probeStatus.recordsToProcess = 
probeStatus.unmatchedBuildIndexes.size();
+        probeStatus.recordsProcessed = 0;
       }
 
       // Project the list of unmatched records on the build side
@@ -240,7 +257,8 @@ public int probeAndProject() throws SchemaChangeException, 
ClassTransformationEx
     return outputRecords;
   }
 
-  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") 
RecordBatch probeBatch,
+  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("buildBatch") VectorAccessible buildBatch,
+                               @Named("probeBatch") VectorAccessible 
probeBatch,
                                @Named("outgoing") RecordBatch outgoing);
   public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, 
@Named("outIndex") int outIndex);
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index 61640bc673..6f933734c4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -150,7 +150,6 @@ private static boolean 
allowImplicitCast(TypeProtos.MinorType input1, TypeProtos
         (input2 == TypeProtos.MinorType.VARCHAR || input2 == 
TypeProtos.MinorType.VARBINARY)) {
       return true;
     }
-
     return false;
   }
 
@@ -165,7 +164,7 @@ private static boolean 
allowImplicitCast(TypeProtos.MinorType input1, TypeProtos
    */
   public static void addLeastRestrictiveCasts(LogicalExpression[] 
leftExpressions, VectorAccessible leftBatch,
                                               LogicalExpression[] 
rightExpressions, VectorAccessible rightBatch,
-                                              FragmentContext context) {
+                                              FragmentContext context, boolean 
unionTypeEnabled) {
     assert rightExpressions.length == leftExpressions.length;
 
     for (int i = 0; i < rightExpressions.length; i++) {
@@ -177,8 +176,8 @@ public static void 
addLeastRestrictiveCasts(LogicalExpression[] leftExpressions,
       if (rightType == TypeProtos.MinorType.UNION || leftType == 
TypeProtos.MinorType.UNION) {
         continue;
       }
-      if (rightType != leftType) {
 
+      if (rightType != leftType) {
         // currently we only support implicit casts if the input types are 
numeric or varchar/varbinary
         if (!allowImplicitCast(rightType, leftType)) {
           throw new DrillRuntimeException(String.format("Join only supports 
implicit casts between " +
@@ -203,14 +202,14 @@ public static void 
addLeastRestrictiveCasts(LogicalExpression[] leftExpressions,
           // Store the newly casted expression
           rightExpressions[i] =
               ExpressionTreeMaterializer.materialize(castExpr, rightBatch, 
errorCollector,
-                  context.getFunctionRegistry());
+                  context.getFunctionRegistry(), false, unionTypeEnabled);
         } else if (result != leftType) {
           // Add a cast expression on top of the left expression
           LogicalExpression castExpr = 
ExpressionTreeMaterializer.addCastExpression(leftExpression, 
rightExpression.getMajorType(), context.getFunctionRegistry(), errorCollector);
           // store the newly casted expression
           leftExpressions[i] =
               ExpressionTreeMaterializer.materialize(castExpr, leftBatch, 
errorCollector,
-                  context.getFunctionRegistry());
+                  context.getFunctionRegistry(), false, unionTypeEnabled);
         }
       }
     }
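
   Editor's note: the only functional change in JoinUtils is the new unionTypeEnabled
   flag, which is forwarded as the last argument of ExpressionTreeMaterializer.materialize(...)
   whenever a cast has to be re-materialized. A hedged sketch of a call site follows; the
   wrapper class and method are illustrative only and not part of this patch:

       import org.apache.drill.common.expression.LogicalExpression;
       import org.apache.drill.exec.ops.FragmentContext;
       import org.apache.drill.exec.physical.impl.join.JoinUtils;
       import org.apache.drill.exec.record.VectorAccessible;

       // Illustrative wrapper only; the patch's own call sites pass a literal flag
       // (MergeJoinBatch below passes false). A hash-join caller would derive
       // unionTypeEnabled from the exec.enable_union_type session option.
       class JoinCastSketch {
         static void addJoinCasts(LogicalExpression[] leftExpressions, VectorAccessible leftBatch,
                                  LogicalExpression[] rightExpressions, VectorAccessible rightBatch,
                                  FragmentContext context, boolean unionTypeEnabled) {
           JoinUtils.addLeastRestrictiveCasts(leftExpressions, leftBatch,
               rightExpressions, rightBatch, context, unionTypeEnabled);
           // Each cast that has to be rebuilt is then re-materialized as:
           //   ExpressionTreeMaterializer.materialize(castExpr, batch, errorCollector,
           //       context.getFunctionRegistry(), false /* allowComplexWriterExpr */, unionTypeEnabled);
         }
       }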
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 9ef5cde109..c16d20a0c5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -314,7 +314,7 @@ private JoinWorker generateNewWorker() throws 
ClassTransformationException, IOEx
     // if right side is empty, rightExpr will most likely default to NULLABLE 
INT which may cause the following
     // call to throw an exception. In this case we can safely skip adding the 
casts
     if (lastRightStatus != IterOutcome.NONE) {
-      JoinUtils.addLeastRestrictiveCasts(leftExpr, leftIterator, rightExpr, 
rightIterator, context);
+      JoinUtils.addLeastRestrictiveCasts(leftExpr, leftIterator, rightExpr, 
rightIterator, context, false);
     }
     //generate doCompare() method
     /////////////////////////////////////////
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index af0a753047..2dc1154d9d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -223,6 +223,7 @@ public IterOutcome next() {
         Preconditions.checkArgument(rbdOld != null);
         Preconditions.checkArgument(rbdOld != rbdNew);
         container.transferOut(rbdOld.getContainer());
+        // Get vectors from new position.
         container.transferIn(rbdNew.getContainer());
         innerPosition = 0;
         outerPosition = nextOuterPosition;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 48f0a36233..7566346433 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -117,10 +117,14 @@ private static  ValueVector coerceVector(ValueVector v, 
VectorContainer c, Mater
         ValueVector newVector = TypeHelper.getNewVector(field, 
context.getAllocator());
         Preconditions.checkState(field.getType().getMinorType() == 
MinorType.UNION, "Can only convert vector to Union vector");
         UnionVector u = (UnionVector) newVector;
-        u.addVector(tp.getTo());
-        MinorType type = v.getField().getType().getMinorType();
+        final ValueVector vv = u.addVector(tp.getTo());
+        final MinorType type = v.getField().getType().getMinorType();
         for (int i = 0; i < valueCount; i++) {
-          u.getMutator().setType(i, type);
+          if (!vv.getAccessor().isNull(i)) {
+            u.getMutator().setType(i, type);
+          } else {
+            u.getMutator().setType(i, MinorType.LATE);
+          }
         }
         for (MinorType t : field.getType().getSubTypeList()) {
           if (u.getField().getType().getSubTypeList().contains(t)) {
@@ -147,18 +151,23 @@ private static  ValueVector coerceVector(ValueVector v, 
VectorContainer c, Mater
    * @return
    */
   public static VectorContainer coerceContainer(VectorAccessible in, 
BatchSchema toSchema, OperatorContext context) {
-    int recordCount = in.getRecordCount();
+    int recordCount = 0;
+    SelectionVectorMode svMode= SelectionVectorMode.NONE;
     boolean isHyper = false;
     Map<SchemaPath, Object> vectorMap = Maps.newHashMap();
-    for (VectorWrapper w : in) {
-      if (w.isHyper()) {
-        isHyper = true;
-        final ValueVector[] vvs = w.getValueVectors();
-        vectorMap.put(vvs[0].getField().getPath(), vvs);
-      } else {
-        assert !isHyper;
-        final ValueVector v = w.getValueVector();
-        vectorMap.put(v.getField().getPath(), v);
+    if (in != null) {
+      recordCount = in.getRecordCount();
+      svMode = in.getSchema().getSelectionVectorMode();
+      for (VectorWrapper w : in) {
+        if (w.isHyper()) {
+          isHyper = true;
+          final ValueVector[] vvs = w.getValueVectors();
+          vectorMap.put(vvs[0].getField().getPath(), vvs);
+        } else {
+          assert !isHyper;
+          final ValueVector v = w.getValueVector();
+          vectorMap.put(v.getField().getPath(), v);
+        }
       }
     }
 
@@ -183,7 +192,7 @@ public static VectorContainer 
coerceContainer(VectorAccessible in, BatchSchema t
         c.add(coerceVector(v, c, field, recordCount, context));
       }
     }
-    c.buildSchema(in.getSchema().getSelectionVectorMode());
+    c.buildSchema(svMode);
     c.setRecordCount(recordCount);
     Preconditions.checkState(vectorMap.size() == 0, "Leftover vector from 
incoming batch");
     return c;
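
   Editor's note: coerceContainer(...) is the piece the probe side leans on. Given a
   target BatchSchema that may already carry UNION columns, it rebuilds the incoming
   batch so every column matches that schema, and with the change above it tolerates a
   null input by producing an empty container in the target schema. The executeProbePhase()
   hunk earlier in this diff uses it roughly as in the following sketch; variable names
   follow that hunk, and the code that computes coercedSchema is assumed to exist elsewhere
   in the patch:

       import org.apache.drill.exec.ops.OperatorContext;
       import org.apache.drill.exec.record.BatchSchema;
       import org.apache.drill.exec.record.RecordBatch;
       import org.apache.drill.exec.record.SchemaUtil;
       import org.apache.drill.exec.record.VectorContainer;

       // Sketch of the usage in the executeProbePhase() hunk above; error handling and the
       // code that computes coercedSchema are omitted.
       class ProbeCoercionSketch {
         static void promoteProbeBatch(RecordBatch probeBatch, BatchSchema coercedSchema,
                                       VectorContainer coercedContainer, OperatorContext oContext) {
           if (coercedSchema != null) {
             coercedContainer.zeroVectors();
             // Rebuild the incoming probe batch so every column matches the agreed-upon
             // (possibly UNION) schema...
             VectorContainer promoted = SchemaUtil.coerceContainer(probeBatch, coercedSchema, oContext);
             // ...then move the promoted vectors into the container the generated probe code reads.
             // VectorContainer.transferOut(...) is made public further down for exactly this.
             promoted.transferOut(coercedContainer);
           }
         }
       }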
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index 1e8a52ff7d..3b5bb1f664 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -116,7 +116,7 @@ public TypedFieldId getFieldIdIfMatches(int id, SchemaPath 
expectedPath) {
 
   public void transfer(VectorWrapper<?> destination) {
     Preconditions.checkArgument(destination instanceof SimpleVectorWrapper);
-    
Preconditions.checkArgument(getField().getType().equals(destination.getField().getType()));
+    
Preconditions.checkArgument(getField().getType().getMinorType().equals(destination.getField().getType().getMinorType()));
     
vector.makeTransferPair(((SimpleVectorWrapper)destination).vector).transfer();
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 33351ba9c8..60ef99ebb5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -104,7 +104,7 @@ void transferIn(VectorContainer containerIn) {
   /**
    * Transfer vectors from this to containerOut
    */
-  void transferOut(VectorContainer containerOut) {
+  public void transferOut(VectorContainer containerOut) {
     Preconditions.checkArgument(this.wrappers.size() == 
containerOut.wrappers.size());
     for (int i = 0; i < this.wrappers.size(); ++i) {
       this.wrappers.get(i).transfer(containerOut.wrappers.get(i));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 
b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 2077d6eafa..3e14e9afdb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -438,7 +438,7 @@ public static String getPhysicalFileFromResource(final 
String resource) throws I
    */
   public static String getTempDir(final String dirName) {
     final File dir = Files.createTempDir();
-    dir.deleteOnExit();
+    //dir.deleteOnExit();
 
     return dir.getAbsolutePath() + File.separator + dirName;
   }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java 
b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 6f78f8c7fd..c9d7c80543 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -429,7 +429,7 @@ private void checkNumBatches(final List<QueryDataBatch> 
results) {
     if (expectedNumBatches != EXPECTED_BATCH_COUNT_NOT_SET) {
       final int actualNumBatches = results.size();
       assertEquals(String.format("Expected %d batches but query returned %d 
non empty batch(es)%n", expectedNumBatches,
-          actualNumBatches), expectedNumBatches, actualNumBatches);
+        actualNumBatches), expectedNumBatches, actualNumBatches);
     }
   }
 
@@ -564,7 +564,9 @@ public boolean compareValues(Object expected, Object 
actual, int counter, String
    * @throws Exception
    */
   private void compareResults(List<Map> expectedRecords, List<Map> 
actualRecords) throws Exception {
-
     assertEquals("Different number of records returned", 
expectedRecords.size(), actualRecords.size());
 
     String missing = "";
@@ -594,14 +596,14 @@ private void compareResults(List<Map> expectedRecords, 
List<Map> actualRecords)
       if (!found) {
         StringBuilder sb = new StringBuilder();
         for (int expectedRecordDisplayCount = 0;
-             expectedRecordDisplayCount < 10 && expectedRecordDisplayCount < 
expectedRecords.size();
+             expectedRecordDisplayCount < 50 && expectedRecordDisplayCount < 
expectedRecords.size();
              expectedRecordDisplayCount++) {
           
sb.append(printRecord(expectedRecords.get(expectedRecordDisplayCount)));
         }
         String expectedRecordExamples = sb.toString();
         sb.setLength(0);
         for (int actualRecordDisplayCount = 0;
-             actualRecordDisplayCount < 10 && actualRecordDisplayCount < 
actualRecords.size();
+             actualRecordDisplayCount < 50 && actualRecordDisplayCount < 
actualRecords.size();
              actualRecordDisplayCount++) {
           sb.append(printRecord(actualRecords.get(actualRecordDisplayCount)));
         }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
index a70a3f8d5b..526edabb1a 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
@@ -20,11 +20,16 @@
 
 
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.TestBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
 public class TestHashJoinAdvanced extends BaseTestQuery {
 
   // Have to disable merge join, if this testcase is to test "HASH-JOIN".
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinWithSchemaChanges.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinWithSchemaChanges.java
new file mode 100644
index 0000000000..2a0462260e
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinWithSchemaChanges.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.TestBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+public class TestHashJoinWithSchemaChanges extends BaseTestQuery {
+
+  @Test
+  public void testHashInnerJoinWithSchemaChange() throws Exception {
+    String query = "select A.a as aa, A.b as ab, B.a as ba, B.b as bb from 
cp.`join/schemachange/left.json` A inner join cp.`join/schemachange/right.json` 
B on A.a=B.a";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .baselineColumns("aa", "ab", "ba", "bb")
+      .baselineValues(1l, 1l, 1l, 1l)
+      .baselineValues(1l, 1l, 1.0d, "1")
+      .baselineValues(1l, 1l, 1l, 1l)
+      .baselineValues(1l, 1l, 1.0d, "1")
+      .baselineValues(2l, 2l, 2l, 2l)
+      .baselineValues(2l, 2l, 2.0d, "2")
+      .baselineValues(2l, 2l, 2l, 2l)
+      .baselineValues(2l, 2l, 2.0d, "2").build().run();
+  }
+
+  @Test
+  public void testHashLeftJoinWithSchemaChange() throws Exception {
+    String query = "select A.a as aa, A.b as ab, B.a as ba, B.b as bb from 
cp.`join/schemachange/left.json` A left join cp.`join/schemachange/right.json` 
B on A.a=B.a";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .baselineColumns("aa", "ab", "ba", "bb")
+      .baselineValues(1l, 1l, 1l, 1l)
+      .baselineValues(1l, 1l, 1.0d, "1")
+      .baselineValues(1l, 1l, 1l, 1l)
+      .baselineValues(1l, 1l, 1.0d, "1")
+      .baselineValues(2l, 2l, 2l, 2l)
+      .baselineValues(2l, 2l, 2.0d, "2")
+      .baselineValues(2l, 2l, 2l, 2l)
+      .baselineValues(2l, 2l, 2.0d, "2")
+      .baselineValues(3l, 3l, null, null)
+      .baselineValues(3l, 3l, null, null)
+      .build().run();
+  }
+
+  @Test
+  public void testHashRightJoinWithSchemaChange() throws Exception {
+    String query = "select A.a as aa, A.b as ab, B.a as ba, B.b as bb from 
cp.`join/schemachange/left.json` A right join cp.`join/schemachange/right.json` 
B on A.a=B.a";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .baselineColumns("aa", "ab", "ba", "bb")
+      .baselineValues(1l, 1l, 1l, 1l)
+      .baselineValues(1l, 1l, 1.0d, "1")
+      .baselineValues(1l, 1l, 1l, 1l)
+      .baselineValues(1l, 1l, 1.0d, "1")
+      .baselineValues(2l, 2l, 2l, 2l)
+      .baselineValues(2l, 2l, 2.0d, "2")
+      .baselineValues(2l, 2l, 2l, 2l)
+      .baselineValues(2l, 2l, 2.0d, "2")
+      .baselineValues(null, null, 6.0d, 6l)
+      .build().run();
+  }
+
+  @Test
+  public void testHashJoinWithSchemaChangeMultiBatch() throws Exception {
+    final File leftSideDir = new 
File(BaseTestQuery.getTempDir("hashjoin-left"));
+    final File rightSideDir = new 
File(BaseTestQuery.getTempDir("hashjoin-right"));
+
+    leftSideDir.mkdirs();
+    rightSideDir.mkdirs();
+
+    System.out.println("left = " + leftSideDir);
+    System.out.println("right = " + rightSideDir);
+
+    // LEFT side
+    // int, int
+    BufferedWriter leftWriter = new BufferedWriter(new FileWriter(new 
File(leftSideDir, "1.json")));
+    for (int i = 1; i <= 10; ++i) {
+      leftWriter.write(String.format("{ \"lk\" : %d , \"lv\": %d }\n", i, i));
+    }
+    leftWriter.close();
+
+    // float float , new column
+    leftWriter = new BufferedWriter(new FileWriter(new File(leftSideDir, 
"2.json")));
+    for (int i = 5; i <= 15; ++i) {
+      leftWriter.write(String.format("{ \"lk\" : %f , \"lv\": %f , \"lc\": 
%d}\n", (float) i, (float) i, i));
+    }
+    leftWriter.close();
+
+    // string, string
+    leftWriter = new BufferedWriter(new FileWriter(new File(leftSideDir, 
"3.json")));
+    for (int i = 1; i <= 10; ++i) {
+      leftWriter.write(String.format("{ \"lk\" : \"%s\" , \"lv\": \"%s\"}\n", 
i, i));
+    }
+    leftWriter.close();
+
+    // RIGHT side
+    BufferedWriter rightWriter = new BufferedWriter(new FileWriter(new 
File(rightSideDir, "1.json")));
+    for (int i = 5; i <= 10; ++i) {
+      rightWriter.write(String.format("{ \"rk\" : %d , \"rv\": %d }\n", i, i));
+    }
+    rightWriter.close();
+    // float float new column
+    rightWriter = new BufferedWriter(new FileWriter(new File(rightSideDir, 
"2.json")));
+    for (int i = 5; i <= 10; ++i) {
+      rightWriter.write(String.format("{ \"rk\" : %f , \"rv\": %f , \"rc\": %d 
}\n", (float) i, (float) i, i));
+    }
+    rightWriter.close();
+    // string string
+    rightWriter = new BufferedWriter(new FileWriter(new File(rightSideDir, 
"3.json")));
+    for (int i = 5; i <= 15; ++i) {
+      rightWriter.write(String.format("{ \"rk\" : \"%s\", \"rv\": \"%s\" }\n", 
Integer.toString(i), Integer.toString(i)));
+    }
+    rightWriter.close();
+
+    // Test inner join. Since we don't have the full schema yet, select all columns for now.
+    final String innerJoinQuery = String.format("select L.lk, L.lv, L.lc, 
R.rk, R.rv, R.rc from dfs_test.`%s` L inner join dfs_test.`%s` R on L.lk=R.rk",
+      leftSideDir.toPath().toString(), rightSideDir.toPath().toString());
+    final String leftJoinQuery = String.format("select L.lk, L.lv, L.lc, R.rk, 
R.rv, R.rc from dfs_test.`%s` L left join dfs_test.`%s` R on L.lk=R.rk",
+      leftSideDir.toPath().toString(), rightSideDir.toPath().toString());
+    final String rightJoinQuery = String.format("select L.lk, L.lv, L.lc, 
R.rk, R.rv, R.rc from dfs_test.`%s` L right join dfs_test.`%s` R on L.lk=R.rk",
+      leftSideDir.toPath().toString(), rightSideDir.toPath().toString());
+    final String outerJoinQuery = String.format("select L.lk, L.lv, L.lc, 
R.rk, R.rv, R.rc from dfs_test.`%s` L full outer join dfs_test.`%s` R on 
L.lk=R.rk",
+      leftSideDir.toPath().toString(), rightSideDir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(innerJoinQuery)
+      .unOrdered()
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .baselineColumns("lk", "lv", "lc", "rk", "rv", "rc");
+    for (long i = 5; i <= 10; i++) {
+      final String stringVal = Long.toString(i);
+      // match int with int and float
+      builder.baselineValues(i, i, null, i, i, null);
+      builder.baselineValues(i, i, null, (double)i, (double)i, i);
+      // match float with int and float
+      builder.baselineValues((double)i, (double)i, i, i, i, null);
+      builder.baselineValues((double)i, (double)i, i, (double)i, (double)i, i);
+      // match string with string
+      builder.baselineValues(stringVal, stringVal, null, stringVal, stringVal, 
null);
+    }
+    builder.build().run();
+
+    // LEFT
+    builder = testBuilder()
+      .sqlQuery(leftJoinQuery)
+      .unOrdered()
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .baselineColumns("lk", "lv", "lc", "rk", "rv", "rc");
+    for (long i = 5; i <= 10; i++) {
+      final String stringVal = Long.toString(i);
+      // match int with int and float
+      builder.baselineValues(i, i, null, i, i, null);
+      builder.baselineValues(i, i, null, (double)i, (double)i, i);
+      // match float with int and float
+      builder.baselineValues((double)i, (double)i, i, i, i, null);
+      builder.baselineValues((double)i, (double)i, i, (double)i, (double)i, i);
+      // match string with string
+      builder.baselineValues(stringVal, stringVal, null, stringVal, stringVal, 
null);
+    }
+    for (long i = 1; i < 5; i++) {
+      final String stringVal = Long.toString(i);
+      builder.baselineValues(i, i, null, null, null, null);
+      builder.baselineValues(stringVal, stringVal, null, null, null, null);
+    }
+    for (long i = 11; i <= 15; i++) {
+      builder.baselineValues((double)i, (double)i, i, null, null, null);
+    }
+    builder.build().run();
+
+    // RIGHT
+    builder = testBuilder()
+      .sqlQuery(rightJoinQuery)
+      .unOrdered()
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .baselineColumns("lk", "lv", "lc", "rk", "rv", "rc");
+    for (long i = 5; i <= 10; i++) {
+      final String stringVal = Long.toString(i);
+      // match int with int and float
+      builder.baselineValues(i, i, null, i, i, null);
+      builder.baselineValues(i, i, null, (double)i, (double)i, i);
+      // match float with int and float
+      builder.baselineValues((double)i, (double)i, i, i, i, null);
+      builder.baselineValues((double)i, (double)i, i, (double)i, (double)i, i);
+      // match string with string
+      builder.baselineValues(stringVal, stringVal, null, stringVal, stringVal, 
null);
+    }
+    for (long i = 11; i <= 15; i++) {
+      final String stringVal = Long.toString(i);
+      builder.baselineValues(null, null, null, stringVal, stringVal, null);
+    }
+    builder.build().run();
+
+    // OUTER
+    builder = testBuilder()
+      .sqlQuery(outerJoinQuery)
+      .unOrdered()
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .baselineColumns("lk", "lv", "lc", "rk", "rv", "rc");
+    for (long i = 5; i <= 10; i++) {
+      final String stringVal = Long.toString(i);
+      builder.baselineValues(i, i, null, i, i, null);
+      builder.baselineValues(i, i, null, (double)i, (double)i, i);
+      builder.baselineValues((double)i, (double)i, i, i, i, null);
+      builder.baselineValues((double)i, (double)i, i, (double)i, (double)i, i);
+      builder.baselineValues(stringVal, stringVal, null, stringVal, stringVal, 
null);
+    }
+    for (long i = 1; i < 5; i++) {
+      final String stringVal = Long.toString(i);
+      builder.baselineValues(i, i, null, null, null, null);
+      builder.baselineValues(stringVal, stringVal, null, null, null, null);
+    }
+    for (long i = 11; i <= 15; i++) {
+      final String stringVal = Long.toString(i);
+      builder.baselineValues((double)i, (double)i, i, null, null, null);
+      builder.baselineValues(null, null, null, stringVal, stringVal, null);
+    }
+    builder.build().run();
+  }
+
+  @Test
+  public void testOneSideSchemaChanges() throws Exception {
+    final File left_dir = new 
File(BaseTestQuery.getTempDir("hashjoin-schemachanges-left"));
+    final File right_dir = new 
File(BaseTestQuery.getTempDir("hashjoin-schemachanges-right"));
+    left_dir.mkdirs();
+    right_dir.mkdirs();
+    System.out.println(left_dir);
+    System.out.println(right_dir);
+
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new 
File(left_dir, "l1.json")));
+    for (int i = 0; i < 50; ++i) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    for (int i = 50; i < 100; ++i) {
+      writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float) i, 
(float) i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, 
"r1.json")));
+    for (int i = 0; i < 50; ++i) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+
+    String query = String.format("select * from dfs_test.`%s` L %s join 
dfs_test.`%s` R on L.kl=R.kl",
+      left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kl0", "vl0");
+
+    for (long i = 0; i < 50; ++i) {
+      builder.baselineValues(i, i, i, i);
+    }
+    builder.go();
+  }
+
+  @Test
+  public void testMissingColumn() throws Exception {
+    final File left_dir = new 
File(BaseTestQuery.getTempDir("hashjoin-schemachanges-left"));
+    final File right_dir = new 
File(BaseTestQuery.getTempDir("hashjoin-schemachanges-right"));
+    left_dir.mkdirs();
+    right_dir.mkdirs();
+    System.out.println(left_dir);
+    System.out.println(right_dir);
+
+    // missing column kl
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new 
File(left_dir, "l1.json")));
+    for (int i = 0; i < 50; ++i) {
+      writer.write(String.format("{ \"kl1\" : %d , \"vl1\": %d }\n", i, i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(left_dir, "l2.json")));
+    for (int i = 50; i < 100; ++i) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(left_dir, "l3.json")));
+    for (int i = 100; i < 150; ++i) {
+      writer.write(String.format("{ \"kl2\" : %d , \"vl2\": %d }\n", i, i));
+    }
+    writer.close();
+
+    // right missing column kr
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, 
"r1.json")));
+    for (int i = 0; i < 50; ++i) {
+      writer.write(String.format("{ \"kr1\" : %f , \"vr1\": %f }\n", (float) 
i, (float) i));
+    }
+    //writer.write(String.format("{ \"kr1\" : null , \"vr1\": null , \"kr\" : 
500 }\n"));
+    //writer.write(String.format("{ \"kr1\" : null , \"vr1\": null , \"kr\" : 
500.0 }\n"));
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, 
"r2.json")));
+    for (int i = 50; i < 100; ++i) {
+      writer.write(String.format("{ \"kr\" : %f , \"vr\": %f }\n", (float)i, 
(float)i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, 
"r3.json")));
+    for (int i = 100; i < 150; ++i) {
+      writer.write(String.format("{ \"kr2\" : %f , \"vr2\": %f }\n", (float)i, 
(float)i));
+    }
+    writer.close();
+
+    // INNER JOIN
+    String query = String.format("select L.kl, L.vl, R.kr, R.vr, L.kl1, L.vl1, 
L.kl2, L.vl2, R.kr1, R.vr1, R.kr2, R.vr2 from dfs_test.`%s` L %s join 
dfs_test.`%s` R on L.kl=R.kr",
+      left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_mergejoin` = false; alter session set `exec.enable_union_type` 
= true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", 
"kr1", "vr1", "kr2", "vr2");
+
+    for (long i = 50; i < 100; ++i) {
+      builder.baselineValues(i, i, (double)i, (double)i, null, null, null, 
null, null, null, null, null);
+    }
+    builder.go();
+    /*
+    // LEFT JOIN
+    query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` 
R on L.kl=R.kr",
+      left_dir.toPath().toString(), "left", right_dir.toPath().toString());
+
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = 
true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", 
"kr1", "vr1", "kr2", "vr2");
+
+    for (long i = 0; i < 50; ++i) {
+      builder.baselineValues(null, null, null, null, i, i, null, null, null, 
null, null, null);
+    }
+    for (long i = 50; i < 100; ++i) {
+      builder.baselineValues(i, i, (double)i, (double)i, null, null, null, 
null, null, null, null, null);
+    }
+    for (long i = 100; i < 150; ++i) {
+      builder.baselineValues(null, null, null, null, null, null, i, i, null, 
null, null, null);
+    }
+    builder.go();
+
+    // RIGHT JOIN
+    query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` 
R on L.kl=R.kr",
+      left_dir.toPath().toString(), "right", right_dir.toPath().toString());
+
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set 
`planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = 
true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", 
"kr1", "vr1", "kr2", "vr2");
+
+    for (long i = 0; i < 50; ++i) {
+      builder.baselineValues(null, null, null, null, null, null, null, null, 
(double)i, (double)i, null, null);
+    }
+    for (long i = 50; i < 100; ++i) {
+      builder.baselineValues(i, i, (double)i, (double)i, null, null, null, 
null, null, null, null, null);
+    }
+    for (long i = 100; i < 150; ++i) {
+      builder.baselineValues(null, null, null, null, null, null, null, null, 
null, null, (double)i, (double)i);
+    }
+    builder.go();
+    */
+  }
+}
diff --git a/exec/java-exec/src/test/resources/join/schemachange/left.json 
b/exec/java-exec/src/test/resources/join/schemachange/left.json
new file mode 100644
index 0000000000..75cc192184
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/schemachange/left.json
@@ -0,0 +1,6 @@
+{ "a": 1, "b": 1}
+{ "a": 1, "b": 1}
+{ "a": 2, "b": 2}
+{ "a": 2, "b": 2}
+{ "a": 3, "b": 3}
+{ "a": 3, "b": 3}
diff --git a/exec/java-exec/src/test/resources/join/schemachange/right.json 
b/exec/java-exec/src/test/resources/join/schemachange/right.json
new file mode 100644
index 0000000000..ac8c489f38
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/schemachange/right.json
@@ -0,0 +1,5 @@
+{ "a": 1,   "b": 1}
+{ "a": 1.0, "b": "1"}
+{ "a": 2,   "b": 2}
+{ "a": 2.0, "b": "2"}
+{ "a": 6.0, "b": 6}
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java 
b/exec/vector/src/main/codegen/templates/UnionVector.java
index 2e278b14c2..d44e4f162a 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -17,6 +17,8 @@
  */
 
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
 
 <@pp.dropOutputFile />
 <@pp.changeOutputFile 
name="/org/apache/drill/exec/vector/complex/UnionVector.java" />
@@ -232,7 +234,7 @@ public void copyFromSafe(int inIndex, int outIndex, 
UnionVector from) {
     copyFrom(inIndex, outIndex, from);
   }
 
-  public void addVector(ValueVector v) {
+  public ValueVector addVector(ValueVector v) {
     String name = v.getField().getType().getMinorType().name().toLowerCase();
     MajorType type = v.getField().getType();
     Preconditions.checkState(internalMap.getChild(name) == null, 
String.format("%s vector already exists", name));
@@ -240,6 +242,7 @@ public void addVector(ValueVector v) {
     v.makeTransferPair(newVector).transfer();
     internalMap.putChild(name, newVector);
     addSubType(v.getField().getType().getMinorType());
+    return newVector;
   }
 
   private class TransferImpl implements TransferPair {
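
   Editor's note: having addVector(...) return the vector it just registered is what
   makes the per-row typing in SchemaUtil.coerceVector(...) possible. Rows that are
   actually null are tagged MinorType.LATE instead of the column's nominal type, so
   they keep behaving as SQL NULLs inside the union. A condensed sketch of that
   pattern (the wrapper class and method signature are assumptions; the body mirrors
   the coerceVector hunk earlier in this diff):

       import org.apache.drill.common.types.TypeProtos.MinorType;
       import org.apache.drill.exec.record.TransferPair;
       import org.apache.drill.exec.vector.ValueVector;
       import org.apache.drill.exec.vector.complex.UnionVector;

       // Condensed from the SchemaUtil.coerceVector(...) hunk earlier in this diff.
       class UnionPromotionSketch {
         static void copyIntoUnion(UnionVector unionVector, TransferPair transferPair,
                                   ValueVector original, int valueCount) {
           // addVector(...) now returns the vector that actually lives inside the union,
           // so the per-row null check below is possible.
           ValueVector inner = unionVector.addVector(transferPair.getTo());
           MinorType type = original.getField().getType().getMinorType();
           for (int i = 0; i < valueCount; i++) {
             // Null rows are tagged LATE so they stay NULL; populated rows get the real type.
             unionVector.getMutator().setType(i, inner.getAccessor().isNull(i) ? MinorType.LATE : type);
           }
         }
       }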


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support schema changes in hash join operator
> --------------------------------------------
>
>                 Key: DRILL-3991
>                 URL: https://issues.apache.org/jira/browse/DRILL-3991
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: amit hadke
>            Assignee: amit hadke
>            Priority: Major
>
> Hash join should be able to support schema changes during execution.
> It should resolve edge cases when join columns are missing.
> Example:
> | Table A    | Table B    |
> | k1    v1   | k2    v2   |
> | 1     "a"  | "2"   "b"  |
> | 2     "b"  | 1     "a"  |
> | 2.0   "b"  | 2.0   "b"  |
> | 3     "c"  |            |
>
> A INNER JOIN B on A.k1=B.k2
> | k1  | v1  | k2  | v2  |
> | 1   | "a" | 1   | "a" |
> | 2   | "b" | 2.0 | "b" |
> | 2.0 | "b" | 2.0 | "b" |
>
> where, in the output,
>     k1 is of union type (INTEGER, DOUBLE)
>     k2 is of union type (INTEGER, DOUBLE, VARCHAR)
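
Editor's note: the new tests in the patch show the minimal setup needed to exercise this
behavior. As a rough sketch, assuming a test class extending BaseTestQuery and its
test(...) helper, and using the classpath JSON files added by the patch (the full
expected baselines live in TestHashJoinWithSchemaChanges above):

    // Rough sketch only; method placed in a class extending BaseTestQuery.
    @Test
    public void hashJoinAcrossSchemaChange() throws Exception {
      // The merge join does not get the union handling in this patch, so force the hash
      // join, and union types must be enabled for the promoted key/value columns.
      test("alter session set `planner.enable_mergejoin` = false");
      test("alter session set `exec.enable_union_type` = true");
      // Column a mixes integer and double values, column b mixes integer and varchar
      // values, so the joined output exposes them as UNION-typed columns.
      test("select A.a, A.b, B.a, B.b from cp.`join/schemachange/left.json` A "
          + "inner join cp.`join/schemachange/right.json` B on A.a = B.a");
    }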



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
