[ 
https://issues.apache.org/jira/browse/DRILL-6236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498871#comment-16498871
 ] 

ASF GitHub Bot commented on DRILL-6236:
---------------------------------------

parthchandra closed pull request #1227: DRILL-6236: batch sizing for hash join
URL: https://github.com/apache/drill/pull/1227
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index ee7a8a38c1..4267077aef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -57,16 +57,19 @@
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.core.JoinRelType;
 
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
+
 /**
  *   This class implements the runtime execution for the Hash-Join operator
  *   supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -95,11 +98,6 @@
    */
   private int RECORDS_PER_BATCH; // internal batches
 
-  /**
-   * The maximum number of records in each outgoing batch.
-   */
-  private static final int TARGET_RECORDS_PER_BATCH = 4000;
-
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
 
@@ -172,7 +170,8 @@
     public String outerSpillFile;
     int cycleNum;
     int origPartn;
-    int prevOrigPartn; }
+    int prevOrigPartn;
+  }
 
   /**
    * Queue of spilled partitions to process.
@@ -181,7 +180,6 @@
   private HJSpilledPartition spilledInners[]; // for the outer to find the 
partition
 
   public enum Metric implements MetricDef {
-
     NUM_BUCKETS,
     NUM_ENTRIES,
     NUM_RESIZING,
@@ -190,8 +188,19 @@
     SPILLED_PARTITIONS, // number of original partitions spilled to disk
     SPILL_MB,         // Number of MB of data spilled to disk. This amount is 
first written,
                       // then later re-read. So, disk I/O is twice this amount.
-    SPILL_CYCLE       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
-    ;
+    SPILL_CYCLE,       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+    LEFT_INPUT_BATCH_COUNT,
+    LEFT_AVG_INPUT_BATCH_BYTES,
+    LEFT_AVG_INPUT_ROW_BYTES,
+    LEFT_INPUT_RECORD_COUNT,
+    RIGHT_INPUT_BATCH_COUNT,
+    RIGHT_AVG_INPUT_BATCH_BYTES,
+    RIGHT_AVG_INPUT_ROW_BYTES,
+    RIGHT_INPUT_RECORD_COUNT,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
+    AVG_OUTPUT_ROW_BYTES,
+    OUTPUT_RECORD_COUNT;
 
     // duplicate for hash ag
 
@@ -221,12 +230,7 @@ protected void buildSchema() throws SchemaChangeException {
       throw new SchemaChangeException(e);
     }
 
-    // Build the container schema and set the counts
-    for (final VectorWrapper<?> w : container) {
-      w.getValueVector().allocateNew();
-    }
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    container.setRecordCount(outputRecords);
   }
 
   @Override
@@ -234,6 +238,15 @@ protected boolean prefetchFirstBatchFromBothSides() {
     leftUpstream = sniffNonEmptyBatch(0, left);
     rightUpstream = sniffNonEmptyBatch(1, right);
 
+    // For build side, use aggregate i.e. average row width across batches
+    batchMemoryManager.update(LEFT_INDEX, 0);
+    batchMemoryManager.update(RIGHT_INDEX, 0, true);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming left:\n {}", 
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      logger.debug("BATCH_STATS, incoming right:\n {}", 
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+    }
+
     if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) 
{
       state = BatchState.STOP;
       return false;
@@ -333,10 +346,21 @@ public IterOutcome innerNext() {
            joinType != JoinRelType.INNER) {  // or if this is a left/full 
outer join
 
         // Allocate the memory for the vectors in the output container
-        allocateVectors();
+        batchMemoryManager.allocateVectors(container);
+        
hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
 
         outputRecords = hashJoinProbe.probeAndProject();
 
+        for (final VectorWrapper<?> v : container) {
+          v.getValueVector().getMutator().setValueCount(outputRecords);
+        }
+        container.setRecordCount(outputRecords);
+
+        batchMemoryManager.updateOutgoingStats(outputRecords);
+        if (logger.isDebugEnabled()) {
+          logger.debug("BATCH_STATS, outgoing:\n {}", new 
RecordBatchSizer(this));
+        }
+
         /* We are here because of one the following
          * 1. Completed processing of all the records and we are done
          * 2. We've filled up the outgoing batch to the maximum and we need to 
return upstream
@@ -347,10 +371,6 @@ public IterOutcome innerNext() {
             state = BatchState.NOT_FIRST;
           }
 
-          for (final VectorWrapper<?> v : container) {
-            v.getValueVector().getMutator().setValueCount(outputRecords);
-          }
-
           return IterOutcome.OK;
         }
 
@@ -557,7 +577,8 @@ private void initializeBuild() {
         RECORDS_PER_BATCH,
         maxBatchSize,
         maxBatchSize,
-        TARGET_RECORDS_PER_BATCH,
+        batchMemoryManager.getOutputRowCount(),
+        batchMemoryManager.getOutputBatchSize(),
         HashTable.DEFAULT_LOAD_FACTOR);
 
       disableSpilling(null);
@@ -628,7 +649,8 @@ public void executeBuildPhase() throws 
SchemaChangeException {
         RECORDS_PER_BATCH,
         maxBatchSize,
         maxBatchSize,
-        TARGET_RECORDS_PER_BATCH,
+        batchMemoryManager.getOutputRowCount(),
+        batchMemoryManager.getOutputBatchSize(),
         HashTable.DEFAULT_LOAD_FACTOR);
 
       if (firstCycle && doMemoryCalculation) {
@@ -665,6 +687,7 @@ public void executeBuildPhase() throws 
SchemaChangeException {
         for (HashPartition partn : partitions) { partn.updateBatches(); }
         // Fall through
       case OK:
+        batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
         // Special treatment (when no spill, and single partition) -- use the 
incoming vectors as they are (no row copy)
         if ( numPartitions == 1 ) {
           partitions[0].appendBatch(buildBatch);
@@ -803,22 +826,6 @@ private void setupOutputContainerSchema() {
 
   }
 
-  private void allocateVectors() {
-    for (final VectorWrapper<?> vectorWrapper : container) {
-      ValueVector valueVector = vectorWrapper.getValueVector();
-
-      if (valueVector instanceof FixedWidthVector) {
-        ((FixedWidthVector) valueVector).allocateNew(TARGET_RECORDS_PER_BATCH);
-      } else if (valueVector instanceof VariableWidthVector) {
-        ((VariableWidthVector) valueVector).allocateNew(8 * 
TARGET_RECORDS_PER_BATCH, TARGET_RECORDS_PER_BATCH);
-      } else {
-        valueVector.allocateNew();
-      }
-    }
-
-    container.setRecordCount(0); // reset container's counter back to zero 
records
-  }
-
   // (After the inner side was read whole) - Has that inner partition spilled
   public boolean isSpilledInner(int part) {
     if ( spilledInners == null ) { return false; } // empty inner
@@ -879,6 +886,10 @@ public HashJoinBatch(HashJoinPOP popConfig, 
FragmentContext context,
 
     // Create empty partitions (in the ctor - covers the case where right side 
is empty)
     partitions = new HashPartition[0];
+
+    // get the output batch size from config.
+    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, 
right);
   }
 
   /**
@@ -966,6 +977,23 @@ public void killIncoming(boolean sendUpstream) {
     buildBatch.kill(sendUpstream);
   }
 
+  public void updateMetrics() {
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, 
batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, 
batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, 
batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, 
batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, 
batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, 
batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, 
batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, 
batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, 
batchMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, 
batchMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, 
batchMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, 
batchMemoryManager.getTotalOutputRecords());
+  }
+
   @Override
   public void close() {
     if ( cycleNum > 0 ) { // spilling happened
@@ -973,6 +1001,25 @@ public void close() {
       // SpilledRecordBatch "scanners" as it only knows about the original 
left/right ops.
       killIncoming(false);
     }
+
+    updateMetrics();
+
+    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes 
: {},  avg row bytes : {}, record count : {}",
+      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+
     this.cleanup();
     super.close();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
index 618e80e13b..fb087a0fd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
@@ -77,6 +77,7 @@ public void initialize(boolean autoTune,
                            int maxBatchNumRecordsBuild,
                            int maxBatchNumRecordsProbe,
                            int outputBatchNumRecords,
+                           int outputBatchSize,
                            double loadFactor) {
       this.initialPartitions = initialPartitions;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index 71292a550b..868fbfd10b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -100,6 +100,7 @@ void initialize(boolean autoTune,
                     int maxBatchNumRecordsBuild,
                     int maxBatchNumRecordsProbe,
                     int outputBatchNumRecords,
+                    int outputBatchSize,
                     double loadFactor);
 
     void setPartitionStatSet(PartitionStatSet partitionStatSet);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index ed0adc5d81..37f33295ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -142,6 +142,7 @@ public void initialize(boolean autoTune,
                            int maxBatchNumRecordsBuild,
                            int maxBatchNumRecordsProbe,
                            int outputBatchNumRecords,
+                           int outputBatchSize,
                            double loadFactor) {
       this.initialPartitions = initialPartitions;
     }
@@ -203,7 +204,7 @@ public HashJoinState getState() {
    * <h1>Life Cycle</h1>
    * <p>
    *   <ul>
-   *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, 
RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, double)}.
+   *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, 
RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, int, 
double)}.
    *     This will initialize the StateCalculate with the additional 
information it needs.</li>
    *     <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number 
of partitions that fit in memory.</li>
    *     <li><b>Step 2:</b> Call {@link #shouldSpill()} To determine if 
spilling needs to occurr.</li>
@@ -233,9 +234,7 @@ public HashJoinState getState() {
     private int partitions;
     private int recordsPerPartitionBatchBuild;
     private int recordsPerPartitionBatchProbe;
-    private int outputBatchNumRecords;
-    private Map<String, Long> buildValueSizes;
-    private Map<String, Long> probeValueSizes;
+    private int outputBatchSize;
     private Map<String, Long> keySizes;
     private boolean autoTune;
     private boolean reserveHash;
@@ -273,6 +272,7 @@ public void initialize(boolean autoTune,
                            int maxBatchNumRecordsBuild,
                            int maxBatchNumRecordsProbe,
                            int outputBatchNumRecords,
+                           int outputBatchSize,
                            double loadFactor) {
       Preconditions.checkNotNull(buildSideBatch);
       Preconditions.checkNotNull(probeSideBatch);
@@ -300,8 +300,6 @@ public void initialize(boolean autoTune,
 
       initialize(autoTune,
         reserveHash,
-        buildValueSizes,
-        probeValueSizes,
         keySizes,
         memoryAvailable,
         initialPartitions,
@@ -313,7 +311,7 @@ public void initialize(boolean autoTune,
         recordsPerPartitionBatchProbe,
         maxBatchNumRecordsBuild,
         maxBatchNumRecordsProbe,
-        outputBatchNumRecords,
+        outputBatchSize,
         loadFactor);
     }
 
@@ -352,8 +350,6 @@ public static long getBatchSizeEstimate(final RecordBatch 
recordBatch) {
     @VisibleForTesting
     protected void initialize(boolean autoTune,
                               boolean reserveHash,
-                              CaseInsensitiveMap<Long> buildValueSizes,
-                              CaseInsensitiveMap<Long> probeValueSizes,
                               CaseInsensitiveMap<Long> keySizes,
                               long memoryAvailable,
                               int initialPartitions,
@@ -365,7 +361,7 @@ protected void initialize(boolean autoTune,
                               int recordsPerPartitionBatchProbe,
                               int maxBatchNumRecordsBuild,
                               int maxBatchNumRecordsProbe,
-                              int outputBatchNumRecords,
+                              int outputBatchSize,
                               double loadFactor) {
       Preconditions.checkState(!firstInitialized);
       Preconditions.checkArgument(initialPartitions >= 1);
@@ -374,8 +370,6 @@ protected void initialize(boolean autoTune,
       this.loadFactor = loadFactor;
       this.autoTune = autoTune;
       this.reserveHash = reserveHash;
-      this.buildValueSizes = Preconditions.checkNotNull(buildValueSizes);
-      this.probeValueSizes = Preconditions.checkNotNull(probeValueSizes);
       this.keySizes = Preconditions.checkNotNull(keySizes);
       this.memoryAvailable = memoryAvailable;
       this.buildBatchSize = buildBatchSize;
@@ -387,7 +381,7 @@ protected void initialize(boolean autoTune,
       this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
       this.maxBatchNumRecordsBuild = maxBatchNumRecordsBuild;
       this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe;
-      this.outputBatchNumRecords = outputBatchNumRecords;
+      this.outputBatchSize = outputBatchSize;
 
       calculateMemoryUsage();
 
@@ -448,8 +442,7 @@ private void calculateMemoryUsage()
         safetyFactor,
         reserveHash);
 
-      maxOutputBatchSize = computeMaxOutputBatchSize(buildValueSizes, 
probeValueSizes, keySizes,
-        outputBatchNumRecords, safetyFactor, fragmentationFactor);
+      maxOutputBatchSize = (long) ((double)outputBatchSize * 
fragmentationFactor * safetyFactor);
 
       long probeReservedMemory;
 
@@ -519,18 +512,6 @@ private void calculateMemoryUsage()
       }
     }
 
-    public static long computeMaxOutputBatchSize(Map<String, Long> 
buildValueSizes,
-                                                 Map<String, Long> 
probeValueSizes,
-                                                 Map<String, Long> keySizes,
-                                                 int outputNumRecords,
-                                                 double safetyFactor,
-                                                 double fragmentationFactor) {
-      long outputSize = 
HashTableSizeCalculatorConservativeImpl.computeVectorSizes(keySizes, 
outputNumRecords, safetyFactor)
-        + 
HashTableSizeCalculatorConservativeImpl.computeVectorSizes(buildValueSizes, 
outputNumRecords, safetyFactor)
-        + 
HashTableSizeCalculatorConservativeImpl.computeVectorSizes(probeValueSizes, 
outputNumRecords, safetyFactor);
-      return RecordBatchSizer.multiplyByFactor(outputSize, 
fragmentationFactor);
-    }
-
     @Override
     public boolean shouldSpill() {
       Preconditions.checkState(initialized);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index f212605d5d..5059b18d21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -42,4 +42,6 @@
   void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, 
JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, 
HashPartition[] partitions, int cycleNum, VectorContainer container, 
HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int 
numPartitions, int rightHVColPosition);
   int  probeAndProject() throws SchemaChangeException;
   void changeToFinalProbeState();
+  void setTargetOutputCount(int targetOutputCount);
+  int getOutputCount();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 75c3073652..46f2fa3690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -31,6 +31,8 @@
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+
 public abstract class HashJoinProbeTemplate implements HashJoinProbe {
 
   VectorContainer container; // the outgoing container
@@ -45,8 +47,6 @@
 
   private HashJoinBatch outgoingJoinBatch = null;
 
-  private static final int TARGET_RECORDS_PER_BATCH = 4000;
-
   // Number of records to process on the probe side
   private int recordsToProcess = 0;
 
@@ -83,6 +83,16 @@
   private int partitionMask = 0; // numPartitions - 1
   private int bitsInMask = 0; // number of bits in the MASK
   private int rightHVColPosition;
+  private int targetOutputRecords;
+
+  @Override
+  public void setTargetOutputCount(int targetOutputRecords) {
+    this.targetOutputRecords = targetOutputRecords;
+  }
+
+  public int getOutputCount() {
+    return outputRecords;
+  }
 
   /**
    *  Setup the Hash Join Probe object
@@ -209,7 +219,7 @@ private int outputOuterRow(VectorContainer 
probeSrcContainer, int probeSrcIndex,
 
 
   private void executeProjectRightPhase(int currBuildPart) {
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < 
recordsToProcess) {
+    while (outputRecords < targetOutputRecords && recordsProcessed < 
recordsToProcess) {
       outputRecords =
         outputRow(partitions[currBuildPart].getContainers(), 
unmatchedBuildIndexes.get(recordsProcessed),
           null /* no probeBatch */, 0 /* no probe index */ );
