This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e9ffb5bde37efad9f0b646773311f23ea2dbda5d
Author: Timothy Farkas <[email protected]>
AuthorDate: Mon Jul 30 11:21:02 2018 -0700

    DRILL-6644: Don't reserve space for incoming probe batches unnecessarily 
during the build phase.
    
    git closes #1409
---
 .../exec/physical/impl/join/HashJoinBatch.java     |  23 ++--
 .../join/HashJoinMechanicalMemoryCalculator.java   |   1 -
 .../impl/join/HashJoinMemoryCalculator.java        |   3 +-
 .../impl/join/HashJoinMemoryCalculatorImpl.java    | 106 ++++++++---------
 .../impl/join/TestBuildSidePartitioningImpl.java   | 104 +++++++++++++----
 .../impl/join/TestPostBuildCalculationsImpl.java   | 127 +++++++++++++++++----
 .../exec/physical/unit/TestNullInputMiniPlan.java  |   4 +-
 7 files changed, 243 insertions(+), 125 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index a8339f8..dd9b0d6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -287,7 +287,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       prefetchedBuild,
       buildSideIsEmpty,
       RIGHT_INDEX,
-      right,
+      buildBatch,
       () -> {
         batchMemoryManager.update(RIGHT_INDEX, 0, true);
         logger.debug("BATCH_STATS, incoming right: {}", 
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
@@ -302,7 +302,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       prefetchedProbe,
       probeSideIsEmpty,
       LEFT_INDEX,
-      left,
+      probeBatch,
       () -> {
         batchMemoryManager.update(LEFT_INDEX, 0);
         logger.debug("BATCH_STATS, incoming left: {}", 
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
@@ -348,7 +348,10 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       state = BatchState.STOP;
     } else {
       // Got our first batch(es)
-      memoryManagerUpdate.run();
+      if (cycleNum == 0) {
+        // Only collect stats for the first cylce
+        memoryManagerUpdate.run();
+      }
       state = BatchState.FIRST;
     }
 
@@ -558,6 +561,9 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
           state = BatchState.FIRST;  // TODO need to determine if this is 
still necessary since prefetchFirstBatchFromBothSides sets this
 
+          prefetchedBuild.setValue(false);
+          prefetchedProbe.setValue(false);
+
           return innerNext(); // start processing the next spilled partition 
"recursively"
         }
 
@@ -762,7 +768,6 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
         buildJoinColumns,
         leftUpstream == IterOutcome.NONE, // probeEmpty
         allocator.getLimit(),
-        maxIncomingBatchSize,
         numPartitions,
         RECORDS_PER_BATCH,
         RECORDS_PER_BATCH,
@@ -821,22 +826,19 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     {
       // Initializing build calculator
       // Limit scope of these variables to this block
-      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT : 
RECORDS_PER_BATCH;
-      boolean hasProbeData = leftUpstream != IterOutcome.NONE;
-      boolean doMemoryCalculation = canSpill && hasProbeData;
+      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT: 
RECORDS_PER_BATCH;
+      boolean doMemoryCalculation = canSpill && 
!probeSideIsEmpty.booleanValue();
       HashJoinMemoryCalculator calc = getCalculatorImpl();
 
       calc.initialize(doMemoryCalculation);
       buildCalc = calc.next();
 
-      // We've sniffed first non empty build and probe batches so we have 
enough information to create a calculator
       buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash 
values bug fixed
         buildBatch,
         probeBatch,
         buildJoinColumns,
-        leftUpstream == IterOutcome.NONE, // probeEmpty
+        probeSideIsEmpty.booleanValue(),
         allocator.getLimit(),
-        maxIncomingBatchSize,
         numPartitions,
         RECORDS_PER_BATCH,
         RECORDS_PER_BATCH,
@@ -1093,7 +1095,6 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
     this.allocator = oContext.getAllocator();
 
-    maxIncomingBatchSize = 
context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE);
     numPartitions = 
(int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
     if ( numPartitions == 1 ) { //
       disableSpilling("Spilling is disabled due to configuration setting of 
num_partitions to 1");
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
index af6be8b..59496d9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
@@ -73,7 +73,6 @@ public class HashJoinMechanicalMemoryCalculator implements 
HashJoinMemoryCalcula
                            Set<String> joinColumns,
                            boolean probeEmpty,
                            long memoryAvailable,
-                           long maxIncomingBatchSize,
                            int initialPartitions,
                            int recordsPerPartitionBatchBuild,
                            int recordsPerPartitionBatchProbe,
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index 0ccd912..38c2a35 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -96,14 +96,13 @@ public interface HashJoinMemoryCalculator extends 
HashJoinStateCalculator<HashJo
    * </ul>
    */
   interface BuildSidePartitioning extends 
HashJoinStateCalculator<PostBuildCalculations> {
-    void initialize(boolean autoTune,
+    void initialize(boolean firstCycle,
                     boolean reserveHash,
                     RecordBatch buildSideBatch,
                     RecordBatch probeSideBatch,
                     Set<String> joinColumns,
                     boolean probeEmpty,
                     long memoryAvailable,
-                    long maxIncomingBatchSize,
                     int initialPartitions,
                     int recordsPerPartitionBatchBuild,
                     int recordsPerPartitionBatchProbe,
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 2ab42e5..35ff7ec 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -93,14 +93,13 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     private int recordsPerPartitionBatchProbe;
 
     @Override
-    public void initialize(boolean autoTune,
+    public void initialize(boolean firstCycle,
                            boolean reserveHash,
                            RecordBatch buildSideBatch,
                            RecordBatch probeSideBatch,
                            Set<String> joinColumns,
                            boolean probeEmpty,
                            long memoryAvailable,
-                           long maxIncomingBatchSize,
                            int initialPartitions,
                            int recordsPerPartitionBatchBuild,
                            int recordsPerPartitionBatchProbe,
@@ -169,7 +168,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
    * <h1>Life Cycle</h1>
    * <p>
    *   <ul>
-   *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, 
RecordBatch, RecordBatch, Set, boolean, long, long, int, int, int, int, int, 
int, double)}.
+   *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, 
RecordBatch, RecordBatch, Set, boolean, long, int, int, int, int, int, int, 
double)}.
    *     This will initialize the StateCalculate with the additional 
information it needs.</li>
    *     <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number 
of partitions that fit in memory.</li>
   *     <li><b>Step 2:</b> Call {@link #shouldSpill()} to determine if 
spilling needs to occur.</li>
@@ -190,7 +189,6 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     private int maxBatchNumRecordsProbe;
     private long memoryAvailable;
     private boolean probeEmpty;
-    private long maxIncomingBatchSize;
     private long maxBuildBatchSize;
     private long maxProbeBatchSize;
     private long maxOutputBatchSize;
@@ -200,7 +198,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     private int recordsPerPartitionBatchProbe;
     private int outputBatchSize;
     private Map<String, Long> keySizes;
-    private boolean autoTune;
+    private boolean firstCycle;
     private boolean reserveHash;
     private double loadFactor;
 
@@ -228,14 +226,13 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     }
 
     @Override
-    public void initialize(boolean autoTune,
+    public void initialize(boolean firstCycle,
                            boolean reserveHash,
                            RecordBatch buildBatch,
                            RecordBatch probeBatch,
                            Set<String> joinColumns,
                            boolean probeEmpty,
                            long memoryAvailable,
-                           long maxIncomingBatchSize,
                            int initialPartitions,
                            int recordsPerPartitionBatchBuild,
                            int recordsPerPartitionBatchProbe,
@@ -264,11 +261,10 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
         keySizes.put(joinColumn, 
(long)columnSize.getStdNetOrNetSizePerEntry());
       }
 
-      initialize(autoTune,
+      initialize(firstCycle,
         reserveHash,
         keySizes,
         memoryAvailable,
-        maxIncomingBatchSize,
         initialPartitions,
         probeEmpty,
         buildSizePredictor,
@@ -282,11 +278,10 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     }
 
     @VisibleForTesting
-    protected void initialize(boolean autoTune,
+    protected void initialize(boolean firstCycle,
                               boolean reserveHash,
                               CaseInsensitiveMap<Long> keySizes,
                               long memoryAvailable,
-                              long maxIncomingBatchSize,
                               int initialPartitions,
                               boolean probeEmpty,
                               BatchSizePredictor buildSizePredictor,
@@ -305,12 +300,11 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
       firstInitialized = true;
 
       this.loadFactor = loadFactor;
-      this.autoTune = autoTune;
+      this.firstCycle = firstCycle;
       this.reserveHash = reserveHash;
       this.keySizes = Preconditions.checkNotNull(keySizes);
       this.memoryAvailable = memoryAvailable;
       this.probeEmpty = probeEmpty;
-      this.maxIncomingBatchSize = maxIncomingBatchSize;
       this.buildSizePredictor = buildSizePredictor;
       this.probeSizePredictor = probeSizePredictor;
       this.initialPartitions = initialPartitions;
@@ -358,22 +352,6 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     {
       // Adjust based on number of records
       maxBuildBatchSize = 
buildSizePredictor.predictBatchSize(maxBatchNumRecordsBuild, false);
-
-      if (probeSizePredictor.hadDataLastTime()) {
-        // We have probe data and we can compute the max incoming size.
-        maxProbeBatchSize = 
probeSizePredictor.predictBatchSize(maxBatchNumRecordsProbe, false);
-      } else {
-        // We don't have probe data
-        if (probeEmpty) {
-          // We know the probe has no data, so we don't need to reserve any 
space for the incoming probe
-          maxProbeBatchSize = 0;
-        } else {
-          // The probe side may have data, so assume it is the max incoming 
batch size. This assumption
-          // can fail in some cases since the batch sizing project is 
incomplete.
-          maxProbeBatchSize = maxIncomingBatchSize;
-        }
-      }
-
       partitionBuildBatchSize = 
buildSizePredictor.predictBatchSize(recordsPerPartitionBatchBuild, reserveHash);
 
       if (probeSizePredictor.hadDataLastTime()) {
@@ -389,15 +367,18 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
         long incompletePartitionsBatchSizes = ((long) partitions) * 
partitionBuildBatchSize;
         // We need to reserve all the space for incomplete batches, and the 
incoming batch as well as the
         // probe batch we sniffed.
-        // TODO when batch sizing project is complete we won't have to sniff 
probe batches since
-        // they will have a well defined size.
-        reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize + 
maxProbeBatchSize;
+        reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize;
+
+        if (!firstCycle) {
+          // If this is NOT the first cycle the HashJoin operator owns the 
probe batch and we need to reserve space for it.
+          reservedMemory += probeSizePredictor.getBatchSize();
+        }
 
         if (probeSizePredictor.hadDataLastTime()) {
           // If we have probe data, use it in our memory reservation 
calculations.
           probeReservedMemory = 
PostBuildCalculationsImpl.calculateReservedMemory(
             partitions,
-            maxProbeBatchSize,
+            probeSizePredictor.getBatchSize(),
             maxOutputBatchSize,
             partitionProbeBatchSize);
 
@@ -407,7 +388,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
           maxReservedMemory = reservedMemory;
         }
 
-        if (!autoTune || maxReservedMemory <= memoryAvailable) {
+        if (!firstCycle || maxReservedMemory <= memoryAvailable) {
           // Stop the tuning loop if we are not doing auto tuning, or if we 
are living within our memory limit
           break;
         }
@@ -476,6 +457,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
       Preconditions.checkState(initialized);
 
       return new PostBuildCalculationsImpl(
+        firstCycle,
         probeSizePredictor,
         memoryAvailable,
         maxOutputBatchSize,
@@ -580,10 +562,10 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
 
     public static final int MIN_RECORDS_PER_PARTITION_BATCH_PROBE = 10;
 
+    private final boolean firstCycle;
     private final BatchSizePredictor probeSizePredictor;
     private final long memoryAvailable;
     private final long maxOutputBatchSize;
-    private final int maxBatchNumRecordsProbe;
     private final int recordsPerPartitionBatchProbe;
     private final PartitionStatSet buildPartitionStatSet;
     private final Map<String, Long> keySizes;
@@ -597,24 +579,25 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     private boolean initialized;
     private long consumedMemory;
     private boolean probeEmpty;
-    private long maxProbeBatchSize;
     private long partitionProbeBatchSize;
     private int computedProbeRecordsPerBatch;
 
     @VisibleForTesting
-    public PostBuildCalculationsImpl(final BatchSizePredictor 
probeSizePredictor,
-                                      final long memoryAvailable,
-                                      final long maxOutputBatchSize,
-                                      final int maxBatchNumRecordsProbe,
-                                      final int recordsPerPartitionBatchProbe,
-                                      final PartitionStatSet 
buildPartitionStatSet,
-                                      final Map<String, Long> keySizes,
-                                      final HashTableSizeCalculator 
hashTableSizeCalculator,
-                                      final HashJoinHelperSizeCalculator 
hashJoinHelperSizeCalculator,
-                                      final double fragmentationFactor,
-                                      final double safetyFactor,
-                                      final double loadFactor,
-                                      final boolean reserveHash) {
+    public PostBuildCalculationsImpl(final boolean firstCycle,
+                                     final BatchSizePredictor 
probeSizePredictor,
+                                     final long memoryAvailable,
+                                     final long maxOutputBatchSize,
+                                     final int maxBatchNumRecordsProbe,
+                                     final int recordsPerPartitionBatchProbe,
+                                     final PartitionStatSet 
buildPartitionStatSet,
+                                     final Map<String, Long> keySizes,
+                                     final HashTableSizeCalculator 
hashTableSizeCalculator,
+                                     final HashJoinHelperSizeCalculator 
hashJoinHelperSizeCalculator,
+                                     final double fragmentationFactor,
+                                     final double safetyFactor,
+                                     final double loadFactor,
+                                     final boolean reserveHash) {
+      this.firstCycle = firstCycle;
       this.probeSizePredictor = Preconditions.checkNotNull(probeSizePredictor);
       this.memoryAvailable = memoryAvailable;
       this.maxOutputBatchSize = maxOutputBatchSize;
@@ -626,7 +609,6 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
       this.safetyFactor = safetyFactor;
       this.loadFactor = loadFactor;
       this.reserveHash = reserveHash;
-      this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe;
       this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
       this.computedProbeRecordsPerBatch = recordsPerPartitionBatchProbe;
     }
@@ -636,7 +618,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
       Preconditions.checkState(!initialized);
       // If we had probe data before there should still be probe data now.
       // If we didn't have probe data before we could get some new data now.
-      Preconditions.checkState(probeSizePredictor.hadDataLastTime() && 
!probeEmpty || !probeSizePredictor.hadDataLastTime());
+      Preconditions.checkState(!(probeEmpty && 
probeSizePredictor.hadDataLastTime()));
       initialized = true;
       this.probeEmpty = probeEmpty;
 
@@ -650,12 +632,11 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
         probeSizePredictor.updateStats();
       }
 
-      maxProbeBatchSize = 
probeSizePredictor.predictBatchSize(maxBatchNumRecordsProbe, false);
       partitionProbeBatchSize = 
probeSizePredictor.predictBatchSize(recordsPerPartitionBatchProbe, reserveHash);
 
       long worstCaseProbeMemory = calculateReservedMemory(
         buildPartitionStatSet.getSize(),
-        maxProbeBatchSize,
+        getIncomingProbeBatchReservedSpace(),
         maxOutputBatchSize,
         partitionProbeBatchSize);
 
@@ -667,7 +648,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
           buildPartitionStatSet.getSize(),
           recordsPerPartitionBatchProbe,
           MIN_RECORDS_PER_PARTITION_BATCH_PROBE,
-          maxProbeBatchSize,
+          getIncomingProbeBatchReservedSpace(),
           maxOutputBatchSize,
           partitionProbeBatchSize);
 
@@ -681,9 +662,14 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
       return computedProbeRecordsPerBatch;
     }
 
-    @VisibleForTesting
-    public long getMaxProbeBatchSize() {
-      return maxProbeBatchSize;
+    public long getIncomingProbeBatchReservedSpace() {
+      Preconditions.checkState(initialized);
+
+      if (firstCycle) {
+        return 0;
+      } else {
+        return probeSizePredictor.getBatchSize();
+      }
     }
 
     @VisibleForTesting
@@ -744,7 +730,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
 
       long reservedMemory = calculateReservedMemory(
         buildPartitionStatSet.getNumSpilledPartitions(),
-        maxProbeBatchSize,
+        getIncomingProbeBatchReservedSpace(),
         maxOutputBatchSize,
         partitionProbeBatchSize);
 
@@ -804,11 +790,11 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
         "Mem calc stats:\n" +
         "memoryLimit = %s\n" +
         "consumedMemory = %s\n" +
-        "maxProbeBatchSize = %s\n" +
+        "maxIncomingProbeBatchReservedSpace = %s\n" +
         "maxOutputBatchSize = %s\n",
         PartitionStatSet.prettyPrintBytes(memoryAvailable),
         PartitionStatSet.prettyPrintBytes(consumedMemory),
-        PartitionStatSet.prettyPrintBytes(maxProbeBatchSize),
+        
PartitionStatSet.prettyPrintBytes(getIncomingProbeBatchReservedSpace()),
         PartitionStatSet.prettyPrintBytes(maxOutputBatchSize));
 
       StringBuilder hashJoinHelperSb = new StringBuilder("Partition Hash Join 
Helpers\n");
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index af943ec..665bc48 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -25,7 +25,16 @@ import org.junit.Test;
 
 public class TestBuildSidePartitioningImpl {
   @Test
-  public void testSimpleReserveMemoryCalculationNoHash() {
+  public void testSimpleReserveMemoryCalculationNoHashFirstCycle() {
+    testSimpleReserveMemoryCalculationNoHashHelper(true);
+  }
+
+  @Test
+  public void testSimpleReserveMemoryCalculationNoHashNotFirstCycle() {
+    testSimpleReserveMemoryCalculationNoHashHelper(false);
+  }
+
+  private void testSimpleReserveMemoryCalculationNoHashHelper(final boolean 
firstCycle) {
     final int maxBatchNumRecords = 20;
     final double fragmentationFactor = 2.0;
     final double safetyFactor = 1.5;
@@ -39,12 +48,12 @@ public class TestBuildSidePartitioningImpl {
         safetyFactor);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+    final long accountedProbeBatchSize = firstCycle? 0: 10;
 
-    calc.initialize(true,
+    calc.initialize(firstCycle,
       false,
       keySizes,
-      200,
-      100,
+      190 + accountedProbeBatchSize,
       2,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -62,7 +71,7 @@ public class TestBuildSidePartitioningImpl {
 
     long expectedReservedMemory = 60 // Max incoming batch size
       + 2 * 30 // build side batch for each spilled partition
-      + 60; // Max incoming probe batch size
+      + accountedProbeBatchSize; // Max incoming probe batch size
     long actualReservedMemory = calc.getBuildReservedMemory();
 
     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -89,7 +98,6 @@ public class TestBuildSidePartitioningImpl {
       true,
       keySizes,
       350,
-      100, // Ignored for test
       2,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -107,7 +115,7 @@ public class TestBuildSidePartitioningImpl {
 
     long expectedReservedMemory = 60 // Max incoming batch size
       + 2 * (/* data size for batch */ 30 + /* Space reserved for hash value 
vector */ 10 * 4 * 2) // build side batch for each spilled partition
-      + 60; // Max incoming probe batch size
+      + 10; // Max incoming probe batch size
     long actualReservedMemory = calc.getBuildReservedMemory();
 
     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -135,7 +143,6 @@ public class TestBuildSidePartitioningImpl {
       false,
       keySizes,
       200,
-      100, // Ignored for test
       4,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -153,17 +160,61 @@ public class TestBuildSidePartitioningImpl {
     calc.setPartitionStatSet(partitionStatSet);
 
     long expectedReservedMemory = 60 // Max incoming batch size
-      + 2 * 30 // build side batch for each spilled partition
-      + 60; // Max incoming probe batch size
+      + 2 * 30; // build side batch for each spilled partition
     long actualReservedMemory = calc.getBuildReservedMemory();
 
     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
     Assert.assertEquals(2, calc.getNumPartitions());
   }
 
+  @Test
+  public void testDontAdjustInitialPartitions() {
+    final int maxBatchNumRecords = 20;
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+
+    final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+      new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+        BatchSizePredictorImpl.Factory.INSTANCE,
+        new 
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, 
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        fragmentationFactor,
+        safetyFactor);
+
+    final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+    calc.initialize(
+      false,
+      false,
+      keySizes,
+      200,
+      4,
+      false,
+      new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
+      new MockBatchSizePredictor(10, 10, fragmentationFactor, safetyFactor),
+      10,
+      5,
+      maxBatchNumRecords,
+      maxBatchNumRecords,
+      16000,
+      .75);
+
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(), 
new PartitionStatImpl(),
+        new PartitionStatImpl(), new PartitionStatImpl());
+    calc.setPartitionStatSet(partitionStatSet);
+
+    long expectedReservedMemory = 60 // Max incoming batch size
+      + 4 * 30 // build side batch for each spilled partition
+      + 10; // Max incoming probe batch size
+    long actualReservedMemory = calc.getBuildReservedMemory();
+
+    Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+    Assert.assertEquals(4, calc.getNumPartitions());
+  }
+
   @Test(expected = IllegalStateException.class)
   public void testHasDataProbeEmpty() {
-    final int maxIncomingBatchSize = 100;
     final int maxBatchNumRecords = 20;
     final double fragmentationFactor = 2.0;
     final double safetyFactor = 1.5;
@@ -183,7 +234,6 @@ public class TestBuildSidePartitioningImpl {
       false,
       keySizes,
       240,
-      maxIncomingBatchSize,
       4,
       true,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -198,7 +248,6 @@ public class TestBuildSidePartitioningImpl {
 
   @Test
   public void testNoProbeDataForStats() {
-    final int maxIncomingBatchSize = 100;
     final int maxBatchNumRecords = 20;
     final double fragmentationFactor = 2.0;
     final double safetyFactor = 1.5;
@@ -218,7 +267,6 @@ public class TestBuildSidePartitioningImpl {
       false,
       keySizes,
       240,
-      maxIncomingBatchSize,
       4,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -235,12 +283,11 @@ public class TestBuildSidePartitioningImpl {
     calc.setPartitionStatSet(partitionStatSet);
 
     long expectedReservedMemory = 60 // Max incoming batch size
-      + 2 * 30 // build side batch for each spilled partition
-      + maxIncomingBatchSize;
+      + 4 * 30; // build side batch for each spilled partition
     long actualReservedMemory = calc.getBuildReservedMemory();
 
     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
-    Assert.assertEquals(2, calc.getNumPartitions());
+    Assert.assertEquals(4, calc.getNumPartitions());
   }
 
   @Test
@@ -264,7 +311,6 @@ public class TestBuildSidePartitioningImpl {
       false,
       keySizes,
       200,
-      100, // Ignored for test
       4,
       true,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -290,10 +336,20 @@ public class TestBuildSidePartitioningImpl {
   }
 
   @Test
-  public void testNoRoomInMemoryForBatch1() {
+  public void testNoRoomInMemoryForBatch1FirstCycle() {
+    testNoRoomInMemoryForBatch1Helper(true);
+  }
+
+  @Test
+  public void testNoRoomInMemoryForBatch1NotFirstCycle() {
+    testNoRoomInMemoryForBatch1Helper(false);
+  }
+
+  private void testNoRoomInMemoryForBatch1Helper(final boolean firstCycle) {
     final int maxBatchNumRecords = 20;
     final double fragmentationFactor = 2.0;
     final double safetyFactor = 1.5;
+    final long accountedProbeBatchSize = firstCycle? 0: 10;
 
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
@@ -306,11 +362,10 @@ public class TestBuildSidePartitioningImpl {
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
     calc.initialize(
-      true,
+      firstCycle,
       false,
       keySizes,
-      180,
-      100, // Ignored for test
+      120 + accountedProbeBatchSize,
       2,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
@@ -330,7 +385,7 @@ public class TestBuildSidePartitioningImpl {
 
     long expectedReservedMemory = 60 // Max incoming batch size
       + 2 * 30 // build side batch for each spilled partition
-      + 60; // Max incoming probe batch size
+      + accountedProbeBatchSize; // Max incoming probe batch size
     long actualReservedMemory = calc.getBuildReservedMemory();
 
     Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
@@ -360,8 +415,7 @@ public class TestBuildSidePartitioningImpl {
       true,
       false,
       keySizes,
-      210,
-      100, // Ignored for test
+      160,
       2,
       false,
       new MockBatchSizePredictor(20, 20, fragmentationFactor, safetyFactor),
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
index aa7a435..fa3fdfb 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -121,6 +121,7 @@ public class TestPostBuildCalculationsImpl {
 
     final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        true,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
@@ -167,6 +168,7 @@ public class TestPostBuildCalculationsImpl {
 
     final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        true,
         new ConditionalMockBatchSizePredictor(),
         50,
         1000,
@@ -188,7 +190,16 @@ public class TestPostBuildCalculationsImpl {
   }
 
   @Test
-  public void testHasNoProbeDataButProbeNonEmpty() {
+  public void testHasNoProbeDataButProbeNonEmptyFirstCycle() {
+    testHasNoProbeDataButProbeNonEmptyHelper(true);
+  }
+
+  @Test
+  public void testHasNoProbeDataButProbeNonEmptyNotFirstCycle() {
+    testHasNoProbeDataButProbeNonEmptyHelper(false);
+  }
+
+  private void testHasNoProbeDataButProbeNonEmptyHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -210,14 +221,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
 
     final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           false),
-        290,
+        230 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -232,7 +245,7 @@ public class TestPostBuildCalculationsImpl {
 
     calc.initialize(false);
 
-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 160 // in memory partitions
       + 20 // max output batch size
       + 2 * 10 // Hash Table
@@ -243,7 +256,16 @@ public class TestPostBuildCalculationsImpl {
   }
 
   @Test
-  public void testProbingAndPartitioningBuildAllInMemoryNoSpill() {
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillNotFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -265,14 +287,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
 
     final HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        290,
+        230 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -287,7 +311,7 @@ public class TestPostBuildCalculationsImpl {
 
     calc.initialize(false);
 
-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 160 // in memory partitions
       + 20 // max output batch size
       + 2 * 10 // Hash Table
@@ -298,7 +322,16 @@ public class TestPostBuildCalculationsImpl {
   }
 
   @Test
-  public void testProbingAndPartitioningBuildAllInMemorySpill() {
+  public void testProbingAndPartitioningBuildAllInMemorySpillFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemorySpillHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemorySpillNotFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemorySpillHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildAllInMemorySpillHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -320,16 +353,18 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
 
     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        270,
+        210 + accountedProbeBatchSize,
         20,
-         maxBatchNumRecordsProbe,
+        maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
         buildPartitionStatSet,
         keySizes,
@@ -342,7 +377,7 @@ public class TestPostBuildCalculationsImpl {
 
     calc.initialize(false);
 
-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 160 // in memory partitions
       + 20 // max output batch size
       + 2 * 10 // Hash Table
@@ -351,7 +386,7 @@ public class TestPostBuildCalculationsImpl {
     Assert.assertEquals(expected, calc.getConsumedMemory());
     partition1.spill();
 
-    expected = 60 // maxProbeBatchSize
+    expected = accountedProbeBatchSize
       + 80 // in memory partitions
       + 20 // max output batch size
       + 10 // Hash Table
@@ -363,7 +398,16 @@ public class TestPostBuildCalculationsImpl {
   }
 
   @Test
-  public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHash() {
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashNotFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -381,14 +425,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
 
     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        180,
+        120 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -403,7 +449,7 @@ public class TestPostBuildCalculationsImpl {
 
     calc.initialize(false);
 
-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize // probe batch
       + 2 * 5 * 3 // partition batches
       + 20; // max output batch size
     Assert.assertFalse(calc.shouldSpill());
@@ -412,7 +458,16 @@ public class TestPostBuildCalculationsImpl {
   }
 
   @Test
-  public void testProbingAndPartitioningBuildAllInMemoryWithSpill() {
+  public void testProbingAndPartitioningBuildAllInMemoryWithSpillFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryWithSpillNotFirstCycle() {
+    testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -434,14 +489,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
 
     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        200,
+        140 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -456,7 +513,7 @@ public class TestPostBuildCalculationsImpl {
 
     calc.initialize(false);
 
-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 80 // in memory partition
       + 10 // hash table size
       + 10 // hash join helper size
@@ -471,7 +528,16 @@ public class TestPostBuildCalculationsImpl {
   }
 
   @Test
-  public void testProbingAndPartitioningBuildSomeInMemory() {
+  public void testProbingAndPartitioningBuildSomeInMemoryFirstCycle() {
+    testProbingAndPartitioningBuildSomeInMemoryHelper(true);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildSomeInMemoryNotFirstCycle() {
+    testProbingAndPartitioningBuildSomeInMemoryHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildSomeInMemoryHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -497,14 +563,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
 
     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        230,
+        170 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -519,7 +587,7 @@ public class TestPostBuildCalculationsImpl {
 
     calc.initialize(false);
 
-    long expected = 60 // maxProbeBatchSize
+    long expected = accountedProbeBatchSize
       + 80 // in memory partition
       + 10 // hash table size
       + 10 // hash join helper size
@@ -533,8 +601,16 @@ public class TestPostBuildCalculationsImpl {
   }
 
   @Test
-  public void testProbingAndPartitioningBuildNoneInMemory() {
+  public void testProbingAndPartitioningBuildNoneInMemoryFirstCycle() {
+    testProbingAndPartitioningBuildNoneInMemoryHelper(true);
+  }
 
+  @Test
+  public void testProbingAndPartitioningBuildNoneInMemoryNotFirstCycle() {
+    testProbingAndPartitioningBuildNoneInMemoryHelper(false);
+  }
+
+  private void testProbingAndPartitioningBuildNoneInMemoryHelper(final boolean firstCycle) {
     final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -554,14 +630,16 @@ public class TestPostBuildCalculationsImpl {
     final int recordsPerPartitionBatchProbe = 5;
     final long partitionProbeBatchSize = 15;
     final long maxProbeBatchSize = 60;
+    final long accountedProbeBatchSize = (firstCycle? 0: maxProbeBatchSize);
 
     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        firstCycle,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
           true),
-        110,
+        110 + accountedProbeBatchSize,
         20,
         maxBatchNumRecordsProbe,
         recordsPerPartitionBatchProbe,
@@ -576,7 +654,7 @@ public class TestPostBuildCalculationsImpl {
 
     calc.initialize(false);
     Assert.assertFalse(calc.shouldSpill());
-    Assert.assertEquals(110, calc.getConsumedMemory());
+    Assert.assertEquals(50 + accountedProbeBatchSize, calc.getConsumedMemory());
     Assert.assertNotNull(calc.next());
   }
 
@@ -611,6 +689,7 @@ public class TestPostBuildCalculationsImpl {
 
     HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
       new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        true,
         new ConditionalMockBatchSizePredictor(
          Lists.newArrayList(maxBatchNumRecordsProbe, recordsPerPartitionBatchProbe),
           Lists.newArrayList(maxProbeBatchSize, partitionProbeBatchSize),
@@ -708,7 +787,7 @@ public class TestPostBuildCalculationsImpl {
 
     @Override
     public long getBatchSize() {
-      return 0;
+      return batchSize.get(0);
     }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
index 9ad7a9b..e1609a5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
@@ -316,7 +316,7 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
         .build();
 
     RecordBatch joinBatch = new PopBuilder()
-        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.INNER, null))
+        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a2", "EQUALS", "a")), JoinRelType.INNER, null))
         .addInput(left)
         .addInput(rightScan)
         .build();
@@ -379,7 +379,7 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
         .build();
 
     RecordBatch joinBatch = new PopBuilder()
-        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.INNER, null))
+        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a2", "EQUALS", "a")), JoinRelType.LEFT, null))
         .addInput(left)
         .addInput(rightScan)
         .build();

Reply via email to