This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 480ade960767cf138de184c7873792e96b0e9a9f
Author: Padma Penumarthy <[email protected]>
AuthorDate: Wed May 30 14:00:16 2018 -0700

    DRILL-6236: Batch sizing for hash join
    
    This closes #1227
---
 .../exec/physical/impl/join/HashJoinBatch.java     | 125 ++++---
 .../join/HashJoinMechanicalMemoryCalculator.java   |   1 +
 .../impl/join/HashJoinMemoryCalculator.java        |   1 +
 .../impl/join/HashJoinMemoryCalculatorImpl.java    |  35 +-
 .../exec/physical/impl/join/HashJoinProbe.java     |   2 +
 .../physical/impl/join/HashJoinProbeTemplate.java  |  22 +-
 .../exec/record/AbstractBinaryRecordBatch.java     |   4 +
 .../drill/exec/record/JoinBatchMemoryManager.java  |  61 ++--
 .../exec/record/RecordBatchMemoryManager.java      |  26 +-
 .../apache/drill/exec/record/RecordBatchSizer.java |  49 ++-
 .../impl/join/TestBuildSidePartitioningImpl.java   |  20 +-
 .../exec/physical/unit/TestOutputBatchSize.java    | 386 +++++++++++++++++++++
 12 files changed, 603 insertions(+), 129 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index ee7a8a3..4267077 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -57,16 +57,19 @@ import 
org.apache.drill.exec.physical.impl.common.HashPartition;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.core.JoinRelType;
 
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
+
 /**
  *   This class implements the runtime execution for the Hash-Join operator
  *   supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -95,11 +98,6 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
    */
   private int RECORDS_PER_BATCH; // internal batches
 
-  /**
-   * The maximum number of records in each outgoing batch.
-   */
-  private static final int TARGET_RECORDS_PER_BATCH = 4000;
-
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
 
@@ -172,7 +170,8 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     public String outerSpillFile;
     int cycleNum;
     int origPartn;