@@ -219,7 +229,7 @@ private void executeProjectRightPhase(int currBuildPart) {
 
   private void executeProbePhase() throws SchemaChangeException {
 
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != 
ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
+    while (outputRecords < targetOutputRecords && probeState != 
ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
 
       // Check if we have processed all records in this batch we need to 
invoke next
       if (recordsProcessed == recordsToProcess) {
@@ -262,6 +272,7 @@ private void executeProbePhase() throws 
SchemaChangeException {
                 probeBatch.getSchema());
             }
           case OK:
+            
setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().update(probeBatch,
 LEFT_INDEX,outputRecords));
             recordsToProcess = probeBatch.getRecordCount();
             recordsProcessed = 0;
             // If we received an empty batch do nothing
@@ -274,10 +285,9 @@ private void executeProbePhase() throws 
SchemaChangeException {
         }
       }
 
-        int probeIndex = -1;
+      int probeIndex = -1;
       // Check if we need to drain the next row in the probe side
       if (getNextRecord) {
-
         if ( !buildSideIsEmpty ) {
           int hashCode = ( cycleNum == 0 ) ?
             partitions[0].getProbeHashCode(recordsProcessed)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 90528366fe..d75463b0a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -121,6 +121,10 @@ protected boolean checkForEarlyFinish(IterOutcome 
leftOutcome, IterOutcome right
     return (leftOutcome == IterOutcome.NONE && rightOutcome == 
IterOutcome.NONE);
   }
 
+  public RecordBatchMemoryManager getBatchMemoryManager() {
+    return batchMemoryManager;
+  }
+
   protected void updateBatchMemoryManagerStats() {
     stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, 
batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
     stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES, 
batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index c147cf7ccc..16b06fee84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -20,42 +20,26 @@
 public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
 
-  private int leftRowWidth;
-
-  private int rightRowWidth;
-
-  private RecordBatch leftIncoming;
-
-  private RecordBatch rightIncoming;
+  private int rowWidth[];
+  private RecordBatch recordBatch[];
 
   private static final int numInputs = 2;
-
   public static final int LEFT_INDEX = 0;
-
   public static final int RIGHT_INDEX = 1;
 
   public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, 
RecordBatch rightBatch) {
     super(numInputs, outputBatchSize);
-    this.leftIncoming = leftBatch;
-    this.rightIncoming = rightBatch;
+    recordBatch = new RecordBatch[numInputs];
+    recordBatch[LEFT_INDEX] = leftBatch;
+    recordBatch[RIGHT_INDEX] = rightBatch;
+    rowWidth = new int[numInputs];
   }
 
-  @Override
-  public int update(int inputIndex, int outputPosition) {
-    switch (inputIndex) {
-      case LEFT_INDEX:
-        setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming));
-        leftRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
-        break;
-      case RIGHT_INDEX:
-        setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming));
-        rightRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
-      default:
-        break;
-    }
-
+  private int updateInternal(int inputIndex, int outputPosition,  boolean 
useAggregate) {
     updateIncomingStats(inputIndex);
-    final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
+    rowWidth[inputIndex] = useAggregate ? (int) 
getAvgInputRowWidth(inputIndex) : 
getRecordBatchSizer(inputIndex).getRowAllocSize();
+
+    final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + 
rowWidth[RIGHT_INDEX];
 
     // If outgoing row width is 0, just return. This is possible for empty 
batches or
     // when first set of batches come with OK_NEW_SCHEMA and no data.
@@ -85,13 +69,24 @@ public int update(int inputIndex, int outputPosition) {
   }
 
   @Override
-  public RecordBatchSizer.ColumnSize getColumnSize(String name) {
-    RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX);
-    RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX);
+  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+    setRecordBatchSizer(inputIndex, new 
RecordBatchSizer(recordBatch[inputIndex]));
+    return updateInternal(inputIndex, outputPosition, useAggregate);
+  }
 
