ilooner closed pull request #1465: DRILL-6719: Separate spilling queue logic 
from HashJoin and HashAgg.
URL: https://github.com/apache/drill/pull/1465
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index f6dd3da425e..6709cf6ddef 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -48,12 +48,14 @@
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashAggregate;
+import 
org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.IndexPointer;
 
+import org.apache.drill.exec.physical.impl.common.SpilledState;
 import org.apache.drill.exec.record.RecordBatchSizer;
 
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
@@ -80,6 +82,7 @@
 import org.apache.drill.exec.vector.ValueVector;
 
 import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
 import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
@@ -95,9 +98,6 @@
   private static final boolean EXTRA_DEBUG_SPILL = false;
 
   // Fields needed for partitioning (the groups into partitions)
-  private int numPartitions = 0; // must be 2 to the power of bitsInMask (set 
in setup())
-  private int partitionMask; // numPartitions - 1
-  private int bitsInMask; // number of bits in the MASK
   private int nextPartitionToReturn = 0; // which partition to return the next 
batch from
   // The following members are used for logging, metrics, etc.
   private int rowsInPartition = 0; // counts #rows in each partition
@@ -148,25 +148,15 @@
   private int outBatchIndex[];
 
   // For handling spilling
+  private HashAggUpdater updater = new HashAggUpdater();
+  private SpilledState<HashAggSpilledPartition> spilledState = new 
SpilledState<>();
   private SpillSet spillSet;
   SpilledRecordbatch newIncoming; // when reading a spilled file - work like 
an "incoming"
   private Writer writers[]; // a vector writer for each spilled partition
   private int spilledBatchesCount[]; // count number of batches spilled, in 
each partition
   private String spillFiles[];
-  private int cycleNum = 0; // primary, secondary, tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from
 
-  private static class SpilledPartition {
-    public int spilledBatches;
-    public String spillFile;
-    int cycleNum;
-    int origPartn;
-    int prevOrigPartn;
-  }
-
-  private ArrayList<SpilledPartition> spilledPartitionsList;
-  private int operatorId; // for the spill file name
-
   private IndexPointer htIdxHolder; // holder for the Hashtable's internal 
index returned by put()
   private int numGroupByOutFields = 0; // Note: this should be <= number of 
group-by fields
   private TypedFieldId[] groupByOutFieldIds;
@@ -180,6 +170,59 @@
   private OperatorStats stats = null;
   private HashTableStats htStats = new HashTableStats();
 