-    int prevOrigPartn; }
+    int prevOrigPartn;
+  }
 
   /**
    * Queue of spilled partitions to process.
@@ -181,7 +180,6 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
   private HJSpilledPartition spilledInners[]; // for the outer to find the 
partition
 
   public enum Metric implements MetricDef {
-
     NUM_BUCKETS,
     NUM_ENTRIES,
     NUM_RESIZING,
@@ -190,8 +188,19 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     SPILLED_PARTITIONS, // number of original partitions spilled to disk
     SPILL_MB,         // Number of MB of data spilled to disk. This amount is 
first written,
                       // then later re-read. So, disk I/O is twice this amount.
-    SPILL_CYCLE       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
-    ;
+    SPILL_CYCLE,       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+    LEFT_INPUT_BATCH_COUNT,
+    LEFT_AVG_INPUT_BATCH_BYTES,
+    LEFT_AVG_INPUT_ROW_BYTES,
+    LEFT_INPUT_RECORD_COUNT,
+    RIGHT_INPUT_BATCH_COUNT,
+    RIGHT_AVG_INPUT_BATCH_BYTES,
+    RIGHT_AVG_INPUT_ROW_BYTES,
+    RIGHT_INPUT_RECORD_COUNT,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
+    AVG_OUTPUT_ROW_BYTES,
+    OUTPUT_RECORD_COUNT;
 
     // duplicate for hash ag
 
@@ -221,12 +230,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       throw new SchemaChangeException(e);
     }
 
-    // Build the container schema and set the counts
-    for (final VectorWrapper<?> w : container) {
-      w.getValueVector().allocateNew();
-    }
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    container.setRecordCount(outputRecords);
   }
 
   @Override
@@ -234,6 +238,15 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     leftUpstream = sniffNonEmptyBatch(0, left);
     rightUpstream = sniffNonEmptyBatch(1, right);
 
+    // For build side, use aggregate i.e. average row width across batches
+    batchMemoryManager.update(LEFT_INDEX, 0);
+    batchMemoryManager.update(RIGHT_INDEX, 0, true);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming left:\n {}", 
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      logger.debug("BATCH_STATS, incoming right:\n {}", 
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+    }
+
     if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) 
{
       state = BatchState.STOP;
       return false;
@@ -333,10 +346,21 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
            joinType != JoinRelType.INNER) {  // or if this is a left/full 
outer join
 
         // Allocate the memory for the vectors in the output container
-        allocateVectors();
+        batchMemoryManager.allocateVectors(container);
+        
hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
 
         outputRecords = hashJoinProbe.probeAndProject();
 
+        for (final VectorWrapper<?> v : container) {
+          v.getValueVector().getMutator().setValueCount(outputRecords);
+        }
+        container.setRecordCount(outputRecords);
+
+        batchMemoryManager.updateOutgoingStats(outputRecords);
+        if (logger.isDebugEnabled()) {
+          logger.debug("BATCH_STATS, outgoing:\n {}", new 
RecordBatchSizer(this));
+        }
+
         /* We are here because of one the following
          * 1. Completed processing of all the records and we are done
          * 2. We've filled up the outgoing batch to the maximum and we need to 
return upstream
@@ -347,10 +371,6 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
             state = BatchState.NOT_FIRST;
           }
 
-          for (final VectorWrapper<?> v : container) {
-            v.getValueVector().getMutator().setValueCount(outputRecords);
-          }
-
           return IterOutcome.OK;
         }
 
@@ -557,7 +577,8 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
         RECORDS_PER_BATCH,
         maxBatchSize,
         maxBatchSize,
-        TARGET_RECORDS_PER_BATCH,
+        batchMemoryManager.getOutputRowCount(),
+        batchMemoryManager.getOutputBatchSize(),
         HashTable.DEFAULT_LOAD_FACTOR);
 
       disableSpilling(null);
@@ -628,7 +649,8 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
         RECORDS_PER_BATCH,
         maxBatchSize,
         maxBatchSize,
-        TARGET_RECORDS_PER_BATCH,
+        batchMemoryManager.getOutputRowCount(),
+        batchMemoryManager.getOutputBatchSize(),
         HashTable.DEFAULT_LOAD_FACTOR);
 
       if (firstCycle && doMemoryCalculation) {
@@ -665,6 +687,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
         for (HashPartition partn : partitions) { partn.updateBatches(); }
         // Fall through
       case OK:
+        batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
         // Special treatment (when no spill, and single partition) -- use the 
incoming vectors as they are (no row copy)
         if ( numPartitions == 1 ) {
           partitions[0].appendBatch(buildBatch);
@@ -803,22 +826,6 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
   }
 
-  private void allocateVectors() {
-    for (final VectorWrapper<?> vectorWrapper : container) {
-      ValueVector valueVector = vectorWrapper.getValueVector();
-
-      if (valueVector instanceof FixedWidthVector) {
-        ((FixedWidthVector) valueVector).allocateNew(TARGET_RECORDS_PER_BATCH);
-      } else if (valueVector instanceof VariableWidthVector) {
-        ((VariableWidthVector) valueVector).allocateNew(8 * 
TARGET_RECORDS_PER_BATCH, TARGET_RECORDS_PER_BATCH);
-      } else {
-        valueVector.allocateNew();
-      }
-    }
-
-    container.setRecordCount(0); // reset container's counter back to zero 
records
-  }
-
   // (After the inner side was read whole) - Has that inner partition spilled
   public boolean isSpilledInner(int part) {
     if ( spilledInners == null ) { return false; } // empty inner
@@ -879,6 +886,10 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
     // Create empty partitions (in the ctor - covers the case where right side 
is empty)
     partitions = new HashPartition[0];
+
+    // get the output batch size from config.
+    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, 
right);
   }
 
   /**
@@ -966,6 +977,23 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     buildBatch.kill(sendUpstream);
   }
 
+  public void updateMetrics() {
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, 
batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, 
batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, 
batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, 
batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, 
batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, 
batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, 
batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, 
batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, 
batchMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, 
batchMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, 
batchMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, 
batchMemoryManager.getTotalOutputRecords());
+  }
+
   @Override
   public void close() {
     if ( cycleNum > 0 ) { // spilling happened
@@ -973,6 +1001,25 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       // SpilledRecordBatch "scanners" as it only knows about the original 
left/right ops.
       killIncoming(false);
     }
+
+    updateMetrics();
+
+    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes 
: {},  avg row bytes : {}, record count : {}",
+      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+
     this.cleanup();
     super.close();
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
index 618e80e..fb087a0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
@@ -77,6 +77,7 @@ public class HashJoinMechanicalMemoryCalculator implements 
HashJoinMemoryCalcula
                            int maxBatchNumRecordsBuild,
                            int maxBatchNumRecordsProbe,
                            int outputBatchNumRecords,
+                           int outputBatchSize,
                            double loadFactor) {
       this.initialPartitions = initialPartitions;
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index 71292a5..868fbfd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -100,6 +100,7 @@ public interface HashJoinMemoryCalculator extends 
HashJoinStateCalculator<HashJo
                     int maxBatchNumRecordsBuild,
                     int maxBatchNumRecordsProbe,
                     int outputBatchNumRecords,
+                    int outputBatchSize,
                     double loadFactor);
 
     void setPartitionStatSet(PartitionStatSet partitionStatSet);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index ed0adc5..37f3329 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -142,6 +142,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
                            int maxBatchNumRecordsBuild,
                            int maxBatchNumRecordsProbe,
                            int outputBatchNumRecords,
+                           int outputBatchSize,
                            double loadFactor) {
       this.initialPartitions = initialPartitions;
     }
@@ -203,7 +204,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
    * <h1>Life Cycle</h1>
    * <p>
    *   <ul>
-   *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, 
RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, double)}.
+   *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, 
RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, int, 
double)}.
    *     This will initialize the StateCalculate with the additional 
information it needs.</li>
    *     <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number 
of partitions that fit in memory.</li>
    *     <li><b>Step 2:</b> Call {@link #shouldSpill()} To determine if 
spilling needs to occurr.</li>
@@ -233,9 +234,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     private int partitions;
     private int recordsPerPartitionBatchBuild;
     private int recordsPerPartitionBatchProbe;
-    private int outputBatchNumRecords;
-    private Map<String, Long> buildValueSizes;
-    private Map<String, Long> probeValueSizes;
+    private int outputBatchSize;
     private Map<String, Long> keySizes;
     private boolean autoTune;
     private boolean reserveHash;
@@ -273,6 +272,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
                            int maxBatchNumRecordsBuild,
                            int maxBatchNumRecordsProbe,
                            int outputBatchNumRecords,
+                           int outputBatchSize,
                            double loadFactor) {
       Preconditions.checkNotNull(buildSideBatch);
       Preconditions.checkNotNull(probeSideBatch);
@@ -300,8 +300,6 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
 
       initialize(autoTune,
         reserveHash,
-        buildValueSizes,
-        probeValueSizes,
         keySizes,
         memoryAvailable,
         initialPartitions,
@@ -313,7 +311,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
         recordsPerPartitionBatchProbe,
         maxBatchNumRecordsBuild,
         maxBatchNumRecordsProbe,
-        outputBatchNumRecords,
+        outputBatchSize,
         loadFactor);
     }
 
@@ -352,8 +350,6 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
     @VisibleForTesting
     protected void initialize(boolean autoTune,
                               boolean reserveHash,
-                              CaseInsensitiveMap<Long> buildValueSizes,
-                              CaseInsensitiveMap<Long> probeValueSizes,
                               CaseInsensitiveMap<Long> keySizes,
                               long memoryAvailable,
                               int initialPartitions,
@@ -365,7 +361,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
                               int recordsPerPartitionBatchProbe,
                               int maxBatchNumRecordsBuild,
                               int maxBatchNumRecordsProbe,
-                              int outputBatchNumRecords,
+                              int outputBatchSize,
                               double loadFactor) {
       Preconditions.checkState(!firstInitialized);
       Preconditions.checkArgument(initialPartitions >= 1);
@@ -374,8 +370,6 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
       this.loadFactor = loadFactor;
       this.autoTune = autoTune;
       this.reserveHash = reserveHash;
-      this.buildValueSizes = Preconditions.checkNotNull(buildValueSizes);
-      this.probeValueSizes = Preconditions.checkNotNull(probeValueSizes);
       this.keySizes = Preconditions.checkNotNull(keySizes);
       this.memoryAvailable = memoryAvailable;
       this.buildBatchSize = buildBatchSize;
@@ -387,7 +381,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
       this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
       this.maxBatchNumRecordsBuild = maxBatchNumRecordsBuild;
       this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe;
-      this.outputBatchNumRecords = outputBatchNumRecords;
+      this.outputBatchSize = outputBatchSize;
 
       calculateMemoryUsage();
 
@@ -448,8 +442,7 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
         safetyFactor,
         reserveHash);
 
-      maxOutputBatchSize = computeMaxOutputBatchSize(buildValueSizes, 
probeValueSizes, keySizes,
-        outputBatchNumRecords, safetyFactor, fragmentationFactor);
+      maxOutputBatchSize = (long) ((double)outputBatchSize * 
fragmentationFactor * safetyFactor);
 
       long probeReservedMemory;
 
@@ -519,18 +512,6 @@ public class HashJoinMemoryCalculatorImpl implements 
HashJoinMemoryCalculator {
       }
     }
 
-    public static long computeMaxOutputBatchSize(Map<String, Long> 
buildValueSizes,
-                                                 Map<String, Long> 
probeValueSizes,
-                                                 Map<String, Long> keySizes,
-                                                 int outputNumRecords,
-                                                 double safetyFactor,
-                                                 double fragmentationFactor) {
-      long outputSize = 
HashTableSizeCalculatorConservativeImpl.computeVectorSizes(keySizes, 
outputNumRecords, safetyFactor)
-        + 
HashTableSizeCalculatorConservativeImpl.computeVectorSizes(buildValueSizes, 
outputNumRecords, safetyFactor)
-        + 
HashTableSizeCalculatorConservativeImpl.computeVectorSizes(probeValueSizes, 
outputNumRecords, safetyFactor);
-      return RecordBatchSizer.multiplyByFactor(outputSize, 
fragmentationFactor);
-    }
-
     @Override
     public boolean shouldSpill() {
       Preconditions.checkState(initialized);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index f212605..5059b18 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -42,4 +42,6 @@ public interface HashJoinProbe {
   void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, 
JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, 
HashPartition[] partitions, int cycleNum, VectorContainer container, 
HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int 
numPartitions, int rightHVColPosition);
   int  probeAndProject() throws SchemaChangeException;
   void changeToFinalProbeState();
+  void setTargetOutputCount(int targetOutputCount);
+  int getOutputCount();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 75c3073..46f2fa3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -31,6 +31,8 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+
 public abstract class HashJoinProbeTemplate implements HashJoinProbe {
 
   VectorContainer container; // the outgoing container
@@ -45,8 +47,6 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
 
   private HashJoinBatch outgoingJoinBatch = null;
 
-  private static final int TARGET_RECORDS_PER_BATCH = 4000;
-
   // Number of records to process on the probe side
   private int recordsToProcess = 0;
 
@@ -83,6 +83,16 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
   private int partitionMask = 0; // numPartitions - 1
   private int bitsInMask = 0; // number of bits in the MASK
   private int rightHVColPosition;
+  private int targetOutputRecords;
+
+  @Override
+  public void setTargetOutputCount(int targetOutputRecords) {
+    this.targetOutputRecords = targetOutputRecords;
+  }
+
+  public int getOutputCount() {
+    return outputRecords;
+  }
 
   /**
    *  Setup the Hash Join Probe object
@@ -209,7 +219,7 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
 
 
   private void executeProjectRightPhase(int currBuildPart) {
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < 
recordsToProcess) {
+    while (outputRecords < targetOutputRecords && recordsProcessed < 
recordsToProcess) {
       outputRecords =
         outputRow(partitions[currBuildPart].getContainers(), 
unmatchedBuildIndexes.get(recordsProcessed),
           null /* no probeBatch */, 0 /* no probe index */ );