-    if (leftSizer != null && leftSizer.getColumn(name) != null) {
-      return leftSizer.getColumn(name);
-    }
-    return rightSizer == null ? null : rightSizer.getColumn(name);
+  @Override
+  public int update(int inputIndex, int outputPosition) {
+    return update(inputIndex, outputPosition, false);
+  }
+
+  @Override
+  public int update(RecordBatch batch, int inputIndex, int outputPosition, 
boolean useAggregate) {
+    setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch));
+    return updateInternal(inputIndex, outputPosition, useAggregate);
+  }
+
+  @Override
+  public int update(RecordBatch batch, int inputIndex, int outputPosition) {
+    return update(batch, inputIndex, outputPosition, false);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 759e597d8f..993f3cb63d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -163,6 +163,19 @@ public void update(RecordBatch recordBatch, int index) {
     updateIncomingStats(index);
   }
 
+  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+    // by default just return the outputRowCount
+    return getOutputRowCount();
+  }
+
+  public int update(RecordBatch batch, int inputIndex, int outputPosition) {
+    return getOutputRowCount();
+  }
+
+  public int update(RecordBatch batch, int inputIndex, int outputPosition, 
boolean useAggregate) {
+    return getOutputRowCount();
+  }
+
   public int getOutputRowCount() {
     return outputRowCount;
   }