+  public static class HashAggSpilledPartition extends 
AbstractSpilledPartitionMetadata { // Immutable metadata for one spilled HashAgg partition: the inherited cycle/partition ids plus the spill file and its batch count
+    private final int spilledBatches; // count of batches spilled for this partition
+    private final String spillFile; // spill file of this partition; never null (checked in the constructor)
+    // Builds the metadata; cycle and the two partition ids are handed to the superclass.
+    public HashAggSpilledPartition(final int cycle,
+                                   final int originPartition,
+                                   final int prevOriginPartition,
+                                   final int spilledBatches,
+                                   final String spillFile) {
+      super(cycle, originPartition, prevOriginPartition);
+
+      this.spilledBatches = spilledBatches;
+      this.spillFile = Preconditions.checkNotNull(spillFile); // rejects a null spill file with a NullPointerException
+    }
+
+    public int getSpilledBatches() {
+      return spilledBatches;
+    }
+
+    public String getSpillFile() {
+      return spillFile;
+    }
+    // Formats the origin partition, previous origin partition and cycle into a single message.
+    @Override
+    public String makeDebugString() {
+      return String.format("Start reading spilled partition %d (prev %d) from 
cycle %d.",
+        this.getOriginPartition(), this.getPrevOriginPartition(), 
this.getCycle());
+    }
+  }
+
+  public class HashAggUpdater implements SpilledState.Updater { // inner (non-static) class: it reaches the enclosing HashAggTemplate's cleanup() and allocator
+    // Delegates to the enclosing operator's cleanup().
+    @Override
+    public void cleanup() {
+      HashAggTemplate.this.cleanup(); // qualified on purpose: a bare this.cleanup() is this very method and would recurse until StackOverflowError
+    }
+    // No failure message is supplied for HashAgg. NOTE(review): presumably only consulted when hasPartitionLimit() is true - confirm in SpilledState.updateCycle.
+    @Override
+    public String getFailureMessage() {
+      return null;
+    }
+    // Reports the current limit of the operator's allocator as the memory limit.
+    @Override
+    public long getMemLimit() {
+      return allocator.getLimit();
+    }
+    // HashAgg reports that it has no partition limit.
+    @Override
+    public boolean hasPartitionLimit() {
+      return false;
+    }
+  }
+
   public enum Metric implements MetricDef {
 
     NUM_BUCKETS,
@@ -335,7 +378,6 @@ public void setup(HashAggregate hashAggrConfig, 
HashTableConfig htConfig, Fragme
     this.incoming = incoming;
     this.outgoing = outgoing;
     this.outContainer = outContainer;
-    this.operatorId = hashAggrConfig.getOperatorId();
     this.useMemoryPrediction = 
context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
 
     is2ndPhase = hashAggrConfig.getAggPhase() == 
AggPrelBase.OperatorPhase.PHASE_2of2;
@@ -404,7 +446,7 @@ private void delayedSetup() {
     final boolean fallbackEnabled = 
context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
 
     // Set the number of partitions from the configuration (raise to a power 
of two, if needed)
-    numPartitions = 
(int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
+    int numPartitions = 
(int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
     if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do early 
return with 1 partition
       canSpill = false;
       logger.warn("Spilling is disabled due to configuration setting of 
num_partitions to 1");
@@ -460,9 +502,8 @@ private void delayedSetup() {
       // (but 1st phase can still spill, so it will maintain the original 
memory limit)
       allocator.setLimit(AbstractBase.MAX_ALLOCATION);  // 10_000_000_000L
     }
-    // Based on the number of partitions: Set the mask and bit count
-    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
-    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    spilledState.initialize(numPartitions);
 
     // Create arrays (one entry per partition)
     htables = new HashTable[numPartitions];
@@ -471,7 +512,6 @@ private void delayedSetup() {
     writers = new Writer[numPartitions];
     spilledBatchesCount = new int[numPartitions];
     spillFiles = new String[numPartitions];
-    spilledPartitionsList = new ArrayList<SpilledPartition>();
 
     plannedBatches = numPartitions; // each partition should allocate its 
first batch
 
@@ -511,7 +551,7 @@ private void initializeSetup(RecordBatch newIncoming) 
throws SchemaChangeExcepti
     this.incoming = newIncoming;
     currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in 
this spill file
     nextPartitionToReturn = 0;
-    for (int i = 0; i < numPartitions; i++ ) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
       htables[i].updateIncoming(newIncoming.getContainer(), null);
       htables[i].reset();
       if ( batchHolders[i] != null) {
@@ -829,7 +869,7 @@ public int getOutputCount() {
 
   @Override
   public void adjustOutputCount(int outputBatchSize, int oldRowWidth, int 
newRowWidth) {
-    for (int i = 0; i < numPartitions; i++ ) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
       if (batchHolders[i] == null || batchHolders[i].size() == 0) {
         continue;
       }
@@ -851,7 +891,7 @@ public void cleanup() {
           (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
     // clean (and deallocate) each partition
-    for ( int i = 0; i < numPartitions; i++) {
+    for ( int i = 0; i < spilledState.getNumPartitions(); i++) {
           if (htables[i] != null) {
               htables[i].clear();
               htables[i] = null;
@@ -877,12 +917,12 @@ public void cleanup() {
           }
     }
     // delete any spill file left in unread spilled partitions
-    while ( ! spilledPartitionsList.isEmpty() ) {
-        SpilledPartition sp = spilledPartitionsList.remove(0);
+    while (!spilledState.isEmpty()) {
+        HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
         try {
-          spillSet.delete(sp.spillFile);
+          spillSet.delete(sp.getSpillFile());
         } catch(IOException e) {
-          logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
+          logger.warn("Cleanup: Failed to delete spill file 
{}",sp.getSpillFile());
         }
     }
     // Delete the currently handled (if any) spilled file
@@ -948,7 +988,7 @@ private int chooseAPartitionToFlush(int currPart, boolean 
tryAvoidCurr) {
     // first find the largest spilled partition
     int maxSizeSpilled = -1;
     int indexMaxSpilled = -1;
-    for (int isp = 0; isp < numPartitions; isp++ ) {
+    for (int isp = 0; isp < spilledState.getNumPartitions(); isp++ ) {
       if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
         maxSizeSpilled = batchHolders[isp].size();
         indexMaxSpilled = isp;
@@ -967,7 +1007,7 @@ private int chooseAPartitionToFlush(int currPart, boolean 
tryAvoidCurr) {
       indexMax = indexMaxSpilled;
       maxSize = 4 * maxSizeSpilled;
     }
-    for ( int insp = 0; insp < numPartitions; insp++) {
+    for ( int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
       if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
         indexMax = insp;
         maxSize = batchHolders[insp].size();
@@ -993,7 +1033,7 @@ private void spillAPartition(int part) {
     ArrayList<BatchHolder> currPartition = batchHolders[part];
     rowsInPartition = 0;
     if ( EXTRA_DEBUG_SPILL ) {
-      logger.debug("HashAggregate: Spilling partition {} current cycle {} part 
size {}", part, cycleNum, currPartition.size());
+      logger.debug("HashAggregate: Spilling partition {} current cycle {} part 
size {}", part, spilledState.getCycle(), currPartition.size());
     }
 
     if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to 
spill
@@ -1001,7 +1041,7 @@ private void spillAPartition(int part) {
     // If this is the first spill for this partition, create an output stream
     if ( ! isSpilled(part) ) {
 
-      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? 
Integer.toString(cycleNum) : null);
+      spillFiles[part] = spillSet.getNextSpillFile(spilledState.getCycle() > 0 
? Integer.toString(spilledState.getCycle()) : null);
 
       try {
         writers[part] = spillSet.writer(spillFiles[part]);
@@ -1025,21 +1065,8 @@ private void spillAPartition(int part) {
       this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, 
numOutputRecords);
 
       // set the value count for outgoing batch value vectors
-      /* int i = 0; */
       for (VectorWrapper<?> v : outgoing) {
         v.getValueVector().getMutator().setValueCount(numOutputRecords);
-        /*
-        // print out the first row to be spilled ( varchar, varchar, bigint )
-        try {
-          if (i++ < 2) {
-            NullableVarCharVector vv = ((NullableVarCharVector) 
v.getValueVector());
-            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
-          } else {
-            NullableBigIntVector vv = ((NullableBigIntVector) 
v.getValueVector());
-            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
-          }
-        } catch (Exception e) { logger.info("While printing the first row - 
Got an exception = {}",e); }
-        */
       }
 
       outContainer.setRecordCount(numOutputRecords);
@@ -1119,19 +1146,20 @@ public AggIterOutcome outputCurrentBatch() {
     if ( ! earlyOutput ) {
       // Update the next partition to return (if needed)
       // skip fully returned (or spilled) partitions
-      while (nextPartitionToReturn < numPartitions) {
+      while (nextPartitionToReturn < spilledState.getNumPartitions()) {
         //
         // If this partition was spilled - spill the rest of it and skip it
         //
         if ( isSpilled(nextPartitionToReturn) ) {
           spillAPartition(nextPartitionToReturn); // spill the rest
-          SpilledPartition sp = new SpilledPartition();
-          sp.spillFile = spillFiles[nextPartitionToReturn];
-          sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
-          sp.cycleNum = cycleNum; // remember the current cycle
-          sp.origPartn = nextPartitionToReturn; // for debugging / filename
-          sp.prevOrigPartn = originalPartition; // for debugging / filename
-          spilledPartitionsList.add(sp);
+          HashAggSpilledPartition sp = new HashAggSpilledPartition(
+            spilledState.getCycle(),
+            nextPartitionToReturn,
+            originalPartition,
+            spilledBatchesCount[nextPartitionToReturn],
+            spillFiles[nextPartitionToReturn]);
+
+          spilledState.addPartition(sp);
 
           reinitPartition(nextPartitionToReturn); // free the memory
           try {
@@ -1155,9 +1183,9 @@ public AggIterOutcome outputCurrentBatch() {
       }
 
       // if passed the last partition - either done or need to restart and 
read spilled partitions
-      if (nextPartitionToReturn >= numPartitions) {
+      if (nextPartitionToReturn >= spilledState.getNumPartitions()) {
         // The following "if" is probably never used; due to a similar check 
at the end of this method
-        if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
+        if (spilledState.isEmpty()) { // and no spilled partitions
           allFlushed = true;
           this.outcome = IterOutcome.NONE;
           if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
@@ -1170,10 +1198,10 @@ public AggIterOutcome outputCurrentBatch() {
         buildComplete = false; // go back and call doWork() again
         handlingSpills = true; // beginning to work on the spill files
         // pick a spilled partition; set a new incoming ...
-        SpilledPartition sp = spilledPartitionsList.remove(0);
+        HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
         // Create a new "incoming" out of the spilled partition spill file
-        newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, 
context, schema, oContext, spillSet);
-        originalPartition = sp.origPartn; // used for the filename
+        newIncoming = new SpilledRecordbatch(sp.getSpillFile(), 
sp.getSpilledBatches(), context, schema, oContext, spillSet);
+        originalPartition = sp.getOriginPartition(); // used for the filename
         logger.trace("Reading back spilled original partition {} as an 
incoming",originalPartition);
         // Initialize .... new incoming, new set of partitions
         try {
@@ -1181,22 +1209,8 @@ public AggIterOutcome outputCurrentBatch() {
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
-        // update the cycle num if needed
-        // The current cycle num should always be one larger than in the 
spilled partition
-        if ( cycleNum == sp.cycleNum ) {
-          cycleNum = 1 + sp.cycleNum;
-          stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
-          // report first spill or memory stressful situations
-          if ( cycleNum == 1 ) { logger.info("Started reading spilled records 
"); }
-          if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
-          if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); }
-          if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); }
-          if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); }
-        }
-        if ( EXTRA_DEBUG_SPILL ) {
-          logger.debug("Start reading spilled partition {} (prev {}) from 
cycle {} (with {} batches). More {} spilled partitions left.",
-              sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, 
spilledPartitionsList.size());
-        }
+
+        spilledState.updateCycle(stats, sp, updater);
         return AggIterOutcome.AGG_RESTART;
       }
 
@@ -1265,7 +1279,7 @@ else if ( handleEmit ) {
         outBatchIndex[partitionToReturn] = 0; // reset, for the next EMIT
         return AggIterOutcome.AGG_EMIT;
       }
-      else if ( (partitionToReturn + 1 == numPartitions) && 
spilledPartitionsList.isEmpty() ) { // last partition ?
+      else if ((partitionToReturn + 1 == spilledState.getNumPartitions()) && 
spilledState.isEmpty()) { // last partition ?
 
         allFlushed = true; // next next() call will return NONE
 
@@ -1313,7 +1327,7 @@ private String getOOMErrorMsg(String prefix) {
     } else if (!canSpill) {  // 2nd phase, with only 1 partition
       errmsg = "Too little memory available to operator to facilitate 
spilling.";
     } else { // a bug ?
-      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First 
Phase") + ". Partitions: " + numPartitions +
+      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First 
Phase") + ". Partitions: " + spilledState.getNumPartitions() +
       ". Estimated batch size: " + estMaxBatchSize + ". values size: " + 
estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
       if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + 
plannedBatches; }
       if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + 
rowsSpilled; }
@@ -1334,32 +1348,6 @@ private void checkGroupAndAggrValues(int incomingRowIdx) 
{
     assert incomingRowIdx >= 0;
     assert ! earlyOutput;
 
-    /** for debugging
-     Object tmp = (incoming).getValueAccessorById(0, 
BigIntVector.class).getValueVector();
-     BigIntVector vv0 = null;
-     BigIntHolder holder = null;
-
-     if (tmp != null) {
-     vv0 = ((BigIntVector) tmp);
-     holder = new BigIntHolder();
-     holder.value = vv0.getAccessor().get(incomingRowIdx) ;
-     }
-     */
-    /*
-    if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
-      // for debugging -- show the first row from a spilled batch
-      Object tmp0 = 
(incoming).getValueAccessorById(NullableVarCharVector.class, 
0).getValueVector();
-      Object tmp1 = 
(incoming).getValueAccessorById(NullableVarCharVector.class, 
1).getValueVector();
-      Object tmp2 = 
(incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
-
-      if (tmp0 != null && tmp1 != null && tmp2 != null) {
-        NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
-        NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
-        NullableBigIntVector  vv2 = ((NullableBigIntVector) tmp2);
-        logger.debug("The first row = {} , {} , {}", 
vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), 
vv2.getAccessor().get(incomingRowIdx));
-      }
-    }
-    */
     // The hash code is computed once, then its lower bits are used to 
determine the
     // partition to use, and the higher bits determine the location in the 
hash table.
     int hashCode;
@@ -1371,12 +1359,12 @@ private void checkGroupAndAggrValues(int 
incomingRowIdx) {
     }
 
     // right shift hash code for secondary (or tertiary...) spilling
-    for (int i = 0; i < cycleNum; i++) {
-      hashCode >>>= bitsInMask;
+    for (int i = 0; i < spilledState.getCycle(); i++) {
+      hashCode >>>= spilledState.getBitsInMask();
     }
 
-    int currentPartition = hashCode & partitionMask;
-    hashCode >>>= bitsInMask;
+    int currentPartition = hashCode & spilledState.getPartitionMask();
+    hashCode >>>= spilledState.getBitsInMask();
     HashTable.PutStatus putStatus = null;
     long allocatedBeforeHTput = allocator.getAllocatedMemory();
 
@@ -1498,7 +1486,7 @@ private void spillIfNeeded(int currentPartition, boolean 
forceSpill) {
       maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * 
(estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4 /* links + hash-values */));
       // Add the (max) size of the current hash table, in case it will double
       int maxSize = 1;
-      for (int insp = 0; insp < numPartitions; insp++) {
+      for (int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
         maxSize = Math.max(maxSize, batchHolders[insp].size());
       }
       maxMemoryNeeded += MAX_BATCH_ROW_COUNT * 2 * 2 * 4 * maxSize; // 2 - 
double, 2 - max when %50 full, 4 - Uint4
@@ -1577,12 +1565,12 @@ private void spillIfNeeded(int currentPartition, 
boolean forceSpill) {
    * @param htables
    */
   private void updateStats(HashTable[] htables) {
-    if ( cycleNum > 0 ||  // These stats are only for before processing 
spilled files
+    if (!spilledState.isFirstCycle() ||  // These stats are only for before 
processing spilled files
       handleEmit ) { return; } // and no stats collecting when handling an EMIT
     long numSpilled = 0;
     HashTableStats newStats = new HashTableStats();
     // sum the stats from all the partitions
-    for (int ind = 0; ind < numPartitions; ind++) {
+    for (int ind = 0; ind < spilledState.getNumPartitions(); ind++) {
       htables[ind].getStats(newStats);
       htStats.addStats(newStats);
       if (isSpilled(ind)) {
@@ -1593,8 +1581,8 @@ private void updateStats(HashTable[] htables) {
     this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
-    this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no 
spill
+    this.stats.setLongStat(Metric.NUM_PARTITIONS, 
spilledState.getNumPartitions());
+    this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // 
Put 0 in case no spill
     if ( is2ndPhase ) {
       this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
new file mode 100644
index 00000000000..951fa668e46
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+public abstract class AbstractSpilledPartitionMetadata implements 
SpilledPartitionMetadata { // Base class storing the three immutable int properties of SpilledPartitionMetadata; makeDebugString() is left to subclasses
+  private final int cycle; // spill cycle recorded for the partition
+  private final int originPartition; // the parent partition
+  private final int prevOriginPartition; // the parent of the parent partition
+
+  public AbstractSpilledPartitionMetadata(final int cycle,
+                                          final int originPartition,
+                                          final int prevOriginPartition) {
+    this.cycle = cycle; // values are stored as given; no validation is performed here
+    this.originPartition = originPartition;
+    this.prevOriginPartition = prevOriginPartition;
+  }
+
+  @Override
+  public int getCycle() {
+    return cycle;
+  }
+
+  @Override
+  public int getOriginPartition() {
+    return originPartition;
+  }
+
+  @Override
+  public int getPrevOriginPartition() {
+    return prevOriginPartition;
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
new file mode 100644
index 00000000000..4d53be349bc
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+/**
+ * This interface represents the metadata for a spilled partition.
+ */
+public interface SpilledPartitionMetadata {
+  /**
+   * The spill cycle for a partition.
+   * @return The spill cycle for a partition.
+   */
+  int getCycle();
+
+  /**
+   * The parent partition.
+   * @return The parent partition.
+   */
+  int getOriginPartition();
+
+  /**
+   * The parent of the parent partition.
+   * @return The parent of the parent partition.
+   */
+  int getPrevOriginPartition();
+  String makeDebugString(); // Returns a human-readable description of this partition. NOTE(review): presumably meant for debug logging - confirm against callers.
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
new file mode 100644
index 00000000000..a1762877349
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Manages the spilling information for an operator.
+ * @param <T> The class holding spilled partition metadata.
+ *
+ * <h4>Lifecycle</h4>
+ * <ol>
+ *   <li>Create a {@link SpilledState} instance.</li>
+ *   <li>Call {@link SpilledState#initialize(int)}</li>
+ *   <li>Call {@link SpilledState#addPartition(SpilledPartitionMetadata)}, 
{@link SpilledState#getNextSpilledPartition()}, or
+ *           {@link SpilledState#updateCycle(OperatorStats, 
SpilledPartitionMetadata, Updater)}</li>
+ * </ol>
+ *
+ * <p>
+ *  <ul>
+ *   <li>A user can call {@link SpilledState#getCycle()} at any time.</li>
+ *  </ul>
+ * </p>
+ */
+public class SpilledState<T extends SpilledPartitionMetadata> {
+  public static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(SpilledState.class);
+
+  private int numPartitions;
+  private int partitionMask;
+  private int bitsInMask;
+
+  private int cycle = 0;
+  private Queue<T> queue = new LinkedList<>();
+  private boolean initialized = false;
+
+  public SpilledState() {
+  }
+
+  /**
+   * Initializes the number of partitions to use for the spilled state.
+   * @param numPartitions The number of partitions to use for the spilled 
state.
+   */
+  public void initialize(int numPartitions) {
+    Preconditions.checkState(!initialized);
+    Preconditions.checkArgument(numPartitions >= 1); // Numpartitions must be 
positive
+    Preconditions.checkArgument((numPartitions & (numPartitions - 1)) == 0); 
// Make sure it's a power of two
+
+    this.numPartitions = numPartitions;
+    initialized = true;
+    partitionMask = numPartitions - 1;
+    bitsInMask = Integer.bitCount(partitionMask);
+  }
+
+  /**
+   * Gets the number of partitions.
+   * @return The number of partitions.
+   */
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  /**
+   * True if this is the first cycle (0).
+   * @return True if this is the first cycle (0).
+   */
+  public boolean isFirstCycle() {
+    return cycle == 0;
+  }
+
+  public int getPartitionMask() {
+    return partitionMask;
+  }
+
+  public int getBitsInMask() {
+    return bitsInMask;
+  }
+
+  /**
+   * Add the partition metadata to the end of the queue to be processed.
+   * @param spilledPartition The partition metadata to process.
+   * @return True if the partition metadata was added to the queue, false otherwise.
+   */
+  public boolean addPartition(T spilledPartition) {
+    Preconditions.checkState(initialized);
+    return queue.offer(spilledPartition);
+  }
+
+  /**
+   * Get the next spilled partition to process.
+   * @return The metadata for the next spilled partition to process.
+   */
+  public T getNextSpilledPartition() {
+    Preconditions.checkState(initialized);
+    return queue.poll();
+  }
+
+  /**
+   * True if there are no spilled partitions.
+   * @return True if there are no spilled partitions.
+   */
+  public boolean isEmpty() {
+    return queue.isEmpty();
+  }
+
+  /**
+   * Update the current spill cycle.
+   * @param operatorStats Current operator stats.
+   * @param spilledPartition The next spilled partition metadata to process.
+   * @param updater The updater implementation that executes custom logic if a 
spill cycle is incremented.
+   */
+  public void updateCycle(final OperatorStats operatorStats,
+                          final T spilledPartition,
+                          final Updater updater) {
+    Preconditions.checkState(initialized);
+    Preconditions.checkNotNull(operatorStats);
+    Preconditions.checkNotNull(spilledPartition);
+    Preconditions.checkNotNull(updater);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(spilledPartition.makeDebugString());
+    }
+
+    if (cycle == spilledPartition.getCycle()) {
+      // Update the cycle num if needed.
+      // The current cycle num should always be one larger than in the spilled 
partition.
+
+      cycle = 1 + spilledPartition.getCycle();
+      operatorStats.setLongStat(HashJoinBatch.Metric.SPILL_CYCLE, cycle); // 
update stats
+
+      if (logger.isDebugEnabled()) {
+        // report first spill or memory stressful situations
+        if (cycle == 1) {
+          logger.debug("Started reading spilled records ");
+        } else if (cycle == 2) {
+          logger.debug("SECONDARY SPILLING ");
+        } else if (cycle == 3) {
+          logger.debug("TERTIARY SPILLING ");
+        } else if (cycle == 4) {
+          logger.debug("QUATERNARY SPILLING ");
+        } else if (cycle == 5) {
+          logger.debug("QUINARY SPILLING ");
+        }
+      }
+
+      if (updater.hasPartitionLimit() && cycle * bitsInMask > 20) {
+        queue.offer(spilledPartition); // so cleanup() would delete the curr 
spill files
+        updater.cleanup();
+        throw UserException
+          .unsupportedError()
+          .message("%s.\n On cycle num %d mem available %d num partitions 
%d.", updater.getFailureMessage(), cycle, updater.getMemLimit(), numPartitions)
+          .build(logger);
+      }
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(spilledPartition.makeDebugString());
+    }
+  }
+
+  /**
+   * Gets the current spill cycle.
+   * @return The current spill cycle.
+   */
+  public int getCycle() {
+    return cycle;
+  }
+
+  /**
+   * This is an interface that is used to do updates of the spilled state.
+   */
+  public interface Updater {
+    /**
+     * Does any necessary cleanup when we've spilled too much and are about to 
abort the query.
+     */
+    void cleanup();
+
+    /**
+     * Gets the failure message in the event that we spilled too far.
+     * @return The failure message in the event that we spilled too far.
+     */
+    String getFailureMessage();
+
+    /**
+     * Gets the current memory limit.
+     * @return The current memory limit.
+     */
+    long getMemLimit();
+
+    /**
+     * True if there is a limit to the number of times we can partition.
+     * @return True if there is a limit to the number of times we can 
partition.
+     */
+    boolean hasPartitionLimit();
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 3d456967f08..929811ce508 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -23,9 +23,8 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.drill.common.exceptions.UserException;
@@ -56,12 +55,14 @@
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
+import 
org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.common.SpilledState;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -80,6 +81,7 @@
 import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -137,8 +139,6 @@
    * The number of {@link HashPartition}s. This is configured via a system 
option and set in {@link #partitionNumTuning(int, 
HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
-  private int partitionMask = 0; // numPartitions - 1
-  private int bitsInMask = 0; // number of bits in the MASK
 
   /**
    * The master class used to generate {@link HashTable}s.
@@ -182,7 +182,6 @@
   private SpillSet spillSet;
   HashJoinPOP popConfig;
 
-  private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from
   IntVector read_right_HV_vector; // HV vector that was read from the spilled 
batch
   private int maxBatchesInMemory;
@@ -195,21 +194,85 @@
   /**
    * This holds information about the spilled partitions for the build and 
probe side.
    */
-  public static class HJSpilledPartition {
-    public int innerSpilledBatches;
-    public String innerSpillFile;
-    public int outerSpilledBatches;
-    public String outerSpillFile;
-    int cycleNum;
-    int origPartn;
-    int prevOrigPartn;
+  public static class HashJoinSpilledPartition extends 
AbstractSpilledPartitionMetadata {
+    private final int innerSpilledBatches;
+    private final String innerSpillFile;
+    private int outerSpilledBatches;
+    private String outerSpillFile;
+    private boolean updatedOuter = false;
+
+    public HashJoinSpilledPartition(final int cycle,
+                                    final int originPartition,
+                                    final int prevOriginPartition,
+                                    final int innerSpilledBatches,
+                                    final String innerSpillFile) {
+      super(cycle, originPartition, prevOriginPartition);
+
+      this.innerSpilledBatches = innerSpilledBatches;
+      this.innerSpillFile = innerSpillFile;
+    }
+
+    public int getInnerSpilledBatches() {
+      return innerSpilledBatches;
+    }
+
+    public String getInnerSpillFile() {
+      return innerSpillFile;
+    }
+
+    public int getOuterSpilledBatches() {
+      Preconditions.checkState(updatedOuter);
+      return outerSpilledBatches;
+    }
+
+    public String getOuterSpillFile() {
+      Preconditions.checkState(updatedOuter);
+      return outerSpillFile;
+    }
+
+    public void updateOuter(final int outerSpilledBatches, final String 
outerSpillFile) {
+      Preconditions.checkState(!updatedOuter);
+      updatedOuter = true;
+
+      this.outerSpilledBatches = outerSpilledBatches;
+      this.outerSpillFile = outerSpillFile;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return String.format("Start reading spilled partition %d (prev %d) from 
cycle %d (with %d-%d batches).",
+        this.getOriginPartition(), this.getPrevOriginPartition(), 
this.getCycle(), outerSpilledBatches, innerSpilledBatches);
+    }
+  }
+
+  public class HashJoinUpdater implements SpilledState.Updater {
+    @Override
+    public void cleanup() {
+      HashJoinBatch.this.cleanup();
+    }
+
+    @Override
+    public String getFailureMessage() {
+      return "Hash-Join can not partition the inner data any further (probably 
due to too many join-key duplicates).";
+    }
+
+    @Override
+    public long getMemLimit() {
+      return HashJoinBatch.this.allocator.getLimit();
+    }
+
+    @Override
+    public boolean hasPartitionLimit() {
+      return true;
+    }
   }
 
   /**
    * Queue of spilled partitions to process.
    */
-  private ArrayList<HJSpilledPartition> spilledPartitionsList = new 
ArrayList<>();
-  private HJSpilledPartition spilledInners[]; // for the outer to find the 
partition
+  private SpilledState<HashJoinSpilledPartition> spilledState = new 
SpilledState<>();
+  private HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
+  private HashJoinSpilledPartition spilledInners[]; // for the outer to find 
the partition
 
   public enum Metric implements MetricDef {
     NUM_BUCKETS,
@@ -352,7 +415,7 @@ private IterOutcome prefetchFirstBatch(IterOutcome outcome,
       state = BatchState.STOP;
     } else {
       // Got our first batch(es)
-      if (cycleNum == 0) {
+      if (spilledState.isFirstCycle()) {
         // Only collect stats for the first cylce
         memoryManagerUpdate.run();
       }
@@ -470,7 +533,7 @@ public IterOutcome innerNext() {
               joinType,
               leftUpstream,
               partitions,
-              cycleNum,
+              spilledState.getCycle(),
               container,
               spilledInners,
               buildSideIsEmpty.booleanValue(),
@@ -514,9 +577,9 @@ public IterOutcome innerNext() {
         //
         //  (recursively) Handle the spilled partitions, if any
         //
-        if (!buildSideIsEmpty.booleanValue() && 
!spilledPartitionsList.isEmpty()) {
+        if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
           // Get the next (previously) spilled partition to handle as incoming
-          HJSpilledPartition currSp = spilledPartitionsList.remove(0);
+          HashJoinSpilledPartition currSp = 
spilledState.getNextSpilledPartition();
 
           // Create a BUILD-side "incoming" out of the inner spill file of 
that partition
           buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, 
currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
@@ -534,32 +597,7 @@ public IterOutcome innerNext() {
             hashJoinProbe.changeToFinalProbeState();
           }
 
-          // update the cycle num if needed
-          // The current cycle num should always be one larger than in the 
spilled partition
-          if (cycleNum == currSp.cycleNum) {
-            cycleNum = 1 + currSp.cycleNum;
-            stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
-            // report first spill or memory stressful situations
-            if (cycleNum == 1) { logger.info("Started reading spilled records 
"); }
-            if (cycleNum == 2) { logger.info("SECONDARY SPILLING "); }
-            if (cycleNum == 3) { logger.warn("TERTIARY SPILLING ");  }
-            if (cycleNum == 4) { logger.warn("QUATERNARY SPILLING "); }
-            if (cycleNum == 5) { logger.warn("QUINARY SPILLING "); }
-            if ( cycleNum * bitsInMask > 20 ) {
-              spilledPartitionsList.add(currSp); // so cleanup() would delete 
the curr spill files
-              this.cleanup();
-              throw UserException
-                .unsupportedError()
-                .message("Hash-Join can not partition the inner data any 
further (probably due to too many join-key duplicates)\n"
-                + "On cycle num %d mem available %d num partitions %d", 
cycleNum, allocator.getLimit(), numPartitions)
-                .build(logger);
-            }
-          }
-          logger.debug("Start reading spilled partition {} (prev {}) from 
cycle {} (with {}-{} batches)." +
-              " More {} spilled partitions left.",
-            currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, 
currSp.outerSpilledBatches,
-            currSp.innerSpilledBatches, spilledPartitionsList.size());
-
+          spilledState.updateCycle(stats, currSp, spilledStateUpdater);
           state = BatchState.FIRST;  // TODO need to determine if this is 
still necessary since prefetchFirstBatchFromBothSides sets this
 
           prefetchedBuild.setValue(false);
@@ -692,9 +730,7 @@ private void delayedSetup() {
     //  See partitionNumTuning()
     //
 
-    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
-    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
-
+    spilledState.initialize(numPartitions);
     // Create array for the partitions
     partitions = new HashPartition[numPartitions];
   }
@@ -707,10 +743,10 @@ private void initializeBuild() {
     // Recreate the partitions every time build is initialized
     for (int part = 0; part < numPartitions; part++ ) {
       partitions[part] = new HashPartition(context, allocator, baseHashTable, 
buildBatch, probeBatch,
-        RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions);
+        RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), 
numPartitions);
     }
 
-    spilledInners = new HJSpilledPartition[numPartitions];
+    spilledInners = new HashJoinSpilledPartition[numPartitions];
 
   }
 
@@ -836,19 +872,18 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
     }
 
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
-    boolean firstCycle = cycleNum == 0;
 
     {
       // Initializing build calculator
       // Limit scope of these variables to this block
-      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT: 
RECORDS_PER_BATCH;
+      int maxBatchSize = spilledState.isFirstCycle()? 
RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
       boolean doMemoryCalculation = canSpill && 
!probeSideIsEmpty.booleanValue();
       HashJoinMemoryCalculator calc = getCalculatorImpl();
 
       calc.initialize(doMemoryCalculation);
       buildCalc = calc.next();
 
-      buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash 
values bug fixed
+      buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix 
after growing hash values bug fixed
         buildBatch,
         probeBatch,
         buildJoinColumns,
@@ -862,13 +897,13 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
         batchMemoryManager.getOutputBatchSize(),
         HashTable.DEFAULT_LOAD_FACTOR);
 
-      if (firstCycle && doMemoryCalculation) {
+      if (spilledState.isFirstCycle() && doMemoryCalculation) {
         // Do auto tuning
         buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
       }
     }
 
-    if (firstCycle) {
+    if (spilledState.isFirstCycle()) {
       // Do initial setup only on the first cycle
       delayedSetup();
     }
@@ -906,11 +941,11 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
         }
         final int currentRecordCount = buildBatch.getRecordCount();
 
-        if ( cycleNum > 0 ) {
+        if (!spilledState.isFirstCycle()) {
           read_right_HV_vector = (IntVector) 
buildBatch.getContainer().getLast();
         }
         //create runtime filter
-        if (cycleNum == 0 && enableRuntimeFilter) {
+        if (spilledState.isFirstCycle() && enableRuntimeFilter) {
           //create runtime filter and send out async
           int condFieldIndex = 0;
           for (BloomFilter bloomFilter : bloomFilters) {
@@ -924,10 +959,10 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
 
         // For every record in the build batch, hash the key columns and keep 
the result
         for (int ind = 0; ind < currentRecordCount; ind++) {
-          int hashCode = ( cycleNum == 0 ) ? 
partitions[0].getBuildHashCode(ind)
+          int hashCode = spilledState.isFirstCycle() ? 
partitions[0].getBuildHashCode(ind)
             : read_right_HV_vector.getAccessor().get(ind); // get the hash 
value from the HV column
-          int currPart = hashCode & partitionMask;
-          hashCode >>>= bitsInMask;
+          int currPart = hashCode & spilledState.getPartitionMask();
+          hashCode >>>= spilledState.getBitsInMask();
           // Append the new inner row to the appropriate partition; spill 
(that partition) if needed
           partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, 
hashCode, buildCalc); // may spill if needed
         }
@@ -942,7 +977,7 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
       rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
     }
 
-    if (cycleNum == 0 && enableRuntimeFilter) {
+    if (spilledState.isFirstCycle() && enableRuntimeFilter) {
       if (bloomFilters.size() > 0) {
         int hashJoinOpId = this.popConfig.getOperatorId();
         runtimeFilterReporter.sendOut(bloomFilters, probeFields, 
this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
@@ -1004,14 +1039,13 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
 
     for (HashPartition partn : partitions) {
       if ( partn.isSpilled() ) {
-        HJSpilledPartition sp = new HJSpilledPartition();
-        sp.innerSpillFile = partn.getSpillFile();
-        sp.innerSpilledBatches = partn.getPartitionBatchesCount();
-        sp.cycleNum = cycleNum; // remember the current cycle
-        sp.origPartn = partn.getPartitionNum(); // for debugging / filename
-        sp.prevOrigPartn = originalPartition; // for debugging / filename
-        spilledPartitionsList.add(sp);
+        HashJoinSpilledPartition sp = new 
HashJoinSpilledPartition(spilledState.getCycle(),
+          partn.getPartitionNum(),
+          originalPartition,
+          partn.getPartitionBatchesCount(),
+          partn.getSpillFile());
 
+        spilledState.addPartition(sp);
         spilledInners[partn.getPartitionNum()] = sp; // for the outer to find 
the SP later
         partn.closeWriter();
 
@@ -1169,8 +1203,8 @@ private void cleanup() {
     }
 
     // delete any spill file left in unread spilled partitions
-    while ( ! spilledPartitionsList.isEmpty() ) {
-      HJSpilledPartition sp = spilledPartitionsList.remove(0);
+    while (!spilledState.isEmpty()) {
+      HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
       try {
         spillSet.delete(sp.innerSpillFile);
       } catch(IOException e) {
@@ -1210,7 +1244,7 @@ public String makeDebugString() {
    */
   private void updateStats() {
     if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the 
right side is empty
-    if ( cycleNum > 0 ) { return; } // These stats are only for before 
processing spilled files
+    if (!spilledState.isFirstCycle()) { return; } // These stats are only for 
before processing spilled files
 
     final HashTableStats htStats = new HashTableStats();
     long numSpilled = 0;
@@ -1227,7 +1261,7 @@ private void updateStats() {
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no 
spill
+    this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // 
Put 0 in case no spill
     this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }
 
@@ -1257,7 +1291,7 @@ public void updateMetrics() {
 
   @Override
   public void close() {
-    if ( cycleNum > 0 ) { // spilling happened
+    if (!spilledState.isFirstCycle()) { // spilling happened
       // In case closing due to cancellation, BaseRootExec.close() does not 
close the open
       // SpilledRecordBatch "scanners" as it only knows about the original 
left/right ops.
       killIncoming(false);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 5059b18d216..beddfa68faa 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -39,7 +39,7 @@
     PROBE_PROJECT, PROJECT_RIGHT, DONE
   }
 
-  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, 
JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, 
HashPartition[] partitions, int cycleNum, VectorContainer container, 
HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int 
numPartitions, int rightHVColPosition);
+  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, 
JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, 
HashPartition[] partitions, int cycleNum, VectorContainer container, 
HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean 
buildSideIsEmpty, int numPartitions, int rightHVColPosition);
   int  probeAndProject() throws SchemaChangeException;
   void changeToFinalProbeState();
   void setTargetOutputCount(int targetOutputCount);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 71abeda1139..a63f63d9a4c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -77,7 +77,7 @@
   private int currRightPartition = 0; // for returning RIGHT/FULL
   IntVector read_left_HV_vector; // HV vector that was read from the spilled 
batch
   private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
-  private HashJoinBatch.HJSpilledPartition spilledInners[]; // for the outer 
to find the partition
+  private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the 
outer to find the partition
   private boolean buildSideIsEmpty = true;
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
   private int partitionMask = 0; // numPartitions - 1
@@ -110,7 +110,7 @@ public int getOutputCount() {
    * @param rightHVColPosition
    */
   @Override
-  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch 
outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] 
partitions, int cycleNum, VectorContainer container, 
HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int 
numPartitions, int rightHVColPosition) {
+  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch 
outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] 
partitions, int cycleNum, VectorContainer container, 
HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean 
buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
     this.container = container;
     this.spilledInners = spilledInners;
     this.probeBatch = probeBatch;
@@ -253,9 +253,8 @@ private void executeProbePhase() throws 
SchemaChangeException {
               if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
               partn.completeAnOuterBatch(false);
               // update the partition's spill record with the outer side
-              HashJoinBatch.HJSpilledPartition sp = 
spilledInners[partn.getPartitionNum()];
-              sp.outerSpillFile = partn.getSpillFile();
-              sp.outerSpilledBatches = partn.getPartitionBatchesCount();
+              HashJoinBatch.HashJoinSpilledPartition sp = 
spilledInners[partn.getPartitionNum()];
+              sp.updateOuter(partn.getPartitionBatchesCount(), 
partn.getSpillFile());
 
               partn.closeWriter();
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to