This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 89e0fe6b34259a2f51a7c45070935a2a2400eca4
Author: Ben-Zvi <bben-...@mapr.com>
AuthorDate: Fri Apr 27 21:59:25 2018 -0700

    DRILL-6027:
      - Added fallback option for HashJoin.
      - No copy of incoming for single partition, and avoid HT resize.
      - Fix memory leak when cancelling while spill file is read
      - get correct schema when probe side is empty
      - Re-create the HashJoinProbe
---
 .../java/org/apache/drill/exec/ExecConstants.java  |  18 +-
 .../drill/exec/physical/base/AbstractBase.java     |   4 +-
 .../drill/exec/physical/base/PhysicalOperator.java |   4 +-
 .../drill/exec/physical/config/ExternalSort.java   |   4 +-
 .../drill/exec/physical/config/HashAggregate.java  |  11 +-
 .../drill/exec/physical/config/HashJoinPOP.java    |  66 +--
 .../drill/exec/physical/impl/BaseRootExec.java     |   2 -
 .../physical/impl/aggregate/HashAggTemplate.java   |   2 +-
 .../exec/physical/impl/common/HashPartition.java   | 137 ++++--
 .../physical/impl/common/HashTableTemplate.java    |  31 +-
 .../exec/physical/impl/join/HashJoinBatch.java     | 468 ++++++---------------
 .../impl/join/HashJoinHelperSizeCalculator.java    |   2 +-
 .../join/HashJoinHelperSizeCalculatorImpl.java     |   2 +-
 ...ava => HashJoinMechanicalMemoryCalculator.java} |   5 +-
 .../impl/join/HashJoinMemoryCalculator.java        |   2 +-
 .../impl/join/HashJoinMemoryCalculatorImpl.java    |   3 +-
 .../exec/physical/impl/join/HashJoinProbe.java     |  45 ++
 .../physical/impl/join/HashJoinProbeTemplate.java  | 425 +++++++++++++++++++
 .../exec/physical/impl/join/HashJoinState.java     |   2 +-
 .../impl/join/HashJoinStateCalculator.java         |   2 +-
 .../impl/join/HashTableSizeCalculator.java         |   2 +-
 .../HashTableSizeCalculatorConservativeImpl.java   |   2 +-
 .../impl/join/HashTableSizeCalculatorLeanImpl.java |   2 +-
 .../impl/protocol/OperatorRecordBatch.java         |   5 +
 .../apache/drill/exec/record/VectorContainer.java  | 101 +----
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../drill/exec/util/MemoryAllocationUtilities.java |   2 +-
 .../work/foreman/rm/ThrottledResourceManager.java  |   2 +-
 .../java-exec/src/main/resources/drill-module.conf |   4 +
 .../drill/exec/physical/impl/MockRecordBatch.java  |   3 +
 .../physical/impl/common/HashPartitionTest.java    |   8 +-
 .../impl/join/TestBuildSidePartitioningImpl.java   |   4 +-
 .../join/TestHashJoinHelperSizeCalculatorImpl.java |   2 +-
 .../join/TestHashJoinMemoryCalculatorImpl.java     |   2 +-
 .../exec/physical/impl/join/TestHashJoinSpill.java |  15 +-
 ...estHashTableSizeCalculatorConservativeImpl.java |   2 +-
 .../join/TestHashTableSizeCalculatorLeanImpl.java  |   2 +-
 .../exec/physical/impl/join/TestPartitionStat.java |   2 +-
 .../impl/join/TestPostBuildCalculationsImpl.java   |   2 +-
 .../physical/impl/unnest/MockLateralJoinBatch.java |   2 +
 .../xsort/managed/TestExternalSortInternals.java   |  14 +-
 .../physical/impl/xsort/managed/TestSorter.java    |   1 -
 42 files changed, 868 insertions(+), 547 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c48f414..4510511 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -104,14 +104,14 @@ public final class ExecConstants {
   public static final BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = 
new BooleanValidator("exec.sort.disable_managed");
 
   // Hash Join Options
-  public static String HASHJOIN_HASHTABLE_CALC_TYPE_KEY = 
"exec.hashjoin.hash_table_calc_type";
-  public static StringValidator HASHJOIN_HASHTABLE_CALC_TYPE = new 
StringValidator(HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
-  public static String HASHJOIN_SAFETY_FACTOR_KEY = 
"exec.hashjoin.safety_factor";
-  public static DoubleValidator HASHJOIN_SAFETY_FACTOR = new 
RangeDoubleValidator(HASHJOIN_SAFETY_FACTOR_KEY, 1.0, Double.MAX_VALUE);
-  public static String HASHJOIN_HASH_DOUBLE_FACTOR_KEY = 
"exec.hashjoin.hash_double_factor";
-  public static DoubleValidator HASHJOIN_HASH_DOUBLE_FACTOR = new 
RangeDoubleValidator(HASHJOIN_HASH_DOUBLE_FACTOR_KEY, 1.0, Double.MAX_VALUE);
-  public static String HASHJOIN_FRAGMENTATION_FACTOR_KEY = 
"exec.hashjoin.fragmentation_factor";
-  public static DoubleValidator HASHJOIN_FRAGMENTATION_FACTOR = new 
RangeDoubleValidator(HASHJOIN_FRAGMENTATION_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+  public static final String HASHJOIN_HASHTABLE_CALC_TYPE_KEY = 
"exec.hashjoin.hash_table_calc_type";
+  public static final StringValidator HASHJOIN_HASHTABLE_CALC_TYPE = new 
StringValidator(HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+  public static final String HASHJOIN_SAFETY_FACTOR_KEY = 
"exec.hashjoin.safety_factor";
+  public static final DoubleValidator HASHJOIN_SAFETY_FACTOR = new 
RangeDoubleValidator(HASHJOIN_SAFETY_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+  public static final String HASHJOIN_HASH_DOUBLE_FACTOR_KEY = 
"exec.hashjoin.hash_double_factor";
+  public static final DoubleValidator HASHJOIN_HASH_DOUBLE_FACTOR = new 
RangeDoubleValidator(HASHJOIN_HASH_DOUBLE_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+  public static final String HASHJOIN_FRAGMENTATION_FACTOR_KEY = 
"exec.hashjoin.fragmentation_factor";
+  public static final DoubleValidator HASHJOIN_FRAGMENTATION_FACTOR = new 
RangeDoubleValidator(HASHJOIN_FRAGMENTATION_FACTOR_KEY, 1.0, Double.MAX_VALUE);
   public static final String HASHJOIN_NUM_ROWS_IN_BATCH_KEY = 
"exec.hashjoin.num_rows_in_batch";
   public static final LongValidator HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR = new 
RangeLongValidator(HASHJOIN_NUM_ROWS_IN_BATCH_KEY, 1, 65536);
   public static final String HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY = 
"exec.hashjoin.max_batches_in_memory";
@@ -122,6 +122,8 @@ public final class ExecConstants {
   public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new 
RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0L, Long.MAX_VALUE);
   public static final String HASHJOIN_SPILL_DIRS = 
"drill.exec.hashjoin.spill.directories";
   public static final String HASHJOIN_SPILL_FILESYSTEM = 
"drill.exec.hashjoin.spill.fs";
+  public static final String HASHJOIN_FALLBACK_ENABLED_KEY = 
"drill.exec.hashjoin.fallback.enabled";
+  public static final BooleanValidator HASHJOIN_FALLBACK_ENABLED_VALIDATOR = 
new BooleanValidator(HASHJOIN_FALLBACK_ENABLED_KEY);
 
   // Hash Aggregate Options
   public static final String HASHAGG_NUM_PARTITIONS_KEY = 
"exec.hashagg.num_partitions";
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index b93237e..8cf79d3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.google.common.base.Preconditions;
@@ -114,9 +115,10 @@ public abstract class AbstractBase implements 
PhysicalOperator {
   /**
    * Any operator that supports spilling should override this method (and 
return true)
    * @return false
+   * @param queryContext
    */
   @Override @JsonIgnore
-  public boolean isBufferedOperator() { return false; }
+  public boolean isBufferedOperator(QueryContext queryContext) { return false; 
}
 
   @Override
   public String getUserName() {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index d9fcd3a..35138c8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.graph.GraphValue;
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.fasterxml.jackson.annotation.JsonIdentityInfo;
@@ -91,9 +92,10 @@ public interface PhysicalOperator extends 
GraphValue<PhysicalOperator> {
   /**
    *
    * @return True iff this operator manages its memory (including disk 
spilling)
+   * @param queryContext
    */
   @JsonIgnore
-  boolean isBufferedOperator();
+  boolean isBufferedOperator(QueryContext queryContext);
 
   // public void setBufferedOperator(boolean bo);
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index f0e88b4..9ead21c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.config;
 import java.util.List;
 
 import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
@@ -63,7 +64,8 @@ public class ExternalSort extends Sort {
   /**
    * The External Sort operator supports spilling
    * @return true
+   * @param queryContext
    */
   @Override
-  public boolean isBufferedOperator() { return true; }
+  public boolean isBufferedOperator(QueryContext queryContext) { return true; }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index f8e6d8e..51f34a0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.physical.config;
 
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -95,8 +97,13 @@ public class HashAggregate extends AbstractSingle {
   }
   /**
    * The Hash Aggregate operator supports spilling
-   * @return true
+   * @return true (unless a single partition is forced)
+   * @param queryContext
    */
   @Override
-  public boolean isBufferedOperator() { return true; }
+  public boolean isBufferedOperator(QueryContext queryContext) {
+    // In case forced to use a single partition - do not consider this a 
buffered op (when memory is divided)
+    return queryContext == null ||
+      1 < 
(int)queryContext.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR)
 ;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index b56950a..48d977e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -17,63 +17,77 @@
  */
 package org.apache.drill.exec.physical.config;
 
+import java.util.List;
+
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.calcite.rel.core.JoinRelType;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.exec.physical.base.AbstractJoinPop;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-
-import java.util.List;
 
 @JsonTypeName("hash-join")
 public class HashJoinPOP extends AbstractJoinPop {
-    static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class);
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class);
 
-    @JsonCreator
-    public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, 
@JsonProperty("right") PhysicalOperator right,
+  @JsonCreator
+  public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, 
@JsonProperty("right") PhysicalOperator right,
                        @JsonProperty("conditions") List<JoinCondition> 
conditions,
                        @JsonProperty("joinType") JoinRelType joinType) {
         super(left, right, joinType, null, conditions);
         Preconditions.checkArgument(joinType != null, "Join type is missing 
for HashJoin Pop");
-    }
+  }
 
-    @Override
-    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> 
children) {
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
         Preconditions.checkArgument(children.size() == 2);
         HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0), 
children.get(1), conditions, joinType);
         newHashJoin.setMaxAllocation(getMaxAllocation());
         return newHashJoin;
-    }
+  }
 
-    public HashJoinPOP flipIfRight() {
-        if (joinType == JoinRelType.RIGHT) {
+  public HashJoinPOP flipIfRight() {
+      if (joinType == JoinRelType.RIGHT) {
             List<JoinCondition> flippedConditions = Lists.newArrayList();
             for (JoinCondition c : conditions) {
                 flippedConditions.add(c.flip());
             }
             return new HashJoinPOP(right, left, flippedConditions, 
JoinRelType.LEFT);
-        } else {
+      } else {
             return this;
-        }
-    }
+      }
+  }
 
-    @Override
-    public int getOperatorType() {
+  @Override
+  public int getOperatorType() {
         return CoreOperatorType.HASH_JOIN_VALUE;
     }
 
-    @Override
-    public void setMaxAllocation(long maxAllocation) {
+  /**
+   *
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
+  public void setMaxAllocation(long maxAllocation) {
         this.maxAllocation = maxAllocation;
     }
 
-    @Override
-    public boolean isBufferedOperator() {
-        return true;
-    }
+  /**
+   * The Hash Aggregate operator supports spilling
+   * @return true (unless a single partition is forced)
+   * @param queryContext
+   */
+  @Override
+  public boolean isBufferedOperator(QueryContext queryContext) {
+    // In case forced to use a single partition - do not consider this a 
buffered op (when memory is divided)
+    return queryContext == null ||
+      1 < 
(int)queryContext.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR)
 ;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index b18a78e..e148278 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -32,8 +32,6 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
-import com.google.common.base.Supplier;
-
 public abstract class BaseRootExec implements RootExec {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BaseRootExec.class);
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 4f6a117..258e8d0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -541,7 +541,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     }
     // multiply by the max number of rows in a batch to get the final 
estimated max size
     estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
-    // (When there are no aggr functions, use '1' as later code relies on this 
siisDebze being non-zero)
+    // (When there are no aggr functions, use '1' as later code relies on this 
size being non-zero)
     estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
     estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5d0197a..e525530 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.common;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
@@ -35,6 +34,8 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
@@ -68,7 +69,7 @@ import java.util.concurrent.TimeUnit;
 public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashPartition.class);
 
-  public static final String HASH_VALUE_COLUMN_NAME = "Hash_Values";
+  public static final String HASH_VALUE_COLUMN_NAME = "$Hash_Values$";
 
   private int partitionNum = -1; // the current number of this partition, as 
used by the operator
 
@@ -107,30 +108,30 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
   private String spillFile;
 
   private BufferAllocator allocator;
-  private int RECORDS_PER_BATCH;
-  ChainedHashTable baseHashTable;
+  private int recordsPerBatch;
   private SpillSet spillSet;
   private boolean isSpilled; // is this partition spilled ?
   private boolean processingOuter; // is (inner done spilling and) now the 
outer is processed?
-  private boolean outerBatchNotNeeded; // when the inner is whole in memory
+  private boolean outerBatchAllocNotNeeded; // when the inner is whole in 
memory
   private RecordBatch buildBatch;
   private RecordBatch probeBatch;
   private int cycleNum;
+  private int numPartitions;
   private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = 
Lists.newArrayList();
   private long partitionInMemorySize;
   private long numInMemoryRecords;
 
   public HashPartition(FragmentContext context, BufferAllocator allocator, 
ChainedHashTable baseHashTable,
                        RecordBatch buildBatch, RecordBatch probeBatch,
-                       int recordsPerBatch, SpillSet spillSet, int partNum, 
int cycleNum) {
+                       int recordsPerBatch, SpillSet spillSet, int partNum, 
int cycleNum, int numPartitions) {
     this.allocator = allocator;
-    this.baseHashTable = baseHashTable;
     this.buildBatch = buildBatch;
     this.probeBatch = probeBatch;
-    this.RECORDS_PER_BATCH = recordsPerBatch;
+    this.recordsPerBatch = recordsPerBatch;
     this.spillSet = spillSet;
     this.partitionNum = partNum;
     this.cycleNum = cycleNum;
+    this.numPartitions = numPartitions;
 
     try {
       this.hashTable = baseHashTable.createAndSetupHashTable(null);
@@ -147,7 +148,9 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
     }
     this.hjHelper = new HashJoinHelper(context, allocator);
     tmpBatchesList = new ArrayList<>();
-    allocateNewCurrentBatchAndHV();
+    if ( numPartitions > 1 ) {
+      allocateNewCurrentBatchAndHV();
+    }
   }
 
   /**
@@ -173,11 +176,11 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
         newVC.add(newVV); // add first to allow dealloc in case of an OOM
 
         if (newVV instanceof FixedWidthVector) {
-          ((FixedWidthVector) newVV).allocateNew(RECORDS_PER_BATCH);
+          ((FixedWidthVector) newVV).allocateNew(recordsPerBatch);
         } else if (newVV instanceof VariableWidthVector) {
-          ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * 
RECORDS_PER_BATCH, RECORDS_PER_BATCH);
+          ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * 
recordsPerBatch, recordsPerBatch);
         } else if (newVV instanceof ObjectVector) {
-          ((ObjectVector) newVV).allocateNew(RECORDS_PER_BATCH);
+          ((ObjectVector) newVV).allocateNew(recordsPerBatch);
         } else {
           newVV.allocateNew();
         }
@@ -197,10 +200,10 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
    *  Allocate a new current Vector Container and current HV vector
    */
   public void allocateNewCurrentBatchAndHV() {
-    if ( outerBatchNotNeeded ) { return; } // skip when the inner is whole in 
memory
+    if (outerBatchAllocNotNeeded) { return; } // skip when the inner is whole 
in memory
     currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : 
buildBatch);
     currHVVector = new 
IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
-    currHVVector.allocateNew(RECORDS_PER_BATCH);
+    currHVVector.allocateNew(recordsPerBatch);
   }
 
   /**
@@ -209,8 +212,8 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
   public void appendInnerRow(VectorContainer buildContainer, int ind, int 
hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
 
     int pos = currentBatch.appendRow(buildContainer,ind);
-    currHVVector.getMutator().set(pos, hashCode);   // store the hash value in 
the new column
-    if ( pos + 1 == RECORDS_PER_BATCH ) {
+    currHVVector.getMutator().set(pos - 1, hashCode);   // store the hash 
value in the new column
+    if ( pos == recordsPerBatch ) {
       boolean needsSpill = isSpilled || calc.shouldSpill();
       completeAnInnerBatch(true, needsSpill);
     }
@@ -222,8 +225,8 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
    */
   public void appendOuterRow(int hashCode, int recordsProcessed) {
     int pos = 
currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
-    currHVVector.getMutator().set(pos, hashCode);   // store the hash value in 
the new column
-    if ( pos + 1 == RECORDS_PER_BATCH ) {
+    currHVVector.getMutator().set(pos - 1, hashCode);   // store the hash 
value in the new column
+    if ( pos == recordsPerBatch ) {
       completeAnOuterBatch(true);
     }
   }
@@ -265,6 +268,42 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
     }
   }
 