@@ -205,8 +218,7 @@ public void setRecordBatchSizer(int index, RecordBatchSizer 
sizer) {
   }
 
   public void setRecordBatchSizer(RecordBatchSizer sizer) {
-    this.sizer[DEFAULT_INPUT_INDEX] = sizer;
-    inputBatchStats[DEFAULT_INPUT_INDEX] = new BatchStats();
+    setRecordBatchSizer(DEFAULT_INPUT_INDEX, sizer);
   }
 
   public RecordBatchSizer getRecordBatchSizer(int index) {
@@ -261,7 +273,6 @@ public int getOffsetVectorWidth() {
     return UInt4Vector.VALUE_WIDTH;
   }
 
-
   public void allocateVectors(VectorContainer container, int recordCount) {
     // Allocate memory for the vectors.
     // This will iteratively allocate memory for all nested columns underneath.
@@ -269,10 +280,7 @@ public void allocateVectors(VectorContainer container, int 
recordCount) {
       RecordBatchSizer.ColumnSize colSize = 
getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), recordCount);
     }
-  }
-
-  public void allocateVectors(VectorContainer container) {
-    allocateVectors(container, outputRowCount);
+    container.setRecordCount(0);
   }
 
   public void allocateVectors(List<ValueVector> valueVectors, int recordCount) 
{
@@ -284,6 +292,10 @@ public void allocateVectors(List<ValueVector> 
valueVectors, int recordCount) {
     }
   }
 