@@ -219,7 +229,7 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
 
   private void executeProbePhase() throws SchemaChangeException {
 
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != 
ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
+    while (outputRecords < targetOutputRecords && probeState != 
ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
 
       // Check if we have processed all records in this batch we need to 
invoke next
       if (recordsProcessed == recordsToProcess) {
@@ -262,6 +272,7 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
                 probeBatch.getSchema());
             }
           case OK:
+            
setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().update(probeBatch,
 LEFT_INDEX,outputRecords));
             recordsToProcess = probeBatch.getRecordCount();
             recordsProcessed = 0;
             // If we received an empty batch do nothing
@@ -274,10 +285,9 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
         }
       }
 
-        int probeIndex = -1;
+      int probeIndex = -1;
       // Check if we need to drain the next row in the probe side
       if (getNextRecord) {
-
         if ( !buildSideIsEmpty ) {
           int hashCode = ( cycleNum == 0 ) ?
             partitions[0].getProbeHashCode(recordsProcessed)
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 9052836..d75463b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -121,6 +121,10 @@ public abstract class AbstractBinaryRecordBatch<T extends 
PhysicalOperator> exte
     return (leftOutcome == IterOutcome.NONE && rightOutcome == 
IterOutcome.NONE);
   }
 
+  public RecordBatchMemoryManager getBatchMemoryManager() {
+    return batchMemoryManager;
+  }
+
   protected void updateBatchMemoryManagerStats() {
     stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, 
batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
     stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES, 
batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index c147cf7..16b06fe 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -20,42 +20,26 @@ package org.apache.drill.exec.record;
 public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
 
-  private int leftRowWidth;
-
-  private int rightRowWidth;
-
-  private RecordBatch leftIncoming;
-
-  private RecordBatch rightIncoming;
+  private int rowWidth[];
+  private RecordBatch recordBatch[];
 
   private static final int numInputs = 2;
-
   public static final int LEFT_INDEX = 0;
-
   public static final int RIGHT_INDEX = 1;
 
   public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, 
RecordBatch rightBatch) {
     super(numInputs, outputBatchSize);
-    this.leftIncoming = leftBatch;
-    this.rightIncoming = rightBatch;
+    recordBatch = new RecordBatch[numInputs];
+    recordBatch[LEFT_INDEX] = leftBatch;
+    recordBatch[RIGHT_INDEX] = rightBatch;
+    rowWidth = new int[numInputs];
   }
 
-  @Override
-  public int update(int inputIndex, int outputPosition) {
-    switch (inputIndex) {
-      case LEFT_INDEX:
-        setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming));
-        leftRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
-        break;
-      case RIGHT_INDEX:
-        setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming));
-        rightRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
-      default:
-        break;
-    }
-
+  private int updateInternal(int inputIndex, int outputPosition,  boolean 
useAggregate) {
     updateIncomingStats(inputIndex);
-    final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
+    rowWidth[inputIndex] = useAggregate ? (int) 
getAvgInputRowWidth(inputIndex) : 
getRecordBatchSizer(inputIndex).getRowAllocSize();
+
+    final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + 
rowWidth[RIGHT_INDEX];
 
     // If outgoing row width is 0, just return. This is possible for empty 
batches or
     // when first set of batches come with OK_NEW_SCHEMA and no data.
@@ -85,13 +69,24 @@ public class JoinBatchMemoryManager extends 
RecordBatchMemoryManager {
   }
 
   @Override
-  public RecordBatchSizer.ColumnSize getColumnSize(String name) {
-    RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX);
-    RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX);
+  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+    setRecordBatchSizer(inputIndex, new 
RecordBatchSizer(recordBatch[inputIndex]));
+    return updateInternal(inputIndex, outputPosition, useAggregate);
+  }
 