+  /**
+   *  Append the incoming batch (actually only the vectors of that batch) into 
the tmp list
+   */
+  public void appendBatch(VectorAccessible batch) {
+    assert numPartitions == 1;
+    int recordCount = batch.getRecordCount();
+    currHVVector = new 
IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
+    currHVVector.allocateNew(recordCount /* recordsPerBatch */);
+    try {
+      // For every record in the build batch, hash the key columns and keep 
the result
+      for (int ind = 0; ind < recordCount; ind++) {
+        int hashCode = getBuildHashCode(ind);
+        currHVVector.getMutator().set(ind, hashCode);   // store the hash 
value in the new HV column
+      }
+    } catch(SchemaChangeException sce) {}
+
+    VectorContainer container = new VectorContainer();
+    List<ValueVector> vectors = Lists.newArrayList();
+
+    for (VectorWrapper<?> v : batch) {
+      TransferPair tp = v.getValueVector().getTransferPair(allocator);
+      tp.transfer();
+      vectors.add(tp.getTo());
+    }
+
+    container.addCollection(vectors);
+    container.add(currHVVector); // the HV vector is added as an extra "column"
+    container.setRecordCount(recordCount);
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+    tmpBatchesList.add(container);
+    partitionBatchesCount++;
+    currHVVector = null;
+    numInMemoryRecords += recordCount;
+  }
+
   public void spillThisPartition() {
     if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing 
to spill
     logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size 
{} batches", partitionNum, cycleNum, tmpBatchesList.size());
@@ -300,15 +339,15 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
         v.getValueVector().getMutator().setValueCount(numRecords);
       }
 
-      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numRecords, vc, 
false);
+      WritableBatch wBatch = WritableBatch.getBatchNoHVWrap(numRecords, vc, 
false);
       try {
-        writer.write(batch, null);
+        writer.write(wBatch, null);
       } catch (IOException ioe) {
         throw UserException.dataWriteError(ioe)
           .message("Hash Join failed to write to output file: " + spillFile)
           .build(logger);
       } finally {
-        batch.clear();
+        wBatch.clear();
       }
       vc.zeroVectors();
       logger.trace("HASH JOIN: Took {} us to spill {} records", 
writer.time(TimeUnit.MICROSECONDS), numRecords);
@@ -398,20 +437,9 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
     return partitionNum;
   }
 