+  public void allocateVectors(VectorContainer container) {
+    allocateVectors(container, outputRowCount);
+  }
+
   public void allocateVectors(List<ValueVector> valueVectors) {
     allocateVectors(valueVectors, outputRowCount);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 7e531f8e39..a5cb05b008 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -251,12 +251,50 @@ public int getNetSizePerEntry() {
     }
 
     /**
-     * This returns actual entry size if rowCount > 0 or standard size 
otherwise.
+     * This returns actual entry size if rowCount > 0 or allocation size 
otherwise.
      * Use this for the cases when you might get empty batches with schema
      * and you still need to do memory calculations based on just schema.
      */
     public int getAllocSizePerEntry() {
-      return rowCount() == 0 ? getStdNetSizePerEntry() : getNetSizePerEntry();
+      if (rowCount() != 0) {
+        return getNetSizePerEntry();
+      }
+
+      int stdNetSize;
+      try {
+        stdNetSize = TypeHelper.getSize(metadata.getType());
+
+        // TypeHelper estimates 50 bytes for variable length. That is a pretty 
high number
+        // to use as an approximation for empty batches. Use 8 instead.
+        switch (metadata.getType().getMinorType()) {
+          case VARBINARY:
+          case VARCHAR:
+          case VAR16CHAR:
+          case VARDECIMAL:
+            stdNetSize = 4 + 8;
+            break;
+        }
+      } catch (Exception e) {
+        stdNetSize = 0;
+      }
+
+      if (isOptional) {
+        stdNetSize += BIT_VECTOR_WIDTH;
+      }
+
+      if (isRepeated) {
+        stdNetSize = (stdNetSize * STD_REPETITION_FACTOR) + 
OFFSET_VECTOR_WIDTH;
+      }
+
+      for (ColumnSize columnSize : children.values()) {
+        stdNetSize += columnSize.getAllocSizePerEntry();
+      }
+
+      if (isRepeatedList()) {
+        stdNetSize = (stdNetSize * STD_REPETITION_FACTOR) + 
OFFSET_VECTOR_WIDTH;
+      }
+
+      return stdNetSize;
     }
 
     /**
@@ -777,6 +815,13 @@ public static int safeDivide(int num, float denom) {
     return (int) Math.ceil((double) num / denom);
   }
 
+  public static int safeDivide(int num, double denom) {
+    if (denom == 0) {
+      return 0;
+    }
+    return (int) Math.ceil((double) num / denom);
+  }
+
   public int rowCount() { return rowCount; }
   public int stdRowWidth() { return stdRowWidth; }
   public int grossRowWidth() { return grossRowWidth; }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index 30c0c73edd..2a44edb179 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -39,8 +39,6 @@ public void testSimpleReserveMemoryCalculationNoHash() {
 
     calc.initialize(true,
       false,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       200,
       2,
@@ -52,7 +50,7 @@ public void testSimpleReserveMemoryCalculationNoHash() {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
@@ -84,8 +82,6 @@ public void testSimpleReserveMemoryCalculationHash() {
 
     calc.initialize(false,
       true,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       350,
       2,
@@ -97,7 +93,7 @@ public void testSimpleReserveMemoryCalculationHash() {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
@@ -130,8 +126,6 @@ public void testAdjustInitialPartitions() {
     calc.initialize(
       true,
       false,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       200,
       4,
@@ -143,7 +137,7 @@ public void testAdjustInitialPartitions() {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
@@ -178,8 +172,6 @@ public void testNoRoomInMemoryForBatch1() {
     calc.initialize(
       true,
       false,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       180,
       2,
@@ -191,7 +183,7 @@ public void testNoRoomInMemoryForBatch1() {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
@@ -229,8 +221,6 @@ public void testCompleteLifeCycle() {
     calc.initialize(
       true,
       false,
-      buildValueSizes,
-      probeValueSizes,
       keySizes,
       210,
       2,
@@ -242,7 +232,7 @@ public void testCompleteLifeCycle() {
       5,
       maxBatchNumRecords,
       maxBatchNumRecords,
-      10,
+      16000,
       .75);
 
     final PartitionStatImpl partition1 = new PartitionStatImpl();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 98386702ec..da83b00446 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -25,6 +25,7 @@
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -1352,6 +1353,391 @@ public void testUnionLowerLimit() throws Exception {
     opTestBuilder.go();
   }
 
+  @Test
+  public void testHashJoinMultipleOutputBatches() throws Exception {
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 
2 accounted for in hash join.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testHashJoinSingleOutputBatch() throws Exception {
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    // create multiple batches from both sides.
+    numRows = 4096 * 2;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to twice the total size expected.
+    // We should get 1 batch.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize*2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(1)  // verify number of batches
+      .expectedBatchSize(totalSize) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testHashJoinUpperLimit() throws Exception {
+    // test the upper limit of 65535 records per batch.
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 100000;
+
+    // create left input rows like this.
+    // "a1" : 5,  "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5,  "c1" : 1, "a2":6,  "c2": 1
+    // "a1" : 5,  "c1" : 2, "a2":6,  "c2": 2
+    // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
+
+    // expect two batches, batch limited by 65535 records
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "c1", "a2", "c2")
+      .expectedNumBatches(2)  // verify number of batches
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, i, 6l, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testHashJoinLowerLimit() throws Exception {
+    // test the lower limit of at least one row per batch
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 10;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+
+    // set very low value of output batch size so we can do only one row per 
batch.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 128);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(10)  // verify number of batches
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testRightOuterHashJoin() throws Exception {
+
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.RIGHT);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 
2 accounted for in hash join.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testLeftOuterHashJoin() throws Exception {
+
+    HashJoinPOP hashJoin = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.LEFT);
+    mockOpContext(hashJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString 
+ "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + 
"\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString 
+ "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + 
"\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 
1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 
2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 
3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + 
wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + 
wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 
2 accounted for in hash join.
+    
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
 totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashJoin)
+      .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2) // verify batch size
+      .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, 
rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+
+  }
+
   @Test
   public void testSizerRepeatedList() throws Exception {
     List<String> inputJsonBatches = Lists.newArrayList();
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 247355690d..3725364bec 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -220,13 +220,11 @@ public void copyFrom(int inIndex, int outIndex, BitVector 
from) {
     this.mutator.set(outIndex, from.accessor.get(inIndex));
   }
 
-  public boolean copyFromSafe(int inIndex, int outIndex, BitVector from) {
-    if (outIndex >= this.getValueCapacity()) {
-      decrementAllocationMonitor();
-      return false;
+  public void copyFromSafe(int inIndex, int outIndex, BitVector from) {
+    while (outIndex >= this.getValueCapacity()) {
+      reAlloc();
     }
     copyFrom(inIndex, outIndex, from);
-    return true;
   }
 
   @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> batch sizing for hash join
> --------------------------
>
>                 Key: DRILL-6236
>                 URL: https://issues.apache.org/jira/browse/DRILL-6236
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>    Affects Versions: 1.13.0
>            Reporter: Padma Penumarthy
>            Assignee: Padma Penumarthy
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> limit output batch size for hash join based on memory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to