-    if (leftSizer != null && leftSizer.getColumn(name) != null) {
-      return leftSizer.getColumn(name);
-    }
-    return rightSizer == null ? null : rightSizer.getColumn(name);
+  @Override
+  public int update(int inputIndex, int outputPosition) {
+    return update(inputIndex, outputPosition, false);
+  }
+
+  @Override
+  public int update(RecordBatch batch, int inputIndex, int outputPosition, 
boolean useAggregate) {
+    setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch));
+    return updateInternal(inputIndex, outputPosition, useAggregate);
+  }
+
+  @Override
+  public int update(RecordBatch batch, int inputIndex, int outputPosition) {
+    return update(batch, inputIndex, outputPosition, false);
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 759e597..993f3cb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -163,6 +163,19 @@ public class RecordBatchMemoryManager {
     updateIncomingStats(index);
   }
 
+  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+    // by default just return the outputRowCount
+    return getOutputRowCount();
+  }
+
+  public int update(RecordBatch batch, int inputIndex, int outputPosition) {
+    return getOutputRowCount();
+  }
+
+  public int update(RecordBatch batch, int inputIndex, int outputPosition, 
boolean useAggregate) {
+    return getOutputRowCount();
+  }
+
   public int getOutputRowCount() {
     return outputRowCount;
   }