-  private void freeCurrentBatchAndHVVector() {
-    if ( currentBatch != null ) {
-      currentBatch.clear();
-      currentBatch = null;
-    }
-    if ( currHVVector != null ) {
-      currHVVector.clear();
-      currHVVector = null;
-    }
-  }
-
-  public void closeWriterAndDeleteFile() {
-    closeWriterInternal(true);
-  }
+  /**
+   * Close the writer without deleting the spill file
+   */
   public void closeWriter() { // no deletion !!
     closeWriterInternal(false);
     processingOuter = true; // After the spill file was closed
@@ -442,7 +470,8 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
   }
 
   /**
-   * Creates the hash table and join helper for this partition. This method 
should only be called after all the build side records
+   * Creates the hash table and join helper for this partition.
+   * This method should only be called after all the build side records
    * have been consumed.
    */
   public void buildContainersHashTableAndHelper() throws SchemaChangeException 
{
@@ -464,7 +493,6 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
 
       hashTable.updateIncoming(nextBatch, probeBatch );
 
-      // IntVector HV_vector = (IntVector) 
nextBatch.getValueVector(rightHVColPosition).getValueVector();
       IntVector HV_vector = (IntVector) nextBatch.getLast();
 
       for (int recInd = 0; recInd < currentRecordCount; recInd++) {
@@ -473,7 +501,7 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
           hashTable.put(recInd, htIndex, hashCode);
         } catch (RetryAfterSpillException RE) {
           throw new OutOfMemoryException("HT put");
-        } // Hash Join can not retry yet
+        } // Hash Join does not retry
         /* Use the global index returned by the hash table, to store
          * the current record index and batch index. This will be used
          * later when we probe and find a match.
@@ -483,7 +511,7 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
 
       containers.add(nextBatch);
     }
-    outerBatchNotNeeded = true; // the inner is whole in memory, no need for 
an outer batch
+    outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need 
for an outer batch
   }
 
   public void getStats(HashTableStats newStats) {
@@ -493,7 +521,7 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
   /**
    * Frees memory allocated to the {@link HashTable} and {@link 
HashJoinHelper}.
    */
