This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 5859968  DRILL-6804: Simplify usage of OperatorPhase in HashAgg.
5859968 is described below

commit 5859968d525dbb2f65b20a228a7f31dc9e516698
Author: Timothy Farkas <[email protected]>
AuthorDate: Thu Oct 18 16:32:03 2018 -0700

    DRILL-6804: Simplify usage of OperatorPhase in HashAgg.
---
 .../exec/physical/impl/aggregate/HashAggBatch.java |  7 ++--
 .../physical/impl/aggregate/HashAggTemplate.java   | 44 ++++++++++------------
 .../drill/exec/planner/physical/AggPrelBase.java   | 37 +++++++++++++++++-
 .../src/test/resources/agg/hashagg/q6.json         |  1 +
 .../src/test/resources/agg/hashagg/q7_1.json       |  1 +
 .../src/test/resources/agg/hashagg/q7_2.json       |  1 +
 .../src/test/resources/agg/hashagg/q7_3.json       |  1 +
 .../src/test/resources/agg/hashagg/q8.json         |  1 +
 .../src/test/resources/agg/hashagg/q8_1.json       |  1 +
 9 files changed, 65 insertions(+), 29 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 80d25ed..485d363 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -49,7 +50,6 @@ import 
org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome;
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -192,9 +192,10 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
     long memAvail = oContext.getAllocator().getLimit();
     long minBatchesPerPartition = 
context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
     long minBatchesNeeded = 2 * minBatchesPerPartition; // 2 - to cover 
overheads, etc.
-    boolean is2ndPhase = popConfig.getAggPhase() == 
AggPrelBase.OperatorPhase.PHASE_2of2;
     boolean fallbackEnabled = 