@@ -205,8 +218,7 @@ public class RecordBatchMemoryManager {
   }
 
   public void setRecordBatchSizer(RecordBatchSizer sizer) {
-    this.sizer[DEFAULT_INPUT_INDEX] = sizer;
-    inputBatchStats[DEFAULT_INPUT_INDEX] = new BatchStats();
+    setRecordBatchSizer(DEFAULT_INPUT_INDEX, sizer);
   }
 
   public RecordBatchSizer getRecordBatchSizer(int index) {
@@ -261,7 +273,6 @@ public class RecordBatchMemoryManager {
     return UInt4Vector.VALUE_WIDTH;
   }
 
-
   public void allocateVectors(VectorContainer container, int recordCount) {
     // Allocate memory for the vectors.
     // This will iteratively allocate memory for all nested columns underneath.
@@ -269,10 +280,7 @@ public class RecordBatchMemoryManager {
       RecordBatchSizer.ColumnSize colSize = 
getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), recordCount);
     }
-  }
-
-  public void allocateVectors(VectorContainer container) {
-    allocateVectors(container, outputRowCount);
+    container.setRecordCount(0);
   }
 
   public void allocateVectors(List<ValueVector> valueVectors, int recordCount) 
{
@@ -284,6 +292,10 @@ public class RecordBatchMemoryManager {
     }
   }
 