-  public void clearHashTableAndHelper() {
+  private void clearHashTableAndHelper() {
     if (hashTable != null) {
       hashTable.clear();
       hashTable = null;
@@ -504,7 +532,22 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
     }
   }
 
-  public void close() {
+  private void freeCurrentBatchAndHVVector() {
+    if ( currentBatch != null ) {
+      currentBatch.clear();
+      currentBatch = null;
+    }
+    if ( currHVVector != null ) {
+      currHVVector.clear();
+      currHVVector = null;
+    }
+  }
+
+  /**
+   * Free all in-memory allocated structures.
+   * @param deleteFile - whether to delete the spill file or not
+   */
+  public void cleanup(boolean deleteFile) {
     freeCurrentBatchAndHVVector();
     if (containers != null && !containers.isEmpty()) {
       for (VectorContainer vc : containers) {
@@ -515,11 +558,15 @@ public class HashPartition implements 
HashJoinMemoryCalculator.PartitionStat {
       VectorContainer vc = tmpBatchesList.remove(0);
       vc.clear();
     }
-    closeWriter();
-    partitionBatchesCount = 0;
-    spillFile = null;
+    closeWriterInternal(deleteFile);
     clearHashTableAndHelper();
-    if ( containers != null ) { containers.clear(); }
+    if ( containers != null ) {
+      containers.clear();
+    }
+  }
+
+  public void close() {
+    cleanup(true);
   }
 
   /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 06e3bcd..bb0b1ad 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -65,6 +65,8 @@ public abstract class HashTableTemplate implements HashTable {
   // Array of batch holders..each batch holder can hold up to BATCH_SIZE 
entries
   private ArrayList<BatchHolder> batchHolders;
 
+  private int totalBatchHoldersSize; // the size of all batchHolders
+
   // Current size of the hash table in terms of number of buckets
   private int tableSize = 0;
 
@@ -483,6 +485,7 @@ public abstract class HashTableTemplate implements 
HashTable {
 
     // Create the first batch holder
     batchHolders = new ArrayList<BatchHolder>();
+    totalBatchHoldersSize = 0;
     // First BatchHolder is created when the first put request is received.
 
     try {
@@ -498,6 +501,7 @@ public abstract class HashTableTemplate implements 
HashTable {
   public void updateInitialCapacity(int initialCapacity) {
     htConfig = htConfig.withInitialCapacity(initialCapacity);
     allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
+    enlargeEmptyHashTableIfNeeded(initialCapacity);
   }
 
   @Override
@@ -543,6 +547,7 @@ public abstract class HashTableTemplate implements 
HashTable {
       }
       batchHolders.clear();
       batchHolders = null;
+      totalBatchHoldersSize = 0;
     }
     startIndices.clear();
     // currentIdxHolder = null; // keep IndexPointer in case HT is reused
@@ -568,6 +573,7 @@ public abstract class HashTableTemplate implements 
HashTable {
     if ( batchAdded ) {
       logger.trace("OOM - Removing index {} from the batch holders 
list",batchHolders.size() - 1);
       BatchHolder bh = batchHolders.remove(batchHolders.size() - 1);
+      totalBatchHoldersSize -= BATCH_SIZE;
       bh.clear();
     }
     freeIndex--;
@@ -677,7 +683,7 @@ public abstract class HashTableTemplate implements 
HashTable {
     }
     htIdxHolder.value = currentIdx;
     return  addedBatch ? PutStatus.NEW_BATCH_ADDED :
-        ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
+        ( freeIndex + 1 > totalBatchHoldersSize /* batchHolders.size() * 
BATCH_SIZE */ ) ?
         PutStatus.KEY_ADDED_LAST : // the last key in the batch
         PutStatus.KEY_ADDED;     // otherwise
   }
@@ -710,9 +716,9 @@ public abstract class HashTableTemplate implements 
HashTable {
   // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if 
the currentIdx exceeds
   // the capacity, we will add a new BatchHolder. Return true if a new batch 
was added.
   private boolean addBatchIfNeeded(int currentIdx) throws 
SchemaChangeException {
-    int totalBatchSize = batchHolders.size() * BATCH_SIZE;
+    // int totalBatchSize = batchHolders.size() * BATCH_SIZE;
 
-    if (currentIdx >= totalBatchSize) {
+    if (currentIdx >= totalBatchHoldersSize) {
       BatchHolder bh = newBatchHolder(batchHolders.size(), 
allocationTracker.getNextBatchHolderSize());
       batchHolders.add(bh);
       bh.setup();
@@ -721,6 +727,8 @@ public abstract class HashTableTemplate implements 
HashTable {
       }
 
       allocationTracker.commit();
+
+      totalBatchHoldersSize += BATCH_SIZE; // total increased by 1 batch
       return true;
     }
     return false;
@@ -797,6 +805,22 @@ public abstract class HashTableTemplate implements 
HashTable {
   }
 
   /**
+   *  Resize up the Hash Table if needed (to hold newNum entries)
+   */
+  public void enlargeEmptyHashTableIfNeeded(int newNum) {
+    assert numEntries == 0;
+    if ( newNum < threshold )  { return; } // no need to resize
+
+    while ( tableSize * 2 < MAXIMUM_CAPACITY && newNum > threshold ) {
+      tableSize *= 2;
+      threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor());
+    }
+    startIndices.clear();
+    startIndices = allocMetadataVector(tableSize, EMPTY_SLOT);
+  }
+
+
+  /**
    * Reinit the hash table to its original size, and clear up all its prior 
batch holder
    *
    */
@@ -806,6 +830,7 @@ public abstract class HashTableTemplate implements 
HashTable {
     freeIndex = 0; // all batch holders are gone
     // reallocate batch holders, and the hash table to the original size
     batchHolders = new ArrayList<BatchHolder>();
+    totalBatchHoldersSize = 0;
     startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
   }
   public void updateIncoming(VectorContainer newIncoming, RecordBatch 
newIncomingProbe) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0c46e36..ee7a8a3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -37,8 +37,10 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -66,7 +68,24 @@ import 
org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.core.JoinRelType;
 
 /**
+ *   This class implements the runtime execution for the Hash-Join operator
+ *   supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
  *
+ *   This implementation splits the incoming Build side rows into multiple 
Partitions, thus allowing spilling of
+ *   some of these partitions to disk if memory gets tight. Each partition is 
implemented as a {@link HashPartition}.
+ *   After the build phase is over, in the most general case, some of the 
partitions were spilled, and the others
+ *   are in memory. Each of the partitions in memory would get a {@link 
HashTable} built.
+ *      Next the Probe side is read, and each row is key matched with a Build 
partition. If that partition is in
+ *   memory, then the key is used to probe and perform the join, and the 
results are added to the outgoing batch.
+ *   But if that build side partition was spilled, then the matching Probe 
size partition is spilled as well.
+ *      After all the Probe side was processed, we are left with pairs of 
spilled partitions. Then each pair is
+ *   processed individually (that Build partition should be smaller than the 
original, hence likely fit whole into
+ *   memory to allow probing; if not -- see below).
+ *      Processing of each spilled pair is EXACTLY like processing the 
original Build/Probe incomings. (As a fact,
+ *   the {@Link #innerNext() innerNext} method calls itself recursively !!). 
Thus the spilled build partition is
+ *   read and divided into new partitions, which in turn may spill again (and 
again...).
+ *   The code tracks these spilling "cycles". Normally any such "again" (i.e. 
cycle of 2 or greater) is a waste,
+ *   indicating that the number of partitions chosen was too small.
  */
 public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   protected static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
@@ -86,6 +105,10 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
   // Join conditions
   private final List<JoinCondition> conditions;
+
+  // Runtime generated class implementing HashJoinProbe interface
+  private HashJoinProbe hashJoinProbe = null;
+
   private final List<NamedExpression> rightExpr;
 
   /**
@@ -120,6 +143,9 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
   // Schema of the build side
   private BatchSchema rightSchema;
+  // Schema of the probe side
+  private BatchSchema probeSchema;
+
 
   private int rightHVColPosition;
   private BufferAllocator allocator;
@@ -131,15 +157,15 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
   private SpillSet spillSet;
   HashJoinPOP popConfig;
 
-  private int cycleNum = 0; // primary, secondary, tertiary, etc.
+  private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from
-  IntVector read_HV_vector; // HV vector that was read from the spilled batch
+  IntVector read_right_HV_vector; // HV vector that was read from the spilled 
batch
   private int maxBatchesInMemory;
 
   /**
    * This holds information about the spilled partitions for the build and 
probe side.
    */
-  private static class HJSpilledPartition {
+  public static class HJSpilledPartition {
     public int innerSpilledBatches;
     public String innerSpillFile;
     public int outerSpilledBatches;
@@ -189,6 +215,12 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       setupHashTable();
     }
     setupOutputContainerSchema();
+    try {
+      hashJoinProbe = setupHashJoinProbe();
+    } catch (IOException | ClassTransformationException e) {
+      throw new SchemaChangeException(e);
+    }
+
     // Build the container schema and set the counts
     for (final VectorWrapper<?> w : container) {
       w.getValueVector().allocateNew();
@@ -212,7 +244,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       return false;
     }
 
-    if (checkForEarlyFinish()) {
+    if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
       state = BatchState.DONE;
       return false;
     }
@@ -234,11 +266,16 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
       switch (outcome) {
         case OK_NEW_SCHEMA:
-          // We need to have the schema of the build side even when the build 
side is empty
-          rightSchema = buildBatch.getSchema();
-          // position of the new "column" for keeping the hash values (after 
the real columns)
-          rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
-          // new schema can also contain records
+          if ( inputIndex == 0 ) {
+            // Indicate that a schema was seen (in case probe side is empty)
+            probeSchema = probeBatch.getSchema();
+          } else {
+            // We need to have the schema of the build side even when the 
build side is empty
+            rightSchema = buildBatch.getSchema();
+            // position of the new "column" for keeping the hash values (after 
the real columns)
+            rightHVColPosition = 
buildBatch.getContainer().getNumberOfColumns();
+            // new schema can also contain records
+          }
         case OK:
           if (recordBatch.getRecordCount() == 0) {
             continue;
@@ -265,12 +302,19 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
       return new HashJoinMemoryCalculatorImpl(safetyFactor, 
fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType);
     } else {
-      return new MechanicalHashJoinMemoryCalculator(maxBatchesInMemory);
+      return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
     }
   }
 
   @Override
   public IterOutcome innerNext() {
+    // In case incoming was killed before, just cleanup and return
+    if ( wasKilled ) {
+      this.cleanup();
+      super.close();
+      return IterOutcome.NONE;
+    }
+
     try {
       /* If we are here for the first time, execute the build phase of the
        * hash join and setup the run time generated class for the probe side
@@ -280,19 +324,18 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
         executeBuildPhase();
         // Update the hash table related stats for the operator
         updateStats();
-        //
-        setupProbe();
+        // Initialize various settings for the probe side
+        hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType, 
leftUpstream, partitions, cycleNum, container, spilledInners, buildSideIsEmpty, 
numPartitions, rightHVColPosition);
       }
 
-      // Store the number of records projected
-
+      // Try to probe and project, or recursively handle a spilled partition
       if ( ! buildSideIsEmpty ||  // If there are build-side rows
            joinType != JoinRelType.INNER) {  // or if this is a left/full 
outer join
 
         // Allocate the memory for the vectors in the output container
         allocateVectors();
 
-        outputRecords = probeAndProject();
+        outputRecords = hashJoinProbe.probeAndProject();
 
         /* We are here because of one the following
          * 1. Completed processing of all the records and we are done
@@ -314,13 +357,13 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
         // Free all partitions' in-memory data structures
         // (In case need to start processing spilled partitions)
         for ( HashPartition partn : partitions ) {
-          partn.close();
+          partn.cleanup(false); // clean, but do not delete the spill files !!
         }
 
         //
         //  (recursively) Handle the spilled partitions, if any
         //
-        if ( !buildSideIsEmpty && !wasKilled && 
!spilledPartitionsList.isEmpty()) {
+        if ( !buildSideIsEmpty && !spilledPartitionsList.isEmpty()) {
           // Get the next (previously) spilled partition to handle as incoming
           HJSpilledPartition currSp = spilledPartitionsList.remove(0);
 
@@ -337,7 +380,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
           } else {
             probeBatch = left; // if no outer batch then reuse left - needed 
for updateIncoming()
             leftUpstream = IterOutcome.NONE;
-            changeToFinalProbeState();
+            hashJoinProbe.changeToFinalProbeState();
           }
 
           // update the cycle num if needed
@@ -356,12 +399,15 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
               this.cleanup();
               throw UserException
                 .unsupportedError()
-                .message("Hash-Join can not partition inner data any further 
(too many join-key duplicates? - try merge-join)\n"
+                .message("Hash-Join can not partition the inner data any 
further (probably due to too many join-key duplicates)\n"
                 + "On cycle num %d mem available %d num partitions %d", 
cycleNum, allocator.getLimit(), numPartitions)
                 .build(logger);
             }
           }
-          logger.debug("Start reading spilled partition {} (prev {}) from 
cycle {} (with {}-{} batches). More {} spilled partitions left.", 
currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, 
currSp.outerSpilledBatches, currSp.innerSpilledBatches, 
spilledPartitionsList.size());
+          logger.debug("Start reading spilled partition {} (prev {}) from 
cycle {} (with {}-{} batches)." +
+              " More {} spilled partitions left.",
+            currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, 
currSp.outerSpilledBatches,
+            currSp.innerSpilledBatches, spilledPartitionsList.size());
 
           state = BatchState.FIRST;  // TODO need to determine if this is 
still necessary since prefetchFirstBatchFromBothSides sets this
 
@@ -400,12 +446,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
   private void setupHashTable() throws SchemaChangeException {
     final List<Comparator> comparators = 
Lists.newArrayListWithExpectedSize(conditions.size());
-    // When DRILL supports Java 8, use the following instead of the for() loop
-    // 
conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
-    for (int i=0; i<conditions.size(); i++) {
-      JoinCondition cond = conditions.get(i);
-      comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
-    }
+    
conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
     // Setup the hash table configuration object
     List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
@@ -441,16 +482,9 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     //
     //  Find out the estimated max batch size, etc
     //  and compute the max numPartitions possible
+    //  See partitionNumTuning()
     //
-    // numPartitions = 8; // just for initial work; change later
-    // partitionMask = 7;
-    // bitsInMask = 3;
-
-    //  SET FROM CONFIGURATION OPTIONS :
-    //  ================================
 
-    // Set the number of partitions from the configuration (raise to a power 
of two, if needed)
-    // Based on the number of partitions: Set the mask and bit count
     partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
     bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
 
@@ -471,7 +505,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     // Recreate the partitions every time build is initialized
     for (int part = 0; part < numPartitions; part++ ) {
       partitions[part] = new HashPartition(context, allocator, baseHashTable, 
buildBatch, probeBatch,
-        RECORDS_PER_BATCH, spillSet, part, cycleNum);
+        RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions);
     }
 
     spilledInners = new HJSpilledPartition[numPartitions];
@@ -526,15 +560,38 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
         TARGET_RECORDS_PER_BATCH,
         HashTable.DEFAULT_LOAD_FACTOR);
 
-      numPartitions = 1; // We are only using one partition
-      canSpill = false; // We cannot spill
-      allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework 
and force unbounded memory
+      disableSpilling(null);
     }
 
     return buildCalc;
   }
 
   /**
+   *  Disable spilling - use only a single partition and set the memory limit 
to the max ( 10GB )
+   *  @param reason If not null - log this as warning, else check fallback 
setting to either warn or fail.
+   */
+  private void disableSpilling(String reason) {
+    // Fail, or just issue a warning if a reason was given, or a fallback 
option is enabled
+    if ( reason == null ) {
+      final boolean fallbackEnabled = 
context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+      if (fallbackEnabled) {
+        logger.warn("Spilling is disabled - not enough memory available for 
internal partitioning. Falling back" +
+          " to use unbounded memory");
+      } else {
+        throw UserException.resourceError().message(String.format("Not enough 
memory for internal partitioning and fallback mechanism for " +
+          "HashJoin to use unbounded memory is disabled. Either enable 
fallback config %s using Alter " +
+          "session/system command or increase memory limit for Drillbit", 
ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
+      }
+    } else {
+      logger.warn(reason);
+    }
+
+    numPartitions = 1; // We are only using one partition
+    canSpill = false; // We cannot spill
+    allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and 
force unbounded memory
+  }
+
+  /**
    *  Execute the BUILD phase; first read incoming and split rows into 
partitions;
    *  may decide to spill some of the partitions
    *
@@ -560,7 +617,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       calc.initialize(doMemoryCalculation);
       buildCalc = calc.next();
 
-      // We've sniffed first non empty build and probe batches so we have 
enough information to createa calculator
+      // We've sniffed first non empty build and probe batches so we have 
enough information to create a calculator
       buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash 
values bug fixed
         buildBatch,
         probeBatch,
@@ -608,25 +665,30 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
         for (HashPartition partn : partitions) { partn.updateBatches(); }
         // Fall through
       case OK:
+        // Special treatment (when no spill, and single partition) -- use the 
incoming vectors as they are (no row copy)
+        if ( numPartitions == 1 ) {
+          partitions[0].appendBatch(buildBatch);
+          break;
+        }
         final int currentRecordCount = buildBatch.getRecordCount();
 
         if ( cycleNum > 0 ) {
-          read_HV_vector = (IntVector) buildBatch.getContainer().getLast();
+          read_right_HV_vector = (IntVector) 
buildBatch.getContainer().getLast();
         }
 
         // For every record in the build batch, hash the key columns and keep 
the result
         for (int ind = 0; ind < currentRecordCount; ind++) {
           int hashCode = ( cycleNum == 0 ) ? 
partitions[0].getBuildHashCode(ind)
-            : read_HV_vector.getAccessor().get(ind); // get the hash value 
from the HV column
+            : read_right_HV_vector.getAccessor().get(ind); // get the hash 
value from the HV column
           int currPart = hashCode & partitionMask ;
           hashCode >>>= bitsInMask;
           // Append the new inner row to the appropriate partition; spill 
(that partition) if needed
           partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, 
hashCode, buildCalc); // may spill if needed
         }
 
-        if ( read_HV_vector != null ) {
-          read_HV_vector.clear();
-          read_HV_vector = null;
+        if ( read_right_HV_vector != null ) {
+          read_right_HV_vector.clear();
+          read_right_HV_vector = null;
         }
         break;
       }
@@ -637,8 +699,10 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     // Move the remaining current batches into their temp lists, or spill
     // them if the partition is spilled. Add the spilled partitions into
     // the spilled partitions list
-    for (HashPartition partn : partitions) {
-      partn.completeAnInnerBatch(false, partn.isSpilled() );
+    if ( numPartitions > 1 ) { // a single partition needs no completion
+      for (HashPartition partn : partitions) {
+        partn.completeAnInnerBatch(false, partn.isSpilled());
+      }
     }
 
     HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = 
buildCalc.next();
@@ -715,7 +779,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       }
     }
 
-    if (leftUpstream == IterOutcome.OK || leftUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
+    if (probeSchema != null) { // a probe schema was seen (even though the 
probe may had no rows)
       for (final VectorWrapper<?> vv : probeBatch) {
         final MajorType inputType = vv.getField().getType();
         final MajorType outputType;
@@ -761,6 +825,15 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     return spilledInners[part] != null;
   }
 
+  /**
+   *  The constructor
+   *
+   * @param popConfig
+   * @param context
+   * @param left  -- probe/outer side incoming input
+   * @param right -- build/iner side incoming input
+   * @throws OutOfMemoryException
+   */
   public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
       RecordBatch left, /*Probe side record batch*/
       RecordBatch right /*Build side record batch*/
@@ -783,16 +856,15 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new 
FieldReference(refName)));
     }
 
+    this.allocator = oContext.getAllocator();
+
     numPartitions = 
(int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
     if ( numPartitions == 1 ) { //
-      canSpill = false;
-      logger.warn("Spilling is disabled due to configuration setting of 
num_partitions to 1");
+      disableSpilling("Spilling is disabled due to configuration setting of 
num_partitions to 1");
     }
 
     numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case 
not a power of 2
 
-    this.allocator = oContext.getAllocator();
-
     final long memLimit = 
context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
 
     if (memLimit != 0) {
@@ -802,6 +874,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     RECORDS_PER_BATCH = 
(int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
     maxBatchesInMemory = 
(int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
 
+    logger.info("Memory limit {} bytes", 
FileUtils.byteCountToDisplaySize(allocator.getLimit()));
     spillSet = new SpillSet(context, popConfig);
 
     // Create empty partitions (in the ctor - covers the case where right side 
is empty)
@@ -818,10 +891,9 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
         (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
-    // clean (and deallocate) each partition
+    // clean (and deallocate) each partition, and delete its spill file
     for (HashPartition partn : partitions) {
-      partn.clearHashTableAndHelper();
-      partn.closeWriterAndDeleteFile();
+      partn.close();
     }
 
     // delete any spill file left in unread spilled partitions
@@ -896,288 +968,24 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
   @Override
   public void close() {
-    for ( HashPartition partn : partitions ) {
-      partn.close();
+    if ( cycleNum > 0 ) { // spilling happened
+      // In case closing due to cancellation, BaseRootExec.close() does not 
close the open
+      // SpilledRecordBatch "scanners" as it only knows about the original 
left/right ops.
+      killIncoming(false);
     }
-    cleanup();
+    this.cleanup();
     super.close();
   }
 
-  // ==============================================================
-  //
-  //    Methods used for the probe
-  //
-  // ============================================================
-  private BatchSchema probeSchema;
-
-  enum ProbeState {
-    PROBE_PROJECT, PROJECT_RIGHT, DONE
-  }
-
-  private int currRightPartition = 0; // for returning RIGHT/FULL
-
-  // Number of records to process on the probe side
-  private int recordsToProcess = 0;
-
-  // Number of records processed on the probe side
-  private int recordsProcessed = 0;
-
-  // Indicate if we should drain the next record from the probe side
-  private boolean getNextRecord = true;
-
-  // Contains both batch idx and record idx of the matching record in the 
build side
-  private int currentCompositeIdx = -1;
-
-  // Current state the hash join algorithm is in
-  private ProbeState probeState = ProbeState.PROBE_PROJECT;
-
-  // For outer or right joins, this is a list of unmatched records that needs 
to be projected
-  private List<Integer> unmatchedBuildIndexes = null;
+  public HashJoinProbe setupHashJoinProbe() throws 
ClassTransformationException, IOException {
+    final CodeGenerator<HashJoinProbe> cg = 
CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
+    cg.plainJavaCapable(true);
+    // cg.saveCodeForDebugging(true);
 
-  // While probing duplicates, retain current build-side partition in case 
need to continue
-  // probing later on the same chain of duplicates
-  private HashPartition currPartition;
+    //  No real code generation !!
 
-  /**
-   * Various initialization needed to perform the probe
-   * Must be called AFTER the build completes
-   */
-  private void setupProbe() {
-    currRightPartition = 0; // In case it's a Right/Full outer join
-    recordsProcessed = 0;
-    recordsToProcess = 0;
-
-    probeSchema = probeBatch.getSchema();
-    probeState = ProbeState.PROBE_PROJECT;
-
-    // A special case - if the left was an empty file
-    if ( leftUpstream == IterOutcome.NONE ){
-      changeToFinalProbeState();
-    } else {
-      this.recordsToProcess = probeBatch.getRecordCount();
-    }
-
-    // for those outer partitions that need spilling (cause their matching 
inners spilled)
-    // initialize those partitions' current batches and hash-value vectors
-    for ( HashPartition partn : partitions ) {
-      partn.allocateNewCurrentBatchAndHV();
-    }
-
-    if ( cycleNum > 0 ) {
-      if ( read_HV_vector != null ) { read_HV_vector.clear();}
-      if ( leftUpstream != IterOutcome.NONE ) { // Skip when outer spill was 
empty
-        read_HV_vector = (IntVector) probeBatch.getContainer().getLast();
-      }
-    }
+    final HashJoinProbe hj = context.getImplementationClass(cg);
+    return hj;
   }
 
-  private void executeProjectRightPhase(int currBuildPart) {
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < 
recordsToProcess) {
-      outputRecords =
-        container.appendRow(partitions[currBuildPart].getContainers(), 
unmatchedBuildIndexes.get(recordsProcessed),
-          null /* no probeBatch */, 0 /* no probe index */ );
-      recordsProcessed++;
-    }
-  }
-
-  private void executeProbePhase() throws SchemaChangeException {
-
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != 
ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
-
-      // Check if we have processed all records in this batch we need to 
invoke next
-      if (recordsProcessed == recordsToProcess) {
-
-        // Done processing all records in the previous batch, clean up!
-        for (VectorWrapper<?> wrapper : probeBatch) {
-          wrapper.getValueVector().clear();
-        }
-
-        IterOutcome leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-
-        switch (leftUpstream) {
-          case NONE:
-          case NOT_YET:
-          case STOP:
-            recordsProcessed = 0;
-            recordsToProcess = 0;
-            changeToFinalProbeState();
-            // in case some outer partitions were spilled, need to spill their 
last batches
-            for ( HashPartition partn : partitions ) {
-              if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
-              partn.completeAnOuterBatch(false);
-              // update the partition's spill record with the outer side
-              HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
-              sp.outerSpillFile = partn.getSpillFile();
-              sp.outerSpilledBatches = partn.getPartitionBatchesCount();
-
-              partn.closeWriter();
-            }
-
-            continue;
-
-          case OK_NEW_SCHEMA:
-            if (probeBatch.getSchema().equals(probeSchema)) {
-              for ( HashPartition partn : partitions ) { 
partn.updateBatches(); }
-
-            } else {
-              throw SchemaChangeException.schemaChanged("Hash join does not 
support schema changes in probe side.",
-                probeSchema,
-                probeBatch.getSchema());
-            }
-          case OK:
-            recordsToProcess = probeBatch.getRecordCount();
-            recordsProcessed = 0;
-            // If we received an empty batch do nothing
-            if (recordsToProcess == 0) {
-              continue;
-            }
-            if ( cycleNum > 0 ) {
-              read_HV_vector = (IntVector) 
probeBatch.getContainer().getLast(); // Needed ?
-            }
-        }
-      }
-      int probeIndex = -1;
-      // Check if we need to drain the next row in the probe side
-      if (getNextRecord) {
-
-        if ( !buildSideIsEmpty ) {
-          int hashCode = ( cycleNum == 0 ) ?
-            partitions[0].getProbeHashCode(recordsProcessed)
-            : read_HV_vector.getAccessor().get(recordsProcessed);
-          int currBuildPart = hashCode & partitionMask ;
-          hashCode >>>= bitsInMask;
-
-          // Set and keep the current partition (may be used again on 
subsequent probe calls as
-          // inner rows of duplicate key are processed)
-          currPartition = partitions[currBuildPart]; // inner if not spilled, 
else outer
-
-          // If the matching inner partition was spilled
-          if ( isSpilledInner(currBuildPart) ) {
-            // add this row to its outer partition (may cause a spill, when 
the batch is full)
-
-            currPartition.appendOuterRow(hashCode, recordsProcessed);
-
-            recordsProcessed++; // done with this outer record
-            continue; // on to the next outer record
-          }
-
-          probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
-
-        }
-
-        if (probeIndex != -1) {
-
-          /* The current probe record has a key that matches. Get the index
-           * of the first row in the build side that matches the current key
-           * (and record this match in the bitmap, in case of a FULL/RIGHT 
join)
-           */
-          currentCompositeIdx = currPartition.getStartIndex(probeIndex);
-
-          outputRecords =
-            container.appendRow(currPartition.getContainers(), 
currentCompositeIdx,
-              probeBatch.getContainer(), recordsProcessed);
-
-          /* Projected single row from the build side with matching key but 
there
-           * may be more rows with the same key. Check if that's the case
-           */
-          currentCompositeIdx = 
currPartition.getNextIndex(currentCompositeIdx);
-          if (currentCompositeIdx == -1) {
-            /* We only had one row in the build side that matched the current 
key
-             * from the probe side. Drain the next row in the probe side.
-             */
-            recordsProcessed++;
-          } else {
-            /* There is more than one row with the same key on the build side
-             * don't drain more records from the probe side till we have 
projected
-             * all the rows with this key
-             */
-            getNextRecord = false;
-          }
-        } else { // No matching key
-
-          // If we have a left outer join, project the outer side
-          if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-
-            outputRecords =
-              container.appendOuterRow(probeBatch.getContainer(), 
recordsProcessed, rightHVColPosition);
-          }
-          recordsProcessed++;
-        }
-      }
-      else { // match the next inner row with the same key
-
-        currPartition.setRecordMatched(currentCompositeIdx);
-
-        outputRecords =
-          container.appendRow(currPartition.getContainers(), 
currentCompositeIdx,
-            probeBatch.getContainer(), recordsProcessed);
-
-        currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
-
-        if (currentCompositeIdx == -1) {
-          // We don't have any more rows matching the current key on the build 
side, move on to the next probe row
-          getNextRecord = true;
-          recordsProcessed++;
-        }
-      }
-    }
-  }
-
-  /**
-   *  Perform the probe and project the results
-   *
-   * @return number of output records
-   * @throws SchemaChangeException
-   */
-  private int probeAndProject() throws SchemaChangeException {
-
-    outputRecords = 0;
-
-    // When handling spilled partitions, the state becomes DONE at the end of 
each partition
-    if ( probeState == ProbeState.DONE ) {
-      return outputRecords; // that is zero
-    }
-
-    if (probeState == ProbeState.PROBE_PROJECT) {
-      executeProbePhase();
-    }
-
-    if (probeState == ProbeState.PROJECT_RIGHT) {
-      // Inner probe is done; now we are here because we still have a RIGHT 
OUTER (or a FULL) join
-
-      do {
-
-        if (unmatchedBuildIndexes == null) { // first time for this partition ?
-          if ( buildSideIsEmpty ) { return outputRecords; } // in case of an 
empty right
-          // Get this partition's list of build indexes that didn't match any 
record on the probe side
-          unmatchedBuildIndexes = 
partitions[currRightPartition].getNextUnmatchedIndex();
-          recordsProcessed = 0;
-          recordsToProcess = unmatchedBuildIndexes.size();
-        }
-
-        // Project the list of unmatched records on the build side
-        executeProjectRightPhase(currRightPartition);
-
-        if ( recordsProcessed < recordsToProcess ) { // more records in this 
partition?
-          return outputRecords;  // outgoing is full; report and come back 
later
-        } else {
-          currRightPartition++; // on to the next right partition
-          unmatchedBuildIndexes = null;
-        }
-
-      }   while ( currRightPartition < numPartitions );
-
-      probeState = ProbeState.DONE; // last right partition was handled; we 
are done now
-    }
-
-    return outputRecords;
-  }
-
-  private void changeToFinalProbeState() {
-    // We are done with the (left) probe phase.
-    // If it's a RIGHT or a FULL join then need to get the unmatched indexes 
from the build side
-    probeState =
-      (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? 
ProbeState.PROJECT_RIGHT :
-      ProbeState.DONE; // else we're done
-  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
index f5c826a..6ddd05e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
index 728fde5..a17ea2f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
similarity index 97%
rename from 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
rename to 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
index 8b367dd..618e80e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.drill.exec.physical.impl.join;
 
 import com.google.common.base.Preconditions;
@@ -24,12 +23,12 @@ import org.apache.drill.exec.record.RecordBatch;
 import javax.annotation.Nullable;
 import java.util.Set;
 
-public class MechanicalHashJoinMemoryCalculator implements 
HashJoinMemoryCalculator {
+public class HashJoinMechanicalMemoryCalculator implements 
HashJoinMemoryCalculator {
   private final int maxNumInMemBatches;
 
   private boolean doMemoryCalc;
 
-  public MechanicalHashJoinMemoryCalculator(int maxNumInMemBatches) {
+  public HashJoinMechanicalMemoryCalculator(int maxNumInMemBatches) {
     this.maxNumInMemBatches = maxNumInMemBatches;
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index f2de0fe..71292a5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 5890a42..ed0adc5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.join;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
new file mode 100644
index 0000000..f212605
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.record.VectorContainer;
+
+public interface HashJoinProbe {
+  TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, 
HashJoinProbeTemplate.class);
+
+  /* The probe side of the hash join can be in the following two states
+   * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we 
have a
+   *    key match and if we do we project the record
+   * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting 
the records
+   *    from the build side that did not match any records on the probe side. 
For Left outer
+   *    case we handle it internally by projecting the record if there isn't a 
match on the build side
+   * 3. DONE: Once we have projected all possible records we are done
+   */
+  enum ProbeState {
+    PROBE_PROJECT, PROJECT_RIGHT, DONE
+  }
+
+  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, 
JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, 
HashPartition[] partitions, int cycleNum, VectorContainer container, 
HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int 
numPartitions, int rightHVColPosition);
+  int  probeAndProject() throws SchemaChangeException;
+  void changeToFinalProbeState();
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
new file mode 100644
index 0000000..75c3073
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class HashJoinProbeTemplate implements HashJoinProbe {
+
+  VectorContainer container; // the outgoing container
+
+  // Probe side record batch
+  private RecordBatch probeBatch;
+
+  private BatchSchema probeSchema;
+
+  // Join type, INNER, LEFT, RIGHT or OUTER
+  private JoinRelType joinType;
+
+  private HashJoinBatch outgoingJoinBatch = null;
+
+  private static final int TARGET_RECORDS_PER_BATCH = 4000;
+
+  // Number of records to process on the probe side
+  private int recordsToProcess = 0;
+
+  // Number of records processed on the probe side
+  private int recordsProcessed = 0;
+
+  // Number of records in the output container
+  private int outputRecords;
+
+  // Indicate if we should drain the next record from the probe side
+  private boolean getNextRecord = true;
+
+  // Contains both batch idx and record idx of the matching record in the 
build side
+  private int currentCompositeIdx = -1;
+
+  // Current state the hash join algorithm is in
+  private ProbeState probeState = ProbeState.PROBE_PROJECT;
+
+  // For outer or right joins, this is a list of unmatched records that needs 
to be projected
+  private List<Integer> unmatchedBuildIndexes = null;
+
+  private  HashPartition partitions[];
+
+  // While probing duplicates, retain current build-side partition in case 
need to continue
+  // probing later on the same chain of duplicates
+  private HashPartition currPartition;
+
+  private int currRightPartition = 0; // for returning RIGHT/FULL
+  IntVector read_left_HV_vector; // HV vector that was read from the spilled 
batch
+  private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
+  private HashJoinBatch.HJSpilledPartition spilledInners[]; // for the outer 
to find the partition
+  private boolean buildSideIsEmpty = true;
+  private int numPartitions = 1; // must be 2 to the power of bitsInMask
+  private int partitionMask = 0; // numPartitions - 1
+  private int bitsInMask = 0; // number of bits in the MASK
+  private int rightHVColPosition;
+
+  /**
+   *  Setup the Hash Join Probe object
+   *
+   * @param probeBatch
+   * @param outgoing
+   * @param joinRelType
+   * @param leftStartState
+   * @param partitions
+   * @param cycleNum
+   * @param container
+   * @param spilledInners
+   * @param buildSideIsEmpty
+   * @param numPartitions
+   * @param rightHVColPosition
+   */
+  @Override
+  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch 
outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] 
partitions, int cycleNum, VectorContainer container, 
HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int 
numPartitions, int rightHVColPosition) {
+    this.container = container;
+    this.spilledInners = spilledInners;
+    this.probeBatch = probeBatch;
+    this.probeSchema = probeBatch.getSchema();
+    this.joinType = joinRelType;
+    this.outgoingJoinBatch = outgoing;
+    this.partitions = partitions;
+    this.cycleNum = cycleNum;
+    this.buildSideIsEmpty = buildSideIsEmpty;
+    this.numPartitions = numPartitions;
+    this.rightHVColPosition = rightHVColPosition;
+
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    probeState = ProbeState.PROBE_PROJECT;
+    this.recordsToProcess = 0;
+    this.recordsProcessed = 0;
+
+    // A special case - if the left was an empty file
+    if ( leftStartState == IterOutcome.NONE ){
+      changeToFinalProbeState();
+    } else {
+      this.recordsToProcess = probeBatch.getRecordCount();
+    }
+
+    // for those outer partitions that need spilling (cause their matching 
inners spilled)
+    // initialize those partitions' current batches and hash-value vectors
+    for ( HashPartition partn : this.partitions) {
+      partn.allocateNewCurrentBatchAndHV();
+    }
+
+    currRightPartition = 0; // In case it's a Right/Full outer join
+
+    // Initialize the HV vector for the first (already read) left batch
+    if ( this.cycleNum > 0 ) {
+      if ( read_left_HV_vector != null ) { read_left_HV_vector.clear();}
+      if ( leftStartState != IterOutcome.NONE ) { // Skip when outer spill was 
empty
+        read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast();
+      }
+    }
+  }
+
+  /**
+   * Append the given build side row into the outgoing container
+   * @param buildSrcContainer The container for the right/inner side
+   * @param buildSrcIndex build side index
+   * @return The index for the last column (where the probe side would 
continue copying)
+   */
+  private int appendBuild(VectorContainer buildSrcContainer, int 
buildSrcIndex) {
+    // "- 1" to skip the last "hash values" added column
+    int lastColIndex = buildSrcContainer.getNumberOfColumns() - 1 ;
+    for (int vectorIndex = 0; vectorIndex < lastColIndex; vectorIndex++) {
+      ValueVector destVector = 
container.getValueVector(vectorIndex).getValueVector();
+      ValueVector srcVector = 
buildSrcContainer.getValueVector(vectorIndex).getValueVector();
+      destVector.copyEntry(container.getRecordCount(), srcVector, 
buildSrcIndex);
+    }
+    return lastColIndex;
+  }
+  /**
+   * Append the given probe side row into the outgoing container, following 
the build side part
+   * @param probeSrcContainer The container for the left/outer side
+   * @param probeSrcIndex probe side index
+   * @param baseIndex The column index to start copying into (following the 
build columns)
+   */
+  private void appendProbe(VectorContainer probeSrcContainer, int 
probeSrcIndex, int baseIndex) {
+    for (int vectorIndex = baseIndex; vectorIndex < 
container.getNumberOfColumns(); vectorIndex++) {
+      ValueVector destVector = 
container.getValueVector(vectorIndex).getValueVector();
+      ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex - 
baseIndex).getValueVector();
+      destVector.copyEntry(container.getRecordCount(), srcVector, 
probeSrcIndex);
+    }
+  }
+  /**
+   *  A special version of the VectorContainer's appendRow for the HashJoin; 
(following a probe) it
+   *  copies the build and probe sides into the outgoing container. (It uses a 
composite
+   *  index for the build side)
+   * @param buildSrcContainers The containers list for the right/inner side
+   * @param compositeBuildSrcIndex Composite build index
+   * @param probeSrcContainer The single container for the left/outer side
+   * @param probeSrcIndex Index in the outer container
+   * @return Number of rows in this container (after the append)
+   */
+  private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int 
compositeBuildSrcIndex,
+                        VectorContainer probeSrcContainer, int probeSrcIndex) {
+    int buildBatchIndex = compositeBuildSrcIndex >>> 16;
+    int buildOffset = compositeBuildSrcIndex & 65535;
+    int baseInd = 0;
+    if ( buildSrcContainers != null ) { baseInd = 
appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset); }
+    if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, 
probeSrcIndex, baseInd); }
+    return container.incRecordCount();
+  }
+
+  /**
+   * A customised version of the VectorContainer's appendRow for HashJoin - 
used for Left
+   * Outer Join when there is no build side match - hence need a base index in
+   * this container's wrappers from where to start appending
+   * @param probeSrcContainer
+   * @param probeSrcIndex
+   * @param baseInd - index of this container's wrapper to start at
+   * @return
+   */
+  private int outputOuterRow(VectorContainer probeSrcContainer, int 
probeSrcIndex, int baseInd) {
+    appendProbe(probeSrcContainer, probeSrcIndex, baseInd);
+    return container.incRecordCount();
+  }
+
+
+  private void executeProjectRightPhase(int currBuildPart) {
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < 
recordsToProcess) {
+      outputRecords =
+        outputRow(partitions[currBuildPart].getContainers(), 
unmatchedBuildIndexes.get(recordsProcessed),
+          null /* no probeBatch */, 0 /* no probe index */ );
+      recordsProcessed++;
+    }
+  }
+
+  private void executeProbePhase() throws SchemaChangeException {
+
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != 
ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
+
+      // Check if we have processed all records in this batch we need to 
invoke next
+      if (recordsProcessed == recordsToProcess) {
+
+        // Done processing all records in the previous batch, clean up!
+        for (VectorWrapper<?> wrapper : probeBatch) {
+          wrapper.getValueVector().clear();
+        }
+
+        IterOutcome leftUpstream = 
outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
+
+        switch (leftUpstream) {
+          case NONE:
+          case NOT_YET:
+          case STOP:
+            recordsProcessed = 0;
+            recordsToProcess = 0;
+            changeToFinalProbeState();
+            // in case some outer partitions were spilled, need to spill their 
last batches
+            for ( HashPartition partn : partitions ) {
+              if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
+              partn.completeAnOuterBatch(false);
+              // update the partition's spill record with the outer side
+              HashJoinBatch.HJSpilledPartition sp = 
spilledInners[partn.getPartitionNum()];
+              sp.outerSpillFile = partn.getSpillFile();
+              sp.outerSpilledBatches = partn.getPartitionBatchesCount();
+
+              partn.closeWriter();
+            }
+
+            continue;
+
+          case OK_NEW_SCHEMA:
+            if (probeBatch.getSchema().equals(probeSchema)) {
+              for ( HashPartition partn : partitions ) { 
partn.updateBatches(); }
+
+            } else {
+              throw SchemaChangeException.schemaChanged("Hash join does not 
support schema changes in probe side.",
+                probeSchema,
+                probeBatch.getSchema());
+            }
+          case OK:
+            recordsToProcess = probeBatch.getRecordCount();
+            recordsProcessed = 0;
+            // If we received an empty batch do nothing
+            if (recordsToProcess == 0) {
+              continue;
+            }
+            if ( cycleNum > 0 ) {
+              read_left_HV_vector = (IntVector) 
probeBatch.getContainer().getLast(); // Needed ?
+            }
+        }
+      }
+
+        int probeIndex = -1;
+      // Check if we need to drain the next row in the probe side
+      if (getNextRecord) {
+
+        if ( !buildSideIsEmpty ) {
+          int hashCode = ( cycleNum == 0 ) ?
+            partitions[0].getProbeHashCode(recordsProcessed)
+            : read_left_HV_vector.getAccessor().get(recordsProcessed);
+          int currBuildPart = hashCode & partitionMask ;
+          hashCode >>>= bitsInMask;
+
+          // Set and keep the current partition (may be used again on 
subsequent probe calls as
+          // inner rows of duplicate key are processed)
+          currPartition = partitions[currBuildPart]; // inner if not spilled, 
else outer
+
+          // If the matching inner partition was spilled
+          if ( outgoingJoinBatch.isSpilledInner(currBuildPart) ) {
+            // add this row to its outer partition (may cause a spill, when 
the batch is full)
+
+            currPartition.appendOuterRow(hashCode, recordsProcessed);
+
+            recordsProcessed++; // done with this outer record
+            continue; // on to the next outer record
+          }
+
+          probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
+
+        }
+
+        if (probeIndex != -1) {
+
+          /* The current probe record has a key that matches. Get the index
+           * of the first row in the build side that matches the current key
+           * (and record this match in the bitmap, in case of a FULL/RIGHT 
join)
+           */
+          currentCompositeIdx = currPartition.getStartIndex(probeIndex);
+
+          outputRecords =
+            outputRow(currPartition.getContainers(), currentCompositeIdx,
+              probeBatch.getContainer(), recordsProcessed);
+
+          /* Projected single row from the build side with matching key but 
there
+           * may be more rows with the same key. Check if that's the case
+           */
+          currentCompositeIdx = 
currPartition.getNextIndex(currentCompositeIdx);
+          if (currentCompositeIdx == -1) {
+            /* We only had one row in the build side that matched the current 
key
+             * from the probe side. Drain the next row in the probe side.
+             */
+            recordsProcessed++;
+          } else {
+            /* There is more than one row with the same key on the build side
+             * don't drain more records from the probe side till we have 
projected
+             * all the rows with this key
+             */
+            getNextRecord = false;
+          }
+        } else { // No matching key
+
+          // If we have a left outer join, project the outer side
+          if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+
+            outputRecords =
+              outputOuterRow(probeBatch.getContainer(), recordsProcessed, 
rightHVColPosition);
+          }
+          recordsProcessed++;
+        }
+      }
+      else { // match the next inner row with the same key
+
+        currPartition.setRecordMatched(currentCompositeIdx);
+
+        outputRecords =
+          outputRow(currPartition.getContainers(), currentCompositeIdx,
+            probeBatch.getContainer(), recordsProcessed);
+
+        currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+
+        if (currentCompositeIdx == -1) {
+          // We don't have any more rows matching the current key on the build 
side, move on to the next probe row
+          getNextRecord = true;
+          recordsProcessed++;
+        }
+      }
+    }
+
+  }
+
+  /**
+   *  Perform the probe, till the outgoing is full, or no more rows to probe.
+   *  Performs the inner or left-outer join while there are left rows,
+   *  when done, continue with right-outer, if appropriate.
+   * @return Num of output records
+   * @throws SchemaChangeException
+   */
+  @Override
+  public int probeAndProject() throws SchemaChangeException {
+
+    outputRecords = 0;
+
+    // When handling spilled partitions, the state becomes DONE at the end of 
each partition
+    if ( probeState == ProbeState.DONE ) {
+      return outputRecords; // that is zero
+    }
+
+    if (probeState == ProbeState.PROBE_PROJECT) {
+      executeProbePhase();
+    }
+
+    if (probeState == ProbeState.PROJECT_RIGHT) {
+      // Inner probe is done; now we are here because we still have a RIGHT 
OUTER (or a FULL) join
+
+      do {
+
+        if (unmatchedBuildIndexes == null) { // first time for this partition ?
+          if ( buildSideIsEmpty ) { return outputRecords; } // in case of an 
empty right
+          // Get this partition's list of build indexes that didn't match any 
record on the probe side
+          unmatchedBuildIndexes = 
partitions[currRightPartition].getNextUnmatchedIndex();
+          recordsProcessed = 0;
+          recordsToProcess = unmatchedBuildIndexes.size();
+        }
+
+        // Project the list of unmatched records on the build side
+        executeProjectRightPhase(currRightPartition);
+
+        if ( recordsProcessed < recordsToProcess ) { // more records in this 
partition?
+          return outputRecords;  // outgoing is full; report and come back 
later
+        } else {
+          currRightPartition++; // on to the next right partition
+          unmatchedBuildIndexes = null;
+        }
+
+      }   while ( currRightPartition < numPartitions );
+
+      probeState = ProbeState.DONE; // last right partition was handled; we 
are done now
+    }
+
+    return outputRecords;
+  }
+
+  @Override
+  public void changeToFinalProbeState() {
+    // We are done with the (left) probe phase.
+    // If it's a RIGHT or a FULL join then need to get the unmatched indexes 
from the build side
+    probeState =
+      (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? 
ProbeState.PROJECT_RIGHT :
+        ProbeState.DONE; // else we're done
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
index 33f22be..7fe9b5f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
index 695a68c..4036438 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
index 04d15ff..0bc0a7d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
index 05354d5..8575021 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
index 87d8d1b..4f9e585 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 4f0cff8..620f150 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -153,4 +153,9 @@ public class OperatorRecordBatch implements 
CloseableRecordBatch {
   public void close() {
     driver.close();
   }
+
+  @Override
+  public VectorContainer getContainer() {
+    return batchAccessor.getOutgoingContainer();
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index d966f50..e35bb5f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.record;
 
 import java.lang.reflect.Array;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -218,7 +217,7 @@ public class VectorContainer implements VectorAccessible {
     * Appends a row taken from a source {@link VectorContainer} to this {@link 
VectorContainer}.
     * @param srcContainer The {@link VectorContainer} to copy a row from.
     * @param srcIndex The index of the row to copy from the source {@link 
VectorContainer}.
-    * @return Position where the row was appended
+    * @return Position one above where the row was appended
     */
     public int appendRow(VectorContainer srcContainer, int srcIndex) {
       for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
@@ -226,95 +225,8 @@ public class VectorContainer implements VectorAccessible {
         ValueVector srcVector = 
srcContainer.wrappers.get(vectorIndex).getValueVector();
         destVector.copyEntry(recordCount, srcVector, srcIndex);
       }
-      int pos = recordCount++;
-      initialized = true;
-      return pos;
-    }
-
-  /**
-   *  This method currently is only used by the Hash Join to return a row 
composed of build+probe rows
-   *
-   * This works with non-hyper {@link VectorContainer}s which have no 
selection vectors.
-   * Appends a row taken from two source {@link VectorContainer}s to this 
{@link VectorContainer}.
-   * @param buildSrcContainer The {@link VectorContainer} to copy the first 
columns of a row from.
-   * @param buildSrcIndex The index of the row to copy from the build side 
source {@link VectorContainer}.
-   * @param probeSrcContainer The {@link VectorContainer} to copy the last 
columns of a row from.
-   * @param probeSrcIndex The index of the row to copy from the probe side 
source {@link VectorContainer}.
-   * @return Number of records in the container after appending
-   */
-  public int appendRowXXX(VectorContainer buildSrcContainer, int 
buildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) {
-    if ( buildSrcContainer != null ) {
-      for (int vectorIndex = 0; vectorIndex < 
buildSrcContainer.wrappers.size(); vectorIndex++) {
-        ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
-        ValueVector srcVector = 
buildSrcContainer.wrappers.get(vectorIndex).getValueVector();
-        destVector.copyEntry(recordCount, srcVector, buildSrcIndex);
-      }
-    }
-    if ( probeSrcContainer != null ) {
-      int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size();
-      for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); 
vectorIndex++) {
-        ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
-        ValueVector srcVector = 
probeSrcContainer.wrappers.get(vectorIndex).getValueVector();
-        destVector.copyEntry(recordCount, srcVector, probeSrcIndex);
-      }
+      return incRecordCount();
     }
-    recordCount++;
-    initialized = true;
-    return recordCount;
-  }
-
-  private int appendBuild(VectorContainer buildSrcContainer, int 
buildSrcIndex) {
-    // "- 1" to skip the last "hash values" added column
-    int lastIndex = buildSrcContainer.wrappers.size() - 1 ;
-    for (int vectorIndex = 0; vectorIndex < lastIndex; vectorIndex++) {
-      ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
-      ValueVector srcVector = 
buildSrcContainer.wrappers.get(vectorIndex).getValueVector();
-      destVector.copyEntry(recordCount, srcVector, buildSrcIndex);
-    }
-    return lastIndex;
-  }
-  private void appendProbe(VectorContainer probeSrcContainer, int 
probeSrcIndex, int baseIndex) {
-      // int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size();
-      for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); 
vectorIndex++) {
-        ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
-        ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex - 
baseIndex).getValueVector();
-        destVector.copyEntry(recordCount, srcVector, probeSrcIndex);
-      }
-  }
-  /**
-   *  A special version of appendRow for the HashJoin; uses a composite index 
for the build side
-   * @param buildSrcContainers The containers list for the right side
-   * @param compositeBuildSrcIndex Composite build index
-   * @param probeSrcContainer The single container for the left/outer side
-   * @param probeSrcIndex Index in the outer container
-   * @return Number of rows in this container (after the append)
-   */
-  public int appendRow(ArrayList<VectorContainer> buildSrcContainers, int 
compositeBuildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) {
-    int buildBatch = compositeBuildSrcIndex >>> 16;
-    int buildOffset = compositeBuildSrcIndex & 65535;
-    int baseInd = 0;
-    if ( buildSrcContainers != null ) { baseInd = 
appendBuild(buildSrcContainers.get(buildBatch), buildOffset); }
-    if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, 
probeSrcIndex, baseInd); }
-    recordCount++;
-    initialized = true;
-    return recordCount;
-  }
-
-  /**
-   * A customised version of the special appendRow for HashJoin - used for Left
-   * Outer Join when there is no build side match - hence need a base index in
-   * this container's wrappers from where to start appending
-   * @param probeSrcContainer
-   * @param probeSrcIndex
-   * @param baseInd - index of this container's wrapper to start at
-   * @return
-   */
-  public int appendOuterRow(VectorContainer probeSrcContainer, int 
probeSrcIndex, int baseInd) {
-    appendProbe(probeSrcContainer, probeSrcIndex, baseInd);
-    recordCount++;
-    initialized = true;
-    return recordCount;
-  }
 
   public TypedFieldId add(ValueVector vv) {
     schemaChanged = true;
@@ -459,6 +371,15 @@ public class VectorContainer implements VectorAccessible {
       initialized = true;
   }
 
+  /**
+   * Increment the record count
+   * @return the new record count
+   */
+  public int incRecordCount() {
+    initialized = true;
+    return ++recordCount;
+  }
+
   @Override
   public int getRecordCount() {
     Preconditions.checkState(hasRecordCount(), "Record count not set for this 
vector container");
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 183c1f1..663def1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -121,6 +121,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
       new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
       new 
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), 
// for enable/disable unbounded HashJoin
       new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
       new 
OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // 
for tuning
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index 3189535..20ef79d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -68,7 +68,7 @@ public class MemoryAllocationUtilities {
     // look for external sorts
     final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
     for (final PhysicalOperator op : plan.getSortedOperators()) {
-      if (op.isBufferedOperator()) {
+      if (op.isBufferedOperator(queryContext)) {
         bufferedOpList.add(op);
       }
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
index bbbd88c..9c2a2d7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
@@ -229,7 +229,7 @@ public class ThrottledResourceManager extends 
AbstractResourceManager {
       @Override
       public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
           throws RuntimeException {
-        if (op.isBufferedOperator()) {
+        if (op.isBufferedOperator(null)) {
           value.add(op);
         }
         visitChildren(op, value);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index b4969c0..7d92570 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -421,6 +421,10 @@ drill.exec.options: {
     drill.exec.functions.cast_empty_string_to_null: false,
     drill.exec.rpc.fragrunner.timeout: 10000,
     drill.exec.hashjoin.mem_limit: 0,
+    # Setting to control if HashJoin should fallback to older behavior of 
consuming
+    # unbounded memory. By default it's set to false such that the
+    # query will fail if there is not enough memory
+    drill.exec.hashjoin.fallback.enabled: true, # should soon be changed to 
false !!
     # Setting to control if HashAgg should fallback to older behavior of 
consuming
     # unbounded memory. In case of 2 phase Agg when available memory is not 
enough
     # to start at least 2 partitions then HashAgg fallbacks to this case. It 
can be
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 5463974..0c43ab2 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -182,6 +182,9 @@ public class MockRecordBatch implements 
CloseableRecordBatch {
     return container.iterator();
   }
 
+  @Override
+  public VectorContainer getContainer() { return container; }
+
   public boolean isCompleted() {
     return isDone;
   }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index b57329c..6d06434 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -43,8 +43,6 @@ import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
@@ -111,7 +109,8 @@ public class HashPartitionTest {
           10,
           spillSet,
           0,
-          0);
+          0,
+          2); // only '1' has a special treatment
 
         final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new 
HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
 
@@ -206,7 +205,8 @@ public class HashPartitionTest {
           10,
           spillSet,
           0,
-          0);
+          0,
+          2);
 
         final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new 
HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index dc05a70..30c0c73 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
index bd34df4..ed25d78 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
index 4944c87..4fe1fa4 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
index 37a6a33..52e6707 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -6,18 +6,18 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.drill.exec.physical.impl.join;
 
+import ch.qos.logback.classic.Level;
 import com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
@@ -25,7 +25,7 @@ import org.apache.drill.categories.SlowTest;
 
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
-import org.junit.Ignore;
+import org.apache.drill.test.LogFixture;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -33,10 +33,16 @@ import java.util.List;
 
 @Category({SlowTest.class, OperatorTest.class})
 public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
+
   @SuppressWarnings("unchecked")
   @Test
   // Should spill, including recursive spill
   public void testSimpleHashJoinSpill() {
+    LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+      .toConsole()
+      .logger("org.apache.drill", Level.WARN);
+
+
     HashJoinPOP joinConf = new HashJoinPOP(null, null,
       Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER);
     
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions",
 4);
@@ -53,6 +59,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase 
{
       rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
     }
 
+    LogFixture logs = logBuilder.build();
     opTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
index 4e9f1c7..3f01bca 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
index 5cdf524..1bd51fc 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
index 40eec6a..627d737 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
index 1e9bb8a..5cf7eca 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index 0bf995a..36d004c 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -230,6 +230,8 @@ public class MockLateralJoinBatch implements 
LateralContract, CloseableRecordBat
     return count;
   }
 
+  @Override
+  public VectorContainer getContainer() { return null; }
 
   @Override public Iterator<VectorWrapper<?>> iterator() {
     return null;
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index b8f9563..9c2d5d8 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -27,8 +27,10 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import 
org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction;
 import 
org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.SubOperatorTest;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -36,6 +38,8 @@ import org.junit.experimental.categories.Category;
 public class TestExternalSortInternals extends SubOperatorTest {
 
   private static final int ONE_MEG = 1024 * 1024;
+  @Rule
+  public final BaseDirTestWatcher watcher = new BaseDirTestWatcher();
 
   /**
    * Verify defaults configured in drill-override.conf.
@@ -66,7 +70,7 @@ public class TestExternalSortInternals extends 
SubOperatorTest {
   @Test
   public void testConfigOverride() {
     // Verify the various HOCON ways of setting memory
-    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
     builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K")
         .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 10)
@@ -92,7 +96,7 @@ public class TestExternalSortInternals extends 
SubOperatorTest {
    */
   @Test
   public void testConfigLimits() {
-    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
     builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 
SortConfig.MIN_MERGE_LIMIT - 1)
         .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, 
SortConfig.MIN_SPILL_FILE_SIZE - 1)
@@ -414,7 +418,7 @@ public class TestExternalSortInternals extends 
SubOperatorTest {
     int batchSizeConstraint = ONE_MEG / 2;
     int mergeSizeConstraint = ONE_MEG;
 
-    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
     builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstraint)
         .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, batchSizeConstraint)
@@ -470,7 +474,7 @@ public class TestExternalSortInternals extends 
SubOperatorTest {
     // No artificial merge limit
 
     int mergeLimitConstraint = 100;
-    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
     builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
         .build();
@@ -599,7 +603,7 @@ public class TestExternalSortInternals extends 
SubOperatorTest {
   public void testMergeLimit() {
     // Constrain merge width
     int mergeLimitConstraint = 5;
-    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
     builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
         .build();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
index b09b865..0a6d7f9 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
@@ -49,7 +49,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Ignore;
-import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;

-- 
To stop receiving notification emails like this one, please contact
b...@apache.org.

Reply via email to