context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
-    if ( is2ndPhase && !fallbackEnabled ) {
+    final AggPrelBase.OperatorPhase phase = popConfig.getAggPhase();
+
+    if ( phase.is2nd() && !fallbackEnabled ) {
       minBatchesNeeded *= 2;  // 2nd phase (w/o fallback) needs at least 2 
partitions
     }
     if ( configuredBatchSize > memAvail / minBatchesNeeded ) { // no cast - 
memAvail may be bigger than max-int
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 6709cf6..32db9ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -106,9 +106,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
   private int rowsSpilledReturned = 0;
   private int rowsReturnedEarly = 0;
 
-  private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
-  private boolean is2ndPhase = false;
-  private boolean is1stPhase = false;
+  private AggPrelBase.OperatorPhase phase;
   private boolean canSpill = true; // make it false in case can not 
spill/return-early
   private ChainedHashTable baseHashTable;
   private boolean earlyOutput = false; // when 1st phase returns a partition 
due to no memory
@@ -379,11 +377,8 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     this.outgoing = outgoing;
     this.outContainer = outContainer;
     this.useMemoryPrediction = 
context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
-
-    is2ndPhase = hashAggrConfig.getAggPhase() == 
AggPrelBase.OperatorPhase.PHASE_2of2;
-    isTwoPhase = hashAggrConfig.getAggPhase() != 
AggPrelBase.OperatorPhase.PHASE_1of1;
-    is1stPhase = isTwoPhase && !is2ndPhase;
-    canSpill = isTwoPhase; // single phase can not spill
+    this.phase = hashAggrConfig.getAggPhase();
+    canSpill = phase.hasTwo(); // single phase can not spill
 
     // Typically for testing - force a spill after a partition has more than 
so many batches
     minBatchesPerPartition = 
context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
@@ -447,7 +442,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
     // Set the number of partitions from the configuration (raise to a power 
of two, if needed)
     int numPartitions = 
(int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
-    if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do early 
return with 1 partition
+    if ( numPartitions == 1 && phase.is2nd() ) { // 1st phase can still do 
early return with 1 partition
       canSpill = false;
       logger.warn("Spilling is disabled due to configuration setting of 
num_partitions to 1");
     }
@@ -473,7 +468,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 
1024 * 1024) > memAvail ) {
         numPartitions /= 2;
         if ( numPartitions < 2) {
-          if (is2ndPhase) {
+          if (phase.is2nd()) {
             canSpill = false;  // 2nd phase needs at least 2 to make progress
 
             if (fallbackEnabled) {
@@ -492,7 +487,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
         }
       }
     }
-    logger.debug("{} phase. Number of partitions chosen: {}. {} spill", 
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+    logger.debug("{} phase. Number of partitions chosen: {}. {} spill", 
phase.getName(),
         numPartitions, canSpill ? "Can" : "Cannot");
 
     // The following initial safety check should be revisited once we can 
lower the number of rows in a batch
@@ -616,7 +611,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
 
     logger.trace("{} phase. Estimated internal row width: {} Values row width: 
{} batch size: {}  memory limit: {}  max column width: {}",
-        
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
+      
phase.getName(),estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
 
     if ( estMaxBatchSize > allocator.getLimit() ) {
       logger.warn("HashAggregate: Estimated max batch size {} is larger than 
the memory limit {}",estMaxBatchSize,allocator.getLimit());
@@ -886,7 +881,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
   @Override
   public void cleanup() {
     if ( schema == null ) { return; } // not set up; nothing to clean
-    if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+    if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
       stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
           (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
@@ -982,7 +977,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
    * @return The partition (number) chosen to be spilled
    */
   private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
-    if ( is1stPhase && ! tryAvoidCurr) { return currPart; } // 1st phase: just 
use the current partition
+    if ( phase.is1st() && ! tryAvoidCurr) { return currPart; } // 1st phase: 
just use the current partition
     int currPartSize = batchHolders[currPart].size();
     if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if 
size is 1
     // first find the largest spilled partition
@@ -1188,7 +1183,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
         if (spilledState.isEmpty()) { // and no spilled partitions
           allFlushed = true;
           this.outcome = IterOutcome.NONE;
-          if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+          if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
             stats.setLongStat(Metric.SPILL_MB, // update stats - total MB 
spilled
                 (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
           }
@@ -1243,7 +1238,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
     this.outcome = IterOutcome.OK;
 
-    if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
+    if ( EXTRA_DEBUG_SPILL && phase.is2nd() ) {
       logger.debug("So far returned {} + SpilledReturned {}  total {} (spilled 
{})",rowsNotSpilled,rowsSpilledReturned,
         rowsNotSpilled+rowsSpilledReturned,
         rowsSpilled);
@@ -1322,12 +1317,12 @@ public abstract class HashAggTemplate implements 
HashAggregator {
    */
   private String getOOMErrorMsg(String prefix) {
     String errmsg;
-    if (!isTwoPhase) {
+    if (!phase.hasTwo()) {
       errmsg = "Single Phase Hash Aggregate operator can not spill.";
     } else if (!canSpill) {  // 2nd phase, with only 1 partition
       errmsg = "Too little memory available to operator to facilitate 
spilling.";
     } else { // a bug ?
-      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First 
Phase") + ". Partitions: " + spilledState.getNumPartitions() +
+      errmsg = prefix + " OOM at " + phase.getName() + " Phase. Partitions: " 
+ spilledState.getNumPartitions() +
       ". Estimated batch size: " + estMaxBatchSize + ". values size: " + 
estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
       if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + 
plannedBatches; }
       if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + 
rowsSpilled; }
@@ -1367,11 +1362,11 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     hashCode >>>= spilledState.getBitsInMask();
     HashTable.PutStatus putStatus = null;
     long allocatedBeforeHTput = allocator.getAllocatedMemory();
+    String tryingTo = phase.is1st() ? "early return" : "spill";
 
     // Proactive spill - in case there is no reserve memory - spill and retry 
putting later
     if ( reserveValueBatchMemory == 0 && canSpill ) {
-      logger.trace("Reserved memory runs short, trying to {} a partition and 
retry Hash Table put() again.",
-        is1stPhase ? "early return" : "spill");
+      logger.trace("Reserved memory runs short, trying to {} a partition and 
retry Hash Table put() again.", tryingTo);
 
       doSpill(currentPartition); // spill to free some memory
 
@@ -1389,8 +1384,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     } catch (RetryAfterSpillException re) {
       if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can 
not spill")); }
 
-      logger.trace("HT put failed with an OOM, trying to {} a partition and 
retry Hash Table put() again.",
-            is1stPhase ? "early return" : "spill");
+      logger.trace("HT put failed with an OOM, trying to {} a partition and 
retry Hash Table put() again.", tryingTo);
 
       // for debugging - in case there's a leak
       long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput;
@@ -1493,7 +1487,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
       // log a detailed debug message explaining why a spill may be needed
       logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to 
add to partition {} with {} batches. " + "Max memory needed {}, Est batch size 
{}, mem limit {}",
-          allocator.getAllocatedMemory(), isTwoPhase ? (is2ndPhase ? "2ND" : 
"1ST") : "Single", currentPartition, batchHolders[currentPartition].size(), 
maxMemoryNeeded,
+          allocator.getAllocatedMemory(), phase.getName(), currentPartition, 
batchHolders[currentPartition].size(), maxMemoryNeeded,
           estMaxBatchSize, allocator.getLimit());
     }
     //
@@ -1516,7 +1510,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
         return;
       }
 
-      if ( is2ndPhase ) {
+      if ( phase.is2nd() ) {
         long before = allocator.getAllocatedMemory();
 
         spillAPartition(victimPartition);
@@ -1583,7 +1577,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     this.stats.setLongStat(Metric.NUM_PARTITIONS, 
spilledState.getNumPartitions());
     this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // 
Put 0 in case no spill
-    if ( is2ndPhase ) {
+    if ( phase.is2nd() ) {
       this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
     }
     if ( rowsReturnedEarly > 0 ) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 84f85ba..f3d527e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -47,7 +47,42 @@ import java.util.List;
 
 public abstract class AggPrelBase extends DrillAggregateRelBase implements 
Prel {
 
-  public enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}
+  public enum OperatorPhase {
+    PHASE_1of1(false, false, false, "Single"),
+    PHASE_1of2(true, true, false, "1st"),
+    PHASE_2of2(true, false, true, "2nd");
+
+    private boolean hasTwo;
+    private boolean is1st;
+    private boolean is2nd;
+    private String name;
+
+    OperatorPhase(boolean hasTwo,
+                  boolean is1st,
+                  boolean is2nd,
+                  String name) {
+      this.hasTwo = hasTwo;
+      this.is1st = is1st;
+      this.is2nd = is2nd;
+      this.name = name;
+    }
+
+    public boolean hasTwo() {
+      return hasTwo;
+    }
+
+    public boolean is1st() {
+      return is1st;
+    }
+
+    public boolean is2nd() {
+      return is2nd;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
 
   protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1; // default 
phase
   protected List<NamedExpression> keys = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q6.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q6.json
index c155391..35b200a 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q6.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q6.json
@@ -35,6 +35,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
index ef05613..3494c79 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
@@ -38,6 +38,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
index 62cf5c3..f77c08b 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
@@ -38,6 +38,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
index 8edc110..37ba0ad 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
@@ -38,6 +38,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q8.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q8.json
index a457aa9..f67cd29 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q8.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q8.json
@@ -37,6 +37,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
index 3461c8c..73ce4be 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
@@ -35,6 +35,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"

Reply via email to