+  public void allocateVectors(VectorContainer container) {
+    allocateVectors(container, outputRowCount);
+  }
+
   public void allocateVectors(List<ValueVector> valueVectors) {
     allocateVectors(valueVectors, outputRowCount);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 7e531f8..a5cb05b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -251,12 +251,50 @@ public class RecordBatchSizer {
     }
 
     /**
-     * This returns actual entry size if rowCount > 0 or standard size 
otherwise.
+     * This returns actual entry size if rowCount > 0 or allocation size 
otherwise.
      * Use this for the cases when you might get empty batches with schema
      * and you still need to do memory calculations based on just schema.
      */
     public int getAllocSizePerEntry() {
-      return rowCount() == 0 ? getStdNetSizePerEntry() : getNetSizePerEntry();
+      if (rowCount() != 0) {
+        return getNetSizePerEntry();
+      }
+
+      int stdNetSize;
+      try {
+        stdNetSize = TypeHelper.getSize(metadata.getType());
+
+        // TypeHelper estimates 50 bytes for variable length. That is pretty 
high number
+        // to use as approximation for empty batches. Use 8 instead.
+        switch (metadata.getType().getMinorType()) {
+          case VARBINARY:
+          case VARCHAR:
+          case VAR16CHAR:
+          case VARDECIMAL:
+            stdNetSize = 4 + 8;
+            break;
+        }
+      } catch (Exception e) {
+        stdNetSize = 0;
+      }
+
+      if (isOptional) {
+        stdNetSize += BIT_VECTOR_WIDTH;
+      }
+
+      if (isRepeated) {
+        stdNetSize = (stdNetSize * STD_REPETITION_FACTOR) + 
OFFSET_VECTOR_WIDTH;
+      }
+
+      for (ColumnSize columnSize : children.values()) {
+        stdNetSize += columnSize.getAllocSizePerEntry();
+      }
+
+      if (isRepeatedList()) {
+        stdNetSize = (stdNetSize * STD_REPETITION_FACTOR) + 
OFFSET_VECTOR_WIDTH;
+      }
+
+      return stdNetSize;
     }
 
     /**
@@ -777,6 +815,13 @@ public class RecordBatchSizer {
     return (int) Math.ceil((double) num / denom);
   }
 
+  public static int safeDivide(int num, double denom) {
+    if (denom == 0) {
+      return 0;
+    }
+    return (int) Math.ceil((double) num / denom);
+  }
+
   public int rowCount() { return rowCount; }
   public int stdRowWidth() { return stdRowWidth; }
   public int grossRowWidth() { return grossRowWidth; }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index 30c0c73..2a44edb 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -39,8 +39,6 @@ public class TestBuildSidePartitioningImpl {
 
     calc.initialize(true,
       false,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       200,
       2,
@@ -52,7 +50,7 @@ public class TestBuildSidePartitioningImpl {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
@@ -84,8 +82,6 @@ public class TestBuildSidePartitioningImpl {
 
     calc.initialize(false,
       true,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       350,
       2,
@@ -97,7 +93,7 @@ public class TestBuildSidePartitioningImpl {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
@@ -130,8 +126,6 @@ public class TestBuildSidePartitioningImpl {
     calc.initialize(
       true,
       false,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       200,
       4,
@@ -143,7 +137,7 @@ public class TestBuildSidePartitioningImpl {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
@@ -178,8 +172,6 @@ public class TestBuildSidePartitioningImpl {
     calc.initialize(
       true,
       false,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       180,
       2,
@@ -191,7 +183,7 @@ public class TestBuildSidePartitioningImpl {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -229,8 +221,6 @@ public class TestBuildSidePartitioningImpl {
     calc.initialize(
       true,
       false,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       210,
       2,
@@ -242,7 +232,7 @@ public class TestBuildSidePartitioningImpl {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 9838670..da83b00 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -1353,6 +1354,391 @@ public class TestOutputBatchSize extends 
PhysicalOpUnitTestBase {
   }
 
   @Test
+  public void testHashJoinMultipleOutputBatches() throws Exception {
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 
2 accounted for in hash join.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testHashJoinSingleOutputBatch() throws Exception {
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    // create multiple batches from both sides.
+    numRows = 4096 * 2;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to twice of total size expected.
+    // We should get 1 batch.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize*2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(1)  // verify number of batches
+      .expectedBatchSize(totalSize) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testHashJoinUpperLimit() throws Exception {
+    // test the upper limit of 65535 records per batch.
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 100000;
+
+    // create left input rows like this.
+    // "a1" : 5,  "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5,  "c1" : 1, "a2":6,  "c2": 1
+    // "a1" : 5,  "c1" : 2, "a2":6,  "c2": 2
+    // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
+
+    // expect two batches, batch limited by 65535 records
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "c1", "a2", "c2")
+      .expectedNumBatches(2)  // verify number of batches
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, i, 6l, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testHashJoinLowerLimit() throws Exception {
+    // test the lower limit of at least one batch
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 10;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+
+    // set very low value of output batch size so we can do only one row per 
batch.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 128);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(10)  // verify number of batches
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testRightOuterHashJoin() throws Exception {
+
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.RIGHT);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 
2 accounted for in hash join.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testLeftOuterHashJoin() throws Exception {
+
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.LEFT);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 
2 accounted for in hash join.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+
+  }
+
+  @Test
   public void testSizerRepeatedList() throws Exception {
     List<String> inputJsonBatches = Lists.newArrayList();
     StringBuilder batchString = new StringBuilder();

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to