This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 2882b89  DRILL-6719: Separate spilling queue logic from HashJoin and HashAgg.
2882b89 is described below

commit 2882b89c2288a9df7c9abd5321cb85c588312b8c
Author: Timothy Farkas <[email protected]>
AuthorDate: Tue Aug 28 16:28:49 2018 -0700

    DRILL-6719: Separate spilling queue logic from HashJoin and HashAgg.
---
 .../physical/impl/aggregate/HashAggTemplate.java   | 210 ++++++++++----------
 .../common/AbstractSpilledPartitionMetadata.java   |  47 +++++
 .../impl/common/SpilledPartitionMetadata.java      |  42 ++++
 .../exec/physical/impl/common/SpilledState.java    | 216 +++++++++++++++++++++
 .../exec/physical/impl/join/HashJoinBatch.java     | 180 ++++++++++-------
 .../exec/physical/impl/join/HashJoinProbe.java     |   2 +-
 .../physical/impl/join/HashJoinProbeTemplate.java  |   9 +-
 7 files changed, 516 insertions(+), 190 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index f6dd3da..6709cf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -48,12 +48,14 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.IndexPointer;
 
+import org.apache.drill.exec.physical.impl.common.SpilledState;
 import org.apache.drill.exec.record.RecordBatchSizer;
 
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
@@ -80,6 +82,7 @@ import org.apache.drill.exec.vector.ObjectVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
 import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
@@ -95,9 +98,6 @@ public abstract class HashAggTemplate implements HashAggregator {
   private static final boolean EXTRA_DEBUG_SPILL = false;
 
   // Fields needed for partitioning (the groups into partitions)
-  private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup())
-  private int partitionMask; // numPartitions - 1
-  private int bitsInMask; // number of bits in the MASK
   private int nextPartitionToReturn = 0; // which partition to return the next batch from
   // The following members are used for logging, metrics, etc.
   private int rowsInPartition = 0; // counts #rows in each partition
@@ -148,25 +148,15 @@ public abstract class HashAggTemplate implements HashAggregator {
   private int outBatchIndex[];
 
   // For handling spilling
+  private HashAggUpdater updater = new HashAggUpdater();
+  private SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>();
   private SpillSet spillSet;
   SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
   private Writer writers[]; // a vector writer for each spilled partition
   private int spilledBatchesCount[]; // count number of batches spilled, in each partition
   private String spillFiles[];
-  private int cycleNum = 0; // primary, secondary, tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from
 
-  private static class SpilledPartition {
-    public int spilledBatches;
-    public String spillFile;
-    int cycleNum;
-    int origPartn;
-    int prevOrigPartn;
-  }
-
-  private ArrayList<SpilledPartition> spilledPartitionsList;
-  private int operatorId; // for the spill file name
-
   private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
   private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
   private TypedFieldId[] groupByOutFieldIds;
@@ -180,6 +170,59 @@ public abstract class HashAggTemplate implements HashAggregator {
   private OperatorStats stats = null;
   private HashTableStats htStats = new HashTableStats();
 
+  public static class HashAggSpilledPartition extends AbstractSpilledPartitionMetadata {
+    private final int spilledBatches;
+    private final String spillFile;
+
+    public HashAggSpilledPartition(final int cycle,
+                                   final int originPartition,
+                                   final int prevOriginPartition,
+                                   final int spilledBatches,
+                                   final String spillFile) {
+      super(cycle, originPartition, prevOriginPartition);
+
+      this.spilledBatches = spilledBatches;
+      this.spillFile = Preconditions.checkNotNull(spillFile);
+    }
+
+    public int getSpilledBatches() {
+      return spilledBatches;
+    }
+
+    public String getSpillFile() {
+      return spillFile;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return String.format("Start reading spilled partition %d (prev %d) from 
cycle %d.",
+        this.getOriginPartition(), this.getPrevOriginPartition(), 
this.getCycle());
+    }
+  }
+
+  public class HashAggUpdater implements SpilledState.Updater {
+
+    @Override
+    public void cleanup() {
+      HashAggTemplate.this.cleanup(); // delegate to the enclosing operator's cleanup (a bare this.cleanup() would recurse forever)
+    }
+
+    @Override
+    public String getFailureMessage() {
+      return null;
+    }
+
+    @Override
+    public long getMemLimit() {
+      return allocator.getLimit();
+    }
+
+    @Override
+    public boolean hasPartitionLimit() {
+      return false;
+    }
+  }
+
   public enum Metric implements MetricDef {
 
     NUM_BUCKETS,
@@ -335,7 +378,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.incoming = incoming;
     this.outgoing = outgoing;
     this.outContainer = outContainer;
-    this.operatorId = hashAggrConfig.getOperatorId();
     this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
 
     is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
@@ -404,7 +446,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
 
     // Set the number of partitions from the configuration (raise to a power of two, if needed)
-    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
+    int numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
     if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do early return with 1 partition
       canSpill = false;
       logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
@@ -460,9 +502,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       // (but 1st phase can still spill, so it will maintain the original memory limit)
       allocator.setLimit(AbstractBase.MAX_ALLOCATION);  // 10_000_000_000L
     }
-    // Based on the number of partitions: Set the mask and bit count
-    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
-    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    spilledState.initialize(numPartitions);
 
     // Create arrays (one entry per partition)
     htables = new HashTable[numPartitions];
@@ -471,7 +512,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     writers = new Writer[numPartitions];
     spilledBatchesCount = new int[numPartitions];
     spillFiles = new String[numPartitions];
-    spilledPartitionsList = new ArrayList<SpilledPartition>();
 
     plannedBatches = numPartitions; // each partition should allocate its first batch
 
@@ -511,7 +551,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.incoming = newIncoming;
     currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
     nextPartitionToReturn = 0;
-    for (int i = 0; i < numPartitions; i++ ) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
       htables[i].updateIncoming(newIncoming.getContainer(), null);
       htables[i].reset();
       if ( batchHolders[i] != null) {
@@ -829,7 +869,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   @Override
   public void adjustOutputCount(int outputBatchSize, int oldRowWidth, int newRowWidth) {
-    for (int i = 0; i < numPartitions; i++ ) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
       if (batchHolders[i] == null || batchHolders[i].size() == 0) {
         continue;
       }
@@ -851,7 +891,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
     // clean (and deallocate) each partition
-    for ( int i = 0; i < numPartitions; i++) {
+    for ( int i = 0; i < spilledState.getNumPartitions(); i++) {
           if (htables[i] != null) {
               htables[i].clear();
               htables[i] = null;
@@ -877,12 +917,12 @@ public abstract class HashAggTemplate implements HashAggregator {
           }
     }
     // delete any spill file left in unread spilled partitions
-    while ( ! spilledPartitionsList.isEmpty() ) {
-        SpilledPartition sp = spilledPartitionsList.remove(0);
+    while (!spilledState.isEmpty()) {
+        HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
         try {
-          spillSet.delete(sp.spillFile);
+          spillSet.delete(sp.getSpillFile());
         } catch(IOException e) {
-          logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
+          logger.warn("Cleanup: Failed to delete spill file 
{}",sp.getSpillFile());
         }
     }
     // Delete the currently handled (if any) spilled file
@@ -948,7 +988,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // first find the largest spilled partition
     int maxSizeSpilled = -1;
     int indexMaxSpilled = -1;
-    for (int isp = 0; isp < numPartitions; isp++ ) {
+    for (int isp = 0; isp < spilledState.getNumPartitions(); isp++ ) {
       if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
         maxSizeSpilled = batchHolders[isp].size();
         indexMaxSpilled = isp;
@@ -967,7 +1007,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       indexMax = indexMaxSpilled;
       maxSize = 4 * maxSizeSpilled;
     }
-    for ( int insp = 0; insp < numPartitions; insp++) {
+    for ( int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
       if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
         indexMax = insp;
         maxSize = batchHolders[insp].size();
@@ -993,7 +1033,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     ArrayList<BatchHolder> currPartition = batchHolders[part];
     rowsInPartition = 0;
     if ( EXTRA_DEBUG_SPILL ) {
-      logger.debug("HashAggregate: Spilling partition {} current cycle {} part 
size {}", part, cycleNum, currPartition.size());
+      logger.debug("HashAggregate: Spilling partition {} current cycle {} part 
size {}", part, spilledState.getCycle(), currPartition.size());
     }
 
     if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to 
spill
@@ -1001,7 +1041,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // If this is the first spill for this partition, create an output stream
     if ( ! isSpilled(part) ) {
 
-      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+      spillFiles[part] = spillSet.getNextSpillFile(spilledState.getCycle() > 0 ? Integer.toString(spilledState.getCycle()) : null);
 
       try {
         writers[part] = spillSet.writer(spillFiles[part]);
@@ -1025,21 +1065,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, numOutputRecords);
 
       // set the value count for outgoing batch value vectors
-      /* int i = 0; */
       for (VectorWrapper<?> v : outgoing) {
         v.getValueVector().getMutator().setValueCount(numOutputRecords);
-        /*
-        // print out the first row to be spilled ( varchar, varchar, bigint )
-        try {
-          if (i++ < 2) {
-            NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
-            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
-          } else {
-            NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
-            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
-          }
-        } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
-        */
       }
 
       outContainer.setRecordCount(numOutputRecords);
@@ -1119,19 +1146,20 @@ public abstract class HashAggTemplate implements HashAggregator {
     if ( ! earlyOutput ) {
       // Update the next partition to return (if needed)
       // skip fully returned (or spilled) partitions
-      while (nextPartitionToReturn < numPartitions) {
+      while (nextPartitionToReturn < spilledState.getNumPartitions()) {
         //
         // If this partition was spilled - spill the rest of it and skip it
         //
         if ( isSpilled(nextPartitionToReturn) ) {
           spillAPartition(nextPartitionToReturn); // spill the rest
-          SpilledPartition sp = new SpilledPartition();
-          sp.spillFile = spillFiles[nextPartitionToReturn];
-          sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
-          sp.cycleNum = cycleNum; // remember the current cycle
-          sp.origPartn = nextPartitionToReturn; // for debugging / filename
-          sp.prevOrigPartn = originalPartition; // for debugging / filename
-          spilledPartitionsList.add(sp);
+          HashAggSpilledPartition sp = new HashAggSpilledPartition(
+            spilledState.getCycle(),
+            nextPartitionToReturn,
+            originalPartition,
+            spilledBatchesCount[nextPartitionToReturn],
+            spillFiles[nextPartitionToReturn]);
+
+          spilledState.addPartition(sp);
 
           reinitPartition(nextPartitionToReturn); // free the memory
           try {
@@ -1155,9 +1183,9 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
 
       // if passed the last partition - either done or need to restart and read spilled partitions
-      if (nextPartitionToReturn >= numPartitions) {
+      if (nextPartitionToReturn >= spilledState.getNumPartitions()) {
         // The following "if" is probably never used; due to a similar check at the end of this method
-        if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
+        if (spilledState.isEmpty()) { // and no spilled partitions
           allFlushed = true;
           this.outcome = IterOutcome.NONE;
           if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
@@ -1170,10 +1198,10 @@ public abstract class HashAggTemplate implements HashAggregator {
         buildComplete = false; // go back and call doWork() again
         handlingSpills = true; // beginning to work on the spill files
         // pick a spilled partition; set a new incoming ...
-        SpilledPartition sp = spilledPartitionsList.remove(0);
+        HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
         // Create a new "incoming" out of the spilled partition spill file
-        newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet);
-        originalPartition = sp.origPartn; // used for the filename
+        newIncoming = new SpilledRecordbatch(sp.getSpillFile(), sp.getSpilledBatches(), context, schema, oContext, spillSet);
+        originalPartition = sp.getOriginPartition(); // used for the filename
         logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
         // Initialize .... new incoming, new set of partitions
         try {
@@ -1181,22 +1209,8 @@ public abstract class HashAggTemplate implements HashAggregator {
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
-        // update the cycle num if needed
-        // The current cycle num should always be one larger than in the spilled partition
-        if ( cycleNum == sp.cycleNum ) {
-          cycleNum = 1 + sp.cycleNum;
-          stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
-          // report first spill or memory stressful situations
-          if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); }
-          if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
-          if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); }
-          if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); }
-          if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); }
-        }
-        if ( EXTRA_DEBUG_SPILL ) {
-          logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.",
-              sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size());
-        }
+
+        spilledState.updateCycle(stats, sp, updater);
         return AggIterOutcome.AGG_RESTART;
       }
 
@@ -1265,7 +1279,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         outBatchIndex[partitionToReturn] = 0; // reset, for the next EMIT
         return AggIterOutcome.AGG_EMIT;
       }
-      else if ( (partitionToReturn + 1 == numPartitions) && 
spilledPartitionsList.isEmpty() ) { // last partition ?
+      else if ((partitionToReturn + 1 == spilledState.getNumPartitions()) && 
spilledState.isEmpty()) { // last partition ?
 
         allFlushed = true; // next next() call will return NONE
 
@@ -1313,7 +1327,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     } else if (!canSpill) {  // 2nd phase, with only 1 partition
       errmsg = "Too little memory available to operator to facilitate spilling.";
     } else { // a bug ?
-      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
+      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + spilledState.getNumPartitions() +
       ". Estimated batch size: " + estMaxBatchSize + ". values size: " + estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
       if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + plannedBatches; }
       if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
@@ -1334,32 +1348,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     assert incomingRowIdx >= 0;
     assert ! earlyOutput;
 
-    /** for debugging
-     Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
-     BigIntVector vv0 = null;
-     BigIntHolder holder = null;
-
-     if (tmp != null) {
-     vv0 = ((BigIntVector) tmp);
-     holder = new BigIntHolder();
-     holder.value = vv0.getAccessor().get(incomingRowIdx) ;
-     }
-     */
-    /*
-    if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
-      // for debugging -- show the first row from a spilled batch
-      Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
-      Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
-      Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
-
-      if (tmp0 != null && tmp1 != null && tmp2 != null) {
-        NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
-        NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
-        NullableBigIntVector  vv2 = ((NullableBigIntVector) tmp2);
-        logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
-      }
-    }
-    */
     // The hash code is computed once, then its lower bits are used to determine the
     // partition to use, and the higher bits determine the location in the hash table.
     int hashCode;
@@ -1371,12 +1359,12 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     // right shift hash code for secondary (or tertiary...) spilling
-    for (int i = 0; i < cycleNum; i++) {
-      hashCode >>>= bitsInMask;
+    for (int i = 0; i < spilledState.getCycle(); i++) {
+      hashCode >>>= spilledState.getBitsInMask();
     }
 
-    int currentPartition = hashCode & partitionMask;
-    hashCode >>>= bitsInMask;
+    int currentPartition = hashCode & spilledState.getPartitionMask();
+    hashCode >>>= spilledState.getBitsInMask();
     HashTable.PutStatus putStatus = null;
     long allocatedBeforeHTput = allocator.getAllocatedMemory();
 
@@ -1498,7 +1486,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * (estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4 /* links + hash-values */));
       // Add the (max) size of the current hash table, in case it will double
       int maxSize = 1;
-      for (int insp = 0; insp < numPartitions; insp++) {
+      for (int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
         maxSize = Math.max(maxSize, batchHolders[insp].size());
       }
       maxMemoryNeeded += MAX_BATCH_ROW_COUNT * 2 * 2 * 4 * maxSize; // 2 - double, 2 - max when %50 full, 4 - Uint4
@@ -1577,12 +1565,12 @@ public abstract class HashAggTemplate implements HashAggregator {
    * @param htables
    */
   private void updateStats(HashTable[] htables) {
-    if ( cycleNum > 0 ||  // These stats are only for before processing spilled files
+    if (!spilledState.isFirstCycle() ||  // These stats are only for before processing spilled files
       handleEmit ) { return; } // and no stats collecting when handling an EMIT
     long numSpilled = 0;
     HashTableStats newStats = new HashTableStats();
     // sum the stats from all the partitions
-    for (int ind = 0; ind < numPartitions; ind++) {
+    for (int ind = 0; ind < spilledState.getNumPartitions(); ind++) {
       htables[ind].getStats(newStats);
       htStats.addStats(newStats);
       if (isSpilled(ind)) {
@@ -1593,8 +1581,8 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
-    this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
+    this.stats.setLongStat(Metric.NUM_PARTITIONS, spilledState.getNumPartitions());
+    this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
     if ( is2ndPhase ) {
       this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
new file mode 100644
index 0000000..951fa66
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+public abstract class AbstractSpilledPartitionMetadata implements SpilledPartitionMetadata {
+  private final int cycle;
+  private final int originPartition;
+  private final int prevOriginPartition;
+
+  public AbstractSpilledPartitionMetadata(final int cycle,
+                                          final int originPartition,
+                                          final int prevOriginPartition) {
+    this.cycle = cycle;
+    this.originPartition = originPartition;
+    this.prevOriginPartition = prevOriginPartition;
+  }
+
+  @Override
+  public int getCycle() {
+    return cycle;
+  }
+
+  @Override
+  public int getOriginPartition() {
+    return originPartition;
+  }
+
+  @Override
+  public int getPrevOriginPartition() {
+    return prevOriginPartition;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
new file mode 100644
index 0000000..4d53be3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+/**
+ * This interface represents the metadata for a spilled partition.
+ */
+public interface SpilledPartitionMetadata {
+  /**
+   * The spill cycle for a partition.
+   * @return The spill cycle for a partition.
+   */
+  int getCycle();
+
+  /**
+   * The parent partition.
+   * @return The parent partition.
+   */
+  int getOriginPartition();
+
+  /**
+   * The parent of parent partition.
+   * @return The parent of parent partition.
+   */
+  int getPrevOriginPartition();
+  String makeDebugString();
+}
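As a rough illustration of how this interface is meant to be consumed (not part of the commit; the class name and fields are invented for the example), a concrete metadata class built on the AbstractSpilledPartitionMetadata base added above could look like this:

    // Hypothetical illustration only -- not part of this commit.
    public class ExampleSpilledPartition extends AbstractSpilledPartitionMetadata {
      private final String spillFile;    // file holding the spilled batches
      private final int spilledBatches;  // number of batches written to that file

      public ExampleSpilledPartition(int cycle, int originPartition, int prevOriginPartition,
                                     String spillFile, int spilledBatches) {
        super(cycle, originPartition, prevOriginPartition);
        this.spillFile = spillFile;
        this.spilledBatches = spilledBatches;
      }

      public String getSpillFile() {
        return spillFile;
      }

      public int getSpilledBatches() {
        return spilledBatches;
      }

      @Override
      public String makeDebugString() {
        return String.format("Spilled partition %d (prev %d) from cycle %d with %d batches in %s.",
            getOriginPartition(), getPrevOriginPartition(), getCycle(), spilledBatches, spillFile);
      }
    }

The HashAggSpilledPartition class above and the HashJoinSpilledPartition class below follow this same pattern, adding whatever bookkeeping their operator needs.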
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
new file mode 100644
index 0000000..a176287
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Manages the spilling information for an operator: the current spill cycle, the partition
+ * mask and bit count derived from the number of partitions, and the queue of spilled
+ * partitions waiting to be processed.
+ *
+ * <h4>Lifecycle</h4>
+ * <ol>
+ *   <li>Create a {@link SpilledState} instance.</li>
+ *   <li>Call {@link SpilledState#initialize(int)}.</li>
+ *   <li>Call {@link SpilledState#addPartition(SpilledPartitionMetadata)}, {@link SpilledState#getNextSpilledPartition()}, or
+ *       {@link SpilledState#updateCycle(OperatorStats, SpilledPartitionMetadata, Updater)}.</li>
+ * </ol>
+ *
+ * <p>
+ *   {@link SpilledState#getCycle()} can be called at any time.
+ * </p>
+ *
+ * @param <T> The class holding spilled partition metadata.
+ */
+public class SpilledState<T extends SpilledPartitionMetadata> {
+  public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpilledState.class);
+
+  private int numPartitions;
+  private int partitionMask;
+  private int bitsInMask;
+
+  private int cycle = 0;
+  private Queue<T> queue = new LinkedList<>();
+  private boolean initialized = false;
+
+  public SpilledState() {
+  }
+
+  /**
+   * Initializes the number of partitions to use for the spilled state.
+   * @param numPartitions The number of partitions to use for the spilled state.
+   */
+  public void initialize(int numPartitions) {
+    Preconditions.checkState(!initialized);
+    Preconditions.checkArgument(numPartitions >= 1); // numPartitions must be positive
+    Preconditions.checkArgument((numPartitions & (numPartitions - 1)) == 0); // Make sure it's a power of two
+
+    this.numPartitions = numPartitions;
+    initialized = true;
+    partitionMask = numPartitions - 1;
+    bitsInMask = Integer.bitCount(partitionMask);
+  }
+
+  /**
+   * Gets the number of partitions.
+   * @return The number of partitions.
+   */
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  /**
+   * True if this is the first cycle (0).
+   * @return True if this is the first cycle (0).
+   */
+  public boolean isFirstCycle() {
+    return cycle == 0;
+  }
+
+  public int getPartitionMask() {
+    return partitionMask;
+  }
+
+  public int getBitsInMask() {
+    return bitsInMask;
+  }
+
+  /**
+   * Adds the partition metadata to the end of the queue to be processed.
+   * @param spilledPartition The partition metadata to process.
+   * @return True if the partition metadata was added to the queue.
+   */
+  public boolean addPartition(T spilledPartition) {
+    Preconditions.checkState(initialized);
+    return queue.offer(spilledPartition);
+  }
+
+  /**
+   * Get the next spilled partition to process.
+   * @return The metadata for the next spilled partition to process.
+   */
+  public T getNextSpilledPartition() {
+    Preconditions.checkState(initialized);
+    return queue.poll();
+  }
+
+  /**
+   * True if there are no spilled partitions.
+   * @return True if there are no spilled partitions.
+   */
+  public boolean isEmpty() {
+    return queue.isEmpty();
+  }
+
+  /**
+   * Update the current spill cycle.
+   * @param operatorStats Current operator stats.
+   * @param spilledPartition The next spilled partition metadata to process.
+   * @param updater The updater implementation that executes custom logic if a spill cycle is incremented.
+   */
+  public void updateCycle(final OperatorStats operatorStats,
+                          final T spilledPartition,
+                          final Updater updater) {
+    Preconditions.checkState(initialized);
+    Preconditions.checkNotNull(operatorStats);
+    Preconditions.checkNotNull(spilledPartition);
+    Preconditions.checkNotNull(updater);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(spilledPartition.makeDebugString());
+    }
+
+    if (cycle == spilledPartition.getCycle()) {
+      // Update the cycle num if needed.
+      // The current cycle num should always be one larger than in the spilled partition.
+
+      cycle = 1 + spilledPartition.getCycle();
+      operatorStats.setLongStat(HashJoinBatch.Metric.SPILL_CYCLE, cycle); // update stats
+
+      if (logger.isDebugEnabled()) {
+        // report first spill or memory stressful situations
+        if (cycle == 1) {
+          logger.debug("Started reading spilled records ");
+        } else if (cycle == 2) {
+          logger.debug("SECONDARY SPILLING ");
+        } else if (cycle == 3) {
+          logger.debug("TERTIARY SPILLING ");
+        } else if (cycle == 4) {
+          logger.debug("QUATERNARY SPILLING ");
+        } else if (cycle == 5) {
+          logger.debug("QUINARY SPILLING ");
+        }
+      }
+
+      if (updater.hasPartitionLimit() && cycle * bitsInMask > 20) {
+        queue.offer(spilledPartition); // so cleanup() would delete the curr spill files
+        updater.cleanup();
+        throw UserException
+          .unsupportedError()
+          .message("%s.\n On cycle num %d mem available %d num partitions %d.", updater.getFailureMessage(), cycle, updater.getMemLimit(), numPartitions)
+          .build(logger);
+      }
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(spilledPartition.makeDebugString());
+    }
+  }
+
+  /**
+   * Gets the current spill cycle.
+   * @return The current spill cycle.
+   */
+  public int getCycle() {
+    return cycle;
+  }
+
+  /**
+   * A callback implemented by an operator so that {@link SpilledState} can run operator specific logic when the spill cycle is updated.
+   */
+  public interface Updater {
+    /**
+     * Does any necessary cleanup when we have spilled too much and need to abort the query.
+     */
+    void cleanup();
+
+    /**
+     * Gets the failure message in the event that we spilled too far.
+     * @return The failure message in the event that we spilled too far.
+     */
+    String getFailureMessage();
+
+    /**
+     * Gets the current memory limit.
+     * @return The current memory limit.
+     */
+    long getMemLimit();
+
+    /**
+     * True if there is a limit to the number of times we can partition.
+     * @return True if there is a limit to the number of times we can partition.
+     */
+    boolean hasPartitionLimit();
+  }
+}
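To make the intended call sequence concrete, here is a minimal usage sketch (not part of the commit) that drives SpilledState the way the operators below do. It assumes the hypothetical ExampleSpilledPartition class sketched earlier (in the same package), an OperatorStats instance, and a simple Updater; the operator-specific spilling and read-back logic is elided.

    // Hypothetical usage sketch only -- not part of this commit.
    import org.apache.drill.exec.ops.OperatorStats;
    import org.apache.drill.exec.physical.impl.common.SpilledState;

    class SpilledStateUsageSketch {
      void example(OperatorStats stats) {
        SpilledState<ExampleSpilledPartition> spilledState = new SpilledState<>();
        spilledState.initialize(32); // number of partitions; must be a power of two

        // When a partition is spilled, queue its metadata for later processing.
        spilledState.addPartition(new ExampleSpilledPartition(
            spilledState.getCycle(), 7 /* originPartition */, -1 /* prevOriginPartition */,
            "/tmp/example_spill_7", 10 /* spilledBatches */));

        // Once the in-memory partitions are exhausted, drain the spilled ones.
        SpilledState.Updater updater = new SpilledState.Updater() {
          @Override public void cleanup() { /* operator-specific cleanup before aborting */ }
          @Override public String getFailureMessage() { return "Cannot partition the data any further"; }
          @Override public long getMemLimit() { return 0; }
          @Override public boolean hasPartitionLimit() { return false; } // no re-partitioning depth limit
        };

        while (!spilledState.isEmpty()) {
          ExampleSpilledPartition sp = spilledState.getNextSpilledPartition();
          spilledState.updateCycle(stats, sp, updater); // bumps the cycle and the SPILL_CYCLE stat
          // ... open sp.getSpillFile() as the new "incoming" and reprocess its batches ...
        }
      }
    }

One design note visible in the code above: updateCycle() records the new cycle under HashJoinBatch.Metric.SPILL_CYCLE, so this common class currently reaches into the hash-join operator's metric enum even when it is driven by HashAgg.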
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 3d45696..929811c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -23,9 +23,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.drill.common.exceptions.UserException;
@@ -56,12 +55,14 @@ import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
+import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.common.SpilledState;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -80,6 +81,7 @@ import org.apache.drill.exec.work.filter.BloomFilter;
 import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -137,8 +139,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
    * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
-  private int partitionMask = 0; // numPartitions - 1
-  private int bitsInMask = 0; // number of bits in the MASK
 
   /**
    * The master class used to generate {@link HashTable}s.
@@ -182,7 +182,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   private SpillSet spillSet;
   HashJoinPOP popConfig;
 
-  private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from
   IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
   private int maxBatchesInMemory;
@@ -195,21 +194,85 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   /**
    * This holds information about the spilled partitions for the build and probe side.
    */
-  public static class HJSpilledPartition {
-    public int innerSpilledBatches;
-    public String innerSpillFile;
-    public int outerSpilledBatches;
-    public String outerSpillFile;
-    int cycleNum;
-    int origPartn;
-    int prevOrigPartn;
+  public static class HashJoinSpilledPartition extends AbstractSpilledPartitionMetadata {
+    private final int innerSpilledBatches;
+    private final String innerSpillFile;
+    private int outerSpilledBatches;
+    private String outerSpillFile;
+    private boolean updatedOuter = false;
+
+    public HashJoinSpilledPartition(final int cycle,
+                                    final int originPartition,
+                                    final int prevOriginPartition,
+                                    final int innerSpilledBatches,
+                                    final String innerSpillFile) {
+      super(cycle, originPartition, prevOriginPartition);
+
+      this.innerSpilledBatches = innerSpilledBatches;
+      this.innerSpillFile = innerSpillFile;
+    }
+
+    public int getInnerSpilledBatches() {
+      return innerSpilledBatches;
+    }
+
+    public String getInnerSpillFile() {
+      return innerSpillFile;
+    }
+
+    public int getOuterSpilledBatches() {
+      Preconditions.checkState(updatedOuter);
+      return outerSpilledBatches;
+    }
+
+    public String getOuterSpillFile() {
+      Preconditions.checkState(updatedOuter);
+      return outerSpillFile;
+    }
+
+    public void updateOuter(final int outerSpilledBatches, final String outerSpillFile) {
+      Preconditions.checkState(!updatedOuter);
+      updatedOuter = true;
+
+      this.outerSpilledBatches = outerSpilledBatches;
+      this.outerSpillFile = outerSpillFile;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return String.format("Start reading spilled partition %d (prev %d) from 
cycle %d (with %d-%d batches).",
+        this.getOriginPartition(), this.getPrevOriginPartition(), 
this.getCycle(), outerSpilledBatches, innerSpilledBatches);
+    }
+  }
+
+  public class HashJoinUpdater implements SpilledState.Updater {
+    @Override
+    public void cleanup() {
+      HashJoinBatch.this.cleanup();
+    }
+
+    @Override
+    public String getFailureMessage() {
+      return "Hash-Join can not partition the inner data any further (probably 
due to too many join-key duplicates).";
+    }
+
+    @Override
+    public long getMemLimit() {
+      return HashJoinBatch.this.allocator.getLimit();
+    }
+
+    @Override
+    public boolean hasPartitionLimit() {
+      return true;
+    }
   }
 
   /**
    * Queue of spilled partitions to process.
    */
-  private ArrayList<HJSpilledPartition> spilledPartitionsList = new ArrayList<>();
-  private HJSpilledPartition spilledInners[]; // for the outer to find the partition
+  private SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
+  private HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
+  private HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
 
   public enum Metric implements MetricDef {
     NUM_BUCKETS,
@@ -352,7 +415,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       state = BatchState.STOP;
     } else {
       // Got our first batch(es)
-      if (cycleNum == 0) {
+      if (spilledState.isFirstCycle()) {
         // Only collect stats for the first cylce
         memoryManagerUpdate.run();
       }
@@ -470,7 +533,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
               joinType,
               leftUpstream,
               partitions,
-              cycleNum,
+              spilledState.getCycle(),
               container,
               spilledInners,
               buildSideIsEmpty.booleanValue(),
@@ -514,9 +577,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         //
         //  (recursively) Handle the spilled partitions, if any
         //
-        if (!buildSideIsEmpty.booleanValue() && 
!spilledPartitionsList.isEmpty()) {
+        if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
           // Get the next (previously) spilled partition to handle as incoming
-          HJSpilledPartition currSp = spilledPartitionsList.remove(0);
+          HashJoinSpilledPartition currSp = 
spilledState.getNextSpilledPartition();
 
           // Create a BUILD-side "incoming" out of the inner spill file of 
that partition
           buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, 
currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
@@ -534,32 +597,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
             hashJoinProbe.changeToFinalProbeState();
           }
 
-          // update the cycle num if needed
-          // The current cycle num should always be one larger than in the spilled partition
-          if (cycleNum == currSp.cycleNum) {
-            cycleNum = 1 + currSp.cycleNum;
-            stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
-            // report first spill or memory stressful situations
-            if (cycleNum == 1) { logger.info("Started reading spilled records "); }
-            if (cycleNum == 2) { logger.info("SECONDARY SPILLING "); }
-            if (cycleNum == 3) { logger.warn("TERTIARY SPILLING ");  }
-            if (cycleNum == 4) { logger.warn("QUATERNARY SPILLING "); }
-            if (cycleNum == 5) { logger.warn("QUINARY SPILLING "); }
-            if ( cycleNum * bitsInMask > 20 ) {
-              spilledPartitionsList.add(currSp); // so cleanup() would delete the curr spill files
-              this.cleanup();
-              throw UserException
-                .unsupportedError()
-                .message("Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates)\n"
-                + "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions)
-                .build(logger);
-            }
-          }
-          logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches)." +
-              " More {} spilled partitions left.",
-            currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches,
-            currSp.innerSpilledBatches, spilledPartitionsList.size());
-
+          spilledState.updateCycle(stats, currSp, spilledStateUpdater);
           state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
 
           prefetchedBuild.setValue(false);
@@ -692,9 +730,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     //  See partitionNumTuning()
     //
 
-    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
-    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
-
+    spilledState.initialize(numPartitions);
     // Create array for the partitions
     partitions = new HashPartition[numPartitions];
   }
@@ -707,10 +743,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     // Recreate the partitions every time build is initialized
     for (int part = 0; part < numPartitions; part++ ) {
       partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
-        RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions);
+        RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions);
     }
 
-    spilledInners = new HJSpilledPartition[numPartitions];
+    spilledInners = new HashJoinSpilledPartition[numPartitions];
 
   }
 
@@ -836,19 +872,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     }
 
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
-    boolean firstCycle = cycleNum == 0;
 
     {
       // Initializing build calculator
       // Limit scope of these variables to this block
-      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
+      int maxBatchSize = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
       boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue();
       HashJoinMemoryCalculator calc = getCalculatorImpl();
 
       calc.initialize(doMemoryCalculation);
       buildCalc = calc.next();
 
-      buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed
+      buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
         buildBatch,
         probeBatch,
         buildJoinColumns,
@@ -862,13 +897,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         batchMemoryManager.getOutputBatchSize(),
         HashTable.DEFAULT_LOAD_FACTOR);
 
-      if (firstCycle && doMemoryCalculation) {
+      if (spilledState.isFirstCycle() && doMemoryCalculation) {
         // Do auto tuning
         buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
       }
     }
 
-    if (firstCycle) {
+    if (spilledState.isFirstCycle()) {
       // Do initial setup only on the first cycle
       delayedSetup();
     }
@@ -906,11 +941,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         }
         final int currentRecordCount = buildBatch.getRecordCount();
 
-        if ( cycleNum > 0 ) {
+        if (!spilledState.isFirstCycle()) {
           read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
         }
         //create runtime filter
-        if (cycleNum == 0 && enableRuntimeFilter) {
+        if (spilledState.isFirstCycle() && enableRuntimeFilter) {
           //create runtime filter and send out async
           int condFieldIndex = 0;
           for (BloomFilter bloomFilter : bloomFilters) {
@@ -924,10 +959,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
         // For every record in the build batch, hash the key columns and keep the result
         for (int ind = 0; ind < currentRecordCount; ind++) {
-          int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
+          int hashCode = spilledState.isFirstCycle() ? partitions[0].getBuildHashCode(ind)
             : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
-          int currPart = hashCode & partitionMask;
-          hashCode >>>= bitsInMask;
+          int currPart = hashCode & spilledState.getPartitionMask();
+          hashCode >>>= spilledState.getBitsInMask();
           // Append the new inner row to the appropriate partition; spill (that partition) if needed
           partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
         }
@@ -942,7 +977,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
     }
 
-    if (cycleNum == 0 && enableRuntimeFilter) {
+    if (spilledState.isFirstCycle() && enableRuntimeFilter) {
       if (bloomFilters.size() > 0) {
         int hashJoinOpId = this.popConfig.getOperatorId();
         runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
@@ -1004,14 +1039,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
     for (HashPartition partn : partitions) {
       if ( partn.isSpilled() ) {
-        HJSpilledPartition sp = new HJSpilledPartition();
-        sp.innerSpillFile = partn.getSpillFile();
-        sp.innerSpilledBatches = partn.getPartitionBatchesCount();
-        sp.cycleNum = cycleNum; // remember the current cycle
-        sp.origPartn = partn.getPartitionNum(); // for debugging / filename
-        sp.prevOrigPartn = originalPartition; // for debugging / filename
-        spilledPartitionsList.add(sp);
+        HashJoinSpilledPartition sp = new HashJoinSpilledPartition(spilledState.getCycle(),
+          partn.getPartitionNum(),
+          originalPartition,
+          partn.getPartitionBatchesCount(),
+          partn.getSpillFile());
 
+        spilledState.addPartition(sp);
         spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
         partn.closeWriter();
 
@@ -1169,8 +1203,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     }
 
     // delete any spill file left in unread spilled partitions
-    while ( ! spilledPartitionsList.isEmpty() ) {
-      HJSpilledPartition sp = spilledPartitionsList.remove(0);
+    while (!spilledState.isEmpty()) {
+      HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
       try {
         spillSet.delete(sp.innerSpillFile);
       } catch(IOException e) {
@@ -1210,7 +1244,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
    */
   private void updateStats() {
     if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
-    if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+    if (!spilledState.isFirstCycle()) { return; } // These stats are only for before processing spilled files
 
     final HashTableStats htStats = new HashTableStats();
     long numSpilled = 0;
@@ -1227,7 +1261,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
+    this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
     this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }
 
@@ -1257,7 +1291,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
   @Override
   public void close() {
-    if ( cycleNum > 0 ) { // spilling happened
+    if (!spilledState.isFirstCycle()) { // spilling happened
       // In case closing due to cancellation, BaseRootExec.close() does not close the open
       // SpilledRecordBatch "scanners" as it only knows about the original left/right ops.
       killIncoming(false);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 5059b18..beddfa6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -39,7 +39,7 @@ public interface HashJoinProbe {
     PROBE_PROJECT, PROJECT_RIGHT, DONE
   }
 
-  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
+  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
   int  probeAndProject() throws SchemaChangeException;
   void changeToFinalProbeState();
   void setTargetOutputCount(int targetOutputCount);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 71abeda..a63f63d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -77,7 +77,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
   private int currRightPartition = 0; // for returning RIGHT/FULL
   IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
   private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
-  private HashJoinBatch.HJSpilledPartition spilledInners[]; // for the outer to find the partition
+  private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
   private boolean buildSideIsEmpty = true;
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
   private int partitionMask = 0; // numPartitions - 1
@@ -110,7 +110,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
    * @param rightHVColPosition
    */
   @Override
-  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
+  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
     this.container = container;
     this.spilledInners = spilledInners;
     this.probeBatch = probeBatch;
@@ -253,9 +253,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
               if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
               partn.completeAnOuterBatch(false);
               // update the partition's spill record with the outer side
-              HashJoinBatch.HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
-              sp.outerSpillFile = partn.getSpillFile();
-              sp.outerSpilledBatches = partn.getPartitionBatchesCount();
+              HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
+              sp.updateOuter(partn.getPartitionBatchesCount(), partn.getSpillFile());
 
               partn.closeWriter();
             }
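Both operators now route rows to partitions the same way, using the mask and bit count kept in SpilledState. A condensed sketch of that routing math (a hypothetical helper, not part of the commit) for reference:

    // Hypothetical sketch of the partition-routing math used above -- not part of this commit.
    class PartitionRoutingSketch {
      static int selectPartition(org.apache.drill.exec.physical.impl.common.SpilledState<?> spilledState,
                                 int hashCode) {
        // Skip the bits already consumed by earlier spill cycles (secondary, tertiary, ...).
        for (int i = 0; i < spilledState.getCycle(); i++) {
          hashCode >>>= spilledState.getBitsInMask();
        }
        // The low bits select the partition; the higher bits remain for the hash table.
        return hashCode & spilledState.getPartitionMask();
      }
    }

Each time a spilled partition is read back and re-spilled, the cycle increases by one, so the next pass consumes the next bitsInMask bits of the original hash code. HashJoin's updater reports hasPartitionLimit() as true, so SpilledState aborts once cycle * bitsInMask exceeds 20, while HashAgg places no such limit.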
