This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 5859968 DRILL-6804: Simplify usage of OperatorPhase in HashAgg.
5859968 is described below
commit 5859968d525dbb2f65b20a228a7f31dc9e516698
Author: Timothy Farkas <[email protected]>
AuthorDate: Thu Oct 18 16:32:03 2018 -0700
DRILL-6804: Simplify usage of OperatorPhase in HashAgg.
---
.../exec/physical/impl/aggregate/HashAggBatch.java | 7 ++--
.../physical/impl/aggregate/HashAggTemplate.java | 44 ++++++++++------------
.../drill/exec/planner/physical/AggPrelBase.java | 37 +++++++++++++++++-
.../src/test/resources/agg/hashagg/q6.json | 1 +
.../src/test/resources/agg/hashagg/q7_1.json | 1 +
.../src/test/resources/agg/hashagg/q7_2.json | 1 +
.../src/test/resources/agg/hashagg/q7_3.json | 1 +
.../src/test/resources/agg/hashagg/q8.json | 1 +
.../src/test/resources/agg/hashagg/q8_1.json | 1 +
9 files changed, 65 insertions(+), 29 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 80d25ed..485d363 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -49,7 +50,6 @@ import
org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -192,9 +192,10 @@ public class HashAggBatch extends
AbstractRecordBatch<HashAggregate> {
long memAvail = oContext.getAllocator().getLimit();
long minBatchesPerPartition =
context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
long minBatchesNeeded = 2 * minBatchesPerPartition; // 2 - to cover
overheads, etc.
- boolean is2ndPhase = popConfig.getAggPhase() ==
AggPrelBase.OperatorPhase.PHASE_2of2;
boolean fallbackEnabled =
context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
- if ( is2ndPhase && !fallbackEnabled ) {
+ final AggPrelBase.OperatorPhase phase = popConfig.getAggPhase();
+
+ if ( phase.is2nd() && !fallbackEnabled ) {
minBatchesNeeded *= 2; // 2nd phase (w/o fallback) needs at least 2
partitions
}
if ( configuredBatchSize > memAvail / minBatchesNeeded ) { // no cast -
memAvail may be bigger than max-int
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 6709cf6..32db9ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -106,9 +106,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
private int rowsSpilledReturned = 0;
private int rowsReturnedEarly = 0;
- private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
- private boolean is2ndPhase = false;
- private boolean is1stPhase = false;
+ private AggPrelBase.OperatorPhase phase;
private boolean canSpill = true; // make it false in case can not
spill/return-early
private ChainedHashTable baseHashTable;
private boolean earlyOutput = false; // when 1st phase returns a partition
due to no memory
@@ -379,11 +377,8 @@ public abstract class HashAggTemplate implements
HashAggregator {
this.outgoing = outgoing;
this.outContainer = outContainer;
this.useMemoryPrediction =
context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
-
- is2ndPhase = hashAggrConfig.getAggPhase() ==
AggPrelBase.OperatorPhase.PHASE_2of2;
- isTwoPhase = hashAggrConfig.getAggPhase() !=
AggPrelBase.OperatorPhase.PHASE_1of1;
- is1stPhase = isTwoPhase && !is2ndPhase;
- canSpill = isTwoPhase; // single phase can not spill
+ this.phase = hashAggrConfig.getAggPhase();
+ canSpill = phase.hasTwo(); // single phase can not spill
// Typically for testing - force a spill after a partition has more than
so many batches
minBatchesPerPartition =
context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
@@ -447,7 +442,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
// Set the number of partitions from the configuration (raise to a power
of two, if needed)
int numPartitions =
(int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
- if ( numPartitions == 1 && is2ndPhase ) { // 1st phase can still do early
return with 1 partition
+ if ( numPartitions == 1 && phase.is2nd() ) { // 1st phase can still do
early return with 1 partition
canSpill = false;
logger.warn("Spilling is disabled due to configuration setting of
num_partitions to 1");
}
@@ -473,7 +468,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 *
1024 * 1024) > memAvail ) {
numPartitions /= 2;
if ( numPartitions < 2) {
- if (is2ndPhase) {
+ if (phase.is2nd()) {
canSpill = false; // 2nd phase needs at least 2 to make progress
if (fallbackEnabled) {
@@ -492,7 +487,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
}
}
}
- logger.debug("{} phase. Number of partitions chosen: {}. {} spill",
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+ logger.debug("{} phase. Number of partitions chosen: {}. {} spill",
phase.getName(),
numPartitions, canSpill ? "Can" : "Cannot");
// The following initial safety check should be revisited once we can
lower the number of rows in a batch
@@ -616,7 +611,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
logger.trace("{} phase. Estimated internal row width: {} Values row width:
{} batch size: {} memory limit: {} max column width: {}",
-
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
+
phase.getName(),estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
if ( estMaxBatchSize > allocator.getLimit() ) {
logger.warn("HashAggregate: Estimated max batch size {} is larger than
the memory limit {}",estMaxBatchSize,allocator.getLimit());
@@ -886,7 +881,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
@Override
public void cleanup() {
if ( schema == null ) { return; } // not set up; nothing to clean
- if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+ if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
}
@@ -982,7 +977,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
* @return The partition (number) chosen to be spilled
*/
private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
- if ( is1stPhase && ! tryAvoidCurr) { return currPart; } // 1st phase: just
use the current partition
+ if ( phase.is1st() && ! tryAvoidCurr) { return currPart; } // 1st phase:
just use the current partition
int currPartSize = batchHolders[currPart].size();
if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if
size is 1
// first find the largest spilled partition
@@ -1188,7 +1183,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
if (spilledState.isEmpty()) { // and no spilled partitions
allFlushed = true;
this.outcome = IterOutcome.NONE;
- if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+ if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB
spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
}
@@ -1243,7 +1238,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
this.outcome = IterOutcome.OK;
- if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
+ if ( EXTRA_DEBUG_SPILL && phase.is2nd() ) {
logger.debug("So far returned {} + SpilledReturned {} total {} (spilled
{})",rowsNotSpilled,rowsSpilledReturned,
rowsNotSpilled+rowsSpilledReturned,
rowsSpilled);
@@ -1322,12 +1317,12 @@ public abstract class HashAggTemplate implements
HashAggregator {
*/
private String getOOMErrorMsg(String prefix) {
String errmsg;
- if (!isTwoPhase) {
+ if (!phase.hasTwo()) {
errmsg = "Single Phase Hash Aggregate operator can not spill.";
} else if (!canSpill) { // 2nd phase, with only 1 partition
errmsg = "Too little memory available to operator to facilitate
spilling.";
} else { // a bug ?
- errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First
Phase") + ". Partitions: " + spilledState.getNumPartitions() +
+ errmsg = prefix + " OOM at " + phase.getName() + " Phase. Partitions: "
+ spilledState.getNumPartitions() +
". Estimated batch size: " + estMaxBatchSize + ". values size: " +
estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " +
plannedBatches; }
if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " +
rowsSpilled; }
@@ -1367,11 +1362,11 @@ public abstract class HashAggTemplate implements
HashAggregator {
hashCode >>>= spilledState.getBitsInMask();
HashTable.PutStatus putStatus = null;
long allocatedBeforeHTput = allocator.getAllocatedMemory();
+ String tryingTo = phase.is1st() ? "early return" : "spill";
// Proactive spill - in case there is no reserve memory - spill and retry
putting later
if ( reserveValueBatchMemory == 0 && canSpill ) {
- logger.trace("Reserved memory runs short, trying to {} a partition and
retry Hash Table put() again.",
- is1stPhase ? "early return" : "spill");
+ logger.trace("Reserved memory runs short, trying to {} a partition and
retry Hash Table put() again.", tryingTo);
doSpill(currentPartition); // spill to free some memory
@@ -1389,8 +1384,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
} catch (RetryAfterSpillException re) {
if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can
not spill")); }
- logger.trace("HT put failed with an OOM, trying to {} a partition and
retry Hash Table put() again.",
- is1stPhase ? "early return" : "spill");
+ logger.trace("HT put failed with an OOM, trying to {} a partition and
retry Hash Table put() again.", tryingTo);
// for debugging - in case there's a leak
long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput;
@@ -1493,7 +1487,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
// log a detailed debug message explaining why a spill may be needed
logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to
add to partition {} with {} batches. " + "Max memory needed {}, Est batch size
{}, mem limit {}",
- allocator.getAllocatedMemory(), isTwoPhase ? (is2ndPhase ? "2ND" :
"1ST") : "Single", currentPartition, batchHolders[currentPartition].size(),
maxMemoryNeeded,
+ allocator.getAllocatedMemory(), phase.getName(), currentPartition,
batchHolders[currentPartition].size(), maxMemoryNeeded,
estMaxBatchSize, allocator.getLimit());
}
//
@@ -1516,7 +1510,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
return;
}
- if ( is2ndPhase ) {
+ if ( phase.is2nd() ) {
long before = allocator.getAllocatedMemory();
spillAPartition(victimPartition);
@@ -1583,7 +1577,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
this.stats.setLongStat(Metric.NUM_PARTITIONS,
spilledState.getNumPartitions());
this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); //
Put 0 in case no spill
- if ( is2ndPhase ) {
+ if ( phase.is2nd() ) {
this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
}
if ( rowsReturnedEarly > 0 ) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 84f85ba..f3d527e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -47,7 +47,55 @@ import java.util.List;
public abstract class AggPrelBase extends DrillAggregateRelBase implements
Prel {
- public enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}
+  /**
+   * Phase of an aggregate operator within a single-phase or two-phase aggregation.
+   * Each constant carries pre-computed flags, so callers can query the phase directly
+   * instead of comparing against specific constants.
+   */
+  public enum OperatorPhase {
+    /** Single-phase aggregation. */
+    PHASE_1of1(false, false, false, "Single"),
+    /** First phase of a two-phase aggregation. */
+    PHASE_1of2(true, true, false, "1st"),
+    /** Second phase of a two-phase aggregation. */
+    PHASE_2of2(true, false, true, "2nd");
+
+    // Enum constants are shared singletons, so their per-constant state is immutable.
+    private final boolean hasTwo;
+    private final boolean is1st;
+    private final boolean is2nd;
+    private final String name;
+
+    OperatorPhase(boolean hasTwo,
+                  boolean is1st,
+                  boolean is2nd,
+                  String name) {
+      this.hasTwo = hasTwo;
+      this.is1st = is1st;
+      this.is2nd = is2nd;
+      this.name = name;
+    }
+
+    /** @return true if this phase is part of a two-phase aggregation (1st or 2nd phase). */
+    public boolean hasTwo() {
+      return hasTwo;
+    }
+
+    /** @return true only for the first phase of a two-phase aggregation. */
+    public boolean is1st() {
+      return is1st;
+    }
+
+    /** @return true only for the second phase of a two-phase aggregation. */
+    public boolean is2nd() {
+      return is2nd;
+    }
+
+    /** @return short label used in log and error messages: "Single", "1st" or "2nd". */
+    public String getName() {
+      return name;
+    }
+  }
protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1; // default
phase
protected List<NamedExpression> keys = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q6.json
b/exec/java-exec/src/test/resources/agg/hashagg/q6.json
index c155391..35b200a 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q6.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q6.json
@@ -35,6 +35,7 @@
pop : "hash-aggregate",
@id : 3,
child : 2,
+ phase : "PHASE_1of1",
keys : [ {
ref : "$f0",
expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
b/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
index ef05613..3494c79 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
@@ -38,6 +38,7 @@
pop : "hash-aggregate",
@id : 3,
child : 2,
+ phase : "PHASE_1of1",
keys : [ {
ref : "$f0",
expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
b/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
index 62cf5c3..f77c08b 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
@@ -38,6 +38,7 @@
pop : "hash-aggregate",
@id : 3,
child : 2,
+ phase : "PHASE_1of1",
keys : [ {
ref : "$f0",
expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
b/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
index 8edc110..37ba0ad 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
@@ -38,6 +38,7 @@
pop : "hash-aggregate",
@id : 3,
child : 2,
+ phase : "PHASE_1of1",
keys : [ {
ref : "$f0",
expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q8.json
b/exec/java-exec/src/test/resources/agg/hashagg/q8.json
index a457aa9..f67cd29 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q8.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q8.json
@@ -37,6 +37,7 @@
pop : "hash-aggregate",
@id : 3,
child : 2,
+ phase : "PHASE_1of1",
keys : [ {
ref : "$f0",
expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
b/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
index 3461c8c..73ce4be 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
@@ -35,6 +35,7 @@
pop : "hash-aggregate",
@id : 3,
child : 2,
+ phase : "PHASE_1of1",
keys : [ {
ref : "$f0",
expr : "$f0"