This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 2882b89 DRILL-6719: Separate spilling queue logic from HashJoin and HashAgg.
2882b89 is described below
commit 2882b89c2288a9df7c9abd5321cb85c588312b8c
Author: Timothy Farkas <[email protected]>
AuthorDate: Tue Aug 28 16:28:49 2018 -0700
DRILL-6719: Separate spilling queue logic from HashJoin and HashAgg.
---
.../physical/impl/aggregate/HashAggTemplate.java | 210 ++++++++++----------
.../common/AbstractSpilledPartitionMetadata.java | 47 +++++
.../impl/common/SpilledPartitionMetadata.java | 42 ++++
.../exec/physical/impl/common/SpilledState.java | 216 +++++++++++++++++++++
.../exec/physical/impl/join/HashJoinBatch.java | 180 ++++++++++-------
.../exec/physical/impl/join/HashJoinProbe.java | 2 +-
.../physical/impl/join/HashJoinProbeTemplate.java | 9 +-
7 files changed, 516 insertions(+), 190 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index f6dd3da..6709cf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -48,12 +48,14 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
+import org.apache.drill.exec.physical.impl.common.SpilledState;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
@@ -80,6 +82,7 @@ import org.apache.drill.exec.vector.ObjectVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
@@ -95,9 +98,6 @@ public abstract class HashAggTemplate implements HashAggregator {
private static final boolean EXTRA_DEBUG_SPILL = false;
// Fields needed for partitioning (the groups into partitions)
- private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup())
- private int partitionMask; // numPartitions - 1
- private int bitsInMask; // number of bits in the MASK
private int nextPartitionToReturn = 0; // which partition to return the next batch from
// The following members are used for logging, metrics, etc.
private int rowsInPartition = 0; // counts #rows in each partition
@@ -148,25 +148,15 @@ public abstract class HashAggTemplate implements
HashAggregator {
private int outBatchIndex[];
// For handling spilling
+ private HashAggUpdater updater = new HashAggUpdater();
+ private SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>();
private SpillSet spillSet;
SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
private Writer writers[]; // a vector writer for each spilled partition
private int spilledBatchesCount[]; // count number of batches spilled, in each partition
private String spillFiles[];
- private int cycleNum = 0; // primary, secondary, tertiary, etc.
private int originalPartition = -1; // the partition a secondary reads from
- private static class SpilledPartition {
- public int spilledBatches;
- public String spillFile;
- int cycleNum;
- int origPartn;
- int prevOrigPartn;
- }
-
- private ArrayList<SpilledPartition> spilledPartitionsList;
- private int operatorId; // for the spill file name
-
private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
private TypedFieldId[] groupByOutFieldIds;
@@ -180,6 +170,59 @@ public abstract class HashAggTemplate implements
HashAggregator {
private OperatorStats stats = null;
private HashTableStats htStats = new HashTableStats();
+ public static class HashAggSpilledPartition extends AbstractSpilledPartitionMetadata {
+ private final int spilledBatches;
+ private final String spillFile;
+
+ public HashAggSpilledPartition(final int cycle,
+ final int originPartition,
+ final int prevOriginPartition,
+ final int spilledBatches,
+ final String spillFile) {
+ super(cycle, originPartition, prevOriginPartition);
+
+ this.spilledBatches = spilledBatches;
+ this.spillFile = Preconditions.checkNotNull(spillFile);
+ }
+
+ public int getSpilledBatches() {
+ return spilledBatches;
+ }
+
+ public String getSpillFile() {
+ return spillFile;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return String.format("Start reading spilled partition %d (prev %d) from cycle %d.",
+ this.getOriginPartition(), this.getPrevOriginPartition(), this.getCycle());
+ }
+ }
+
+ public class HashAggUpdater implements SpilledState.Updater {
+
+ @Override
+ public void cleanup() {
+ HashAggTemplate.this.cleanup(); // delegate to the operator's cleanup; a bare this.cleanup() would recurse on the updater itself
+ }
+
+ @Override
+ public String getFailureMessage() {
+ return null;
+ }
+
+ @Override
+ public long getMemLimit() {
+ return allocator.getLimit();
+ }
+
+ @Override
+ public boolean hasPartitionLimit() {
+ return false;
+ }
+ }
+
public enum Metric implements MetricDef {
NUM_BUCKETS,
@@ -335,7 +378,6 @@ public abstract class HashAggTemplate implements
HashAggregator {
this.incoming = incoming;
this.outgoing = outgoing;
this.outContainer = outContainer;
- this.operatorId = hashAggrConfig.getOperatorId();
this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
@@ -404,7 +446,7 @@ public abstract class HashAggTemplate implements HashAggregator {
final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
// Set the number of partitions from the configuration (raise to a power of two, if needed)
- numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
+ int numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
if ( numPartitions == 1 && is2ndPhase ) { // 1st phase can still do early return with 1 partition
canSpill = false;
logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
@@ -460,9 +502,8 @@ public abstract class HashAggTemplate implements HashAggregator {
// (but 1st phase can still spill, so it will maintain the original memory limit)
allocator.setLimit(AbstractBase.MAX_ALLOCATION); // 10_000_000_000L
}
- // Based on the number of partitions: Set the mask and bit count
- partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
- bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+ spilledState.initialize(numPartitions);
// Create arrays (one entry per partition)
htables = new HashTable[numPartitions];
@@ -471,7 +512,6 @@ public abstract class HashAggTemplate implements HashAggregator {
writers = new Writer[numPartitions];
spilledBatchesCount = new int[numPartitions];
spillFiles = new String[numPartitions];
- spilledPartitionsList = new ArrayList<SpilledPartition>();
plannedBatches = numPartitions; // each partition should allocate its first batch
@@ -511,7 +551,7 @@ public abstract class HashAggTemplate implements HashAggregator {
this.incoming = newIncoming;
currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
nextPartitionToReturn = 0;
- for (int i = 0; i < numPartitions; i++ ) {
+ for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
htables[i].updateIncoming(newIncoming.getContainer(), null);
htables[i].reset();
if ( batchHolders[i] != null) {
@@ -829,7 +869,7 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public void adjustOutputCount(int outputBatchSize, int oldRowWidth, int newRowWidth) {
- for (int i = 0; i < numPartitions; i++ ) {
+ for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
if (batchHolders[i] == null || batchHolders[i].size() == 0) {
continue;
}
@@ -851,7 +891,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
}
// clean (and deallocate) each partition
- for ( int i = 0; i < numPartitions; i++) {
+ for ( int i = 0; i < spilledState.getNumPartitions(); i++) {
if (htables[i] != null) {
htables[i].clear();
htables[i] = null;
@@ -877,12 +917,12 @@ public abstract class HashAggTemplate implements
HashAggregator {
}
}
// delete any spill file left in unread spilled partitions
- while ( ! spilledPartitionsList.isEmpty() ) {
- SpilledPartition sp = spilledPartitionsList.remove(0);
+ while (!spilledState.isEmpty()) {
+ HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
try {
- spillSet.delete(sp.spillFile);
+ spillSet.delete(sp.getSpillFile());
} catch(IOException e) {
- logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
+ logger.warn("Cleanup: Failed to delete spill file {}",sp.getSpillFile());
}
}
// Delete the currently handled (if any) spilled file
@@ -948,7 +988,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
// first find the largest spilled partition
int maxSizeSpilled = -1;
int indexMaxSpilled = -1;
- for (int isp = 0; isp < numPartitions; isp++ ) {
+ for (int isp = 0; isp < spilledState.getNumPartitions(); isp++ ) {
if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
maxSizeSpilled = batchHolders[isp].size();
indexMaxSpilled = isp;
@@ -967,7 +1007,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
indexMax = indexMaxSpilled;
maxSize = 4 * maxSizeSpilled;
}
- for ( int insp = 0; insp < numPartitions; insp++) {
+ for ( int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
indexMax = insp;
maxSize = batchHolders[insp].size();
@@ -993,7 +1033,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
ArrayList<BatchHolder> currPartition = batchHolders[part];
rowsInPartition = 0;
if ( EXTRA_DEBUG_SPILL ) {
- logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
+ logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, spilledState.getCycle(), currPartition.size());
}
if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
@@ -1001,7 +1041,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
// If this is the first spill for this partition, create an output stream
if ( ! isSpilled(part) ) {
- spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+ spillFiles[part] = spillSet.getNextSpillFile(spilledState.getCycle() > 0 ? Integer.toString(spilledState.getCycle()) : null);
try {
writers[part] = spillSet.writer(spillFiles[part]);
@@ -1025,21 +1065,8 @@ public abstract class HashAggTemplate implements HashAggregator {
this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, numOutputRecords);
// set the value count for outgoing batch value vectors
- /* int i = 0; */
for (VectorWrapper<?> v : outgoing) {
v.getValueVector().getMutator().setValueCount(numOutputRecords);
- /*
- // print out the first row to be spilled ( varchar, varchar, bigint )
- try {
- if (i++ < 2) {
- NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
- logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
- } else {
- NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
- logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
- }
- } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
- */
}
outContainer.setRecordCount(numOutputRecords);
@@ -1119,19 +1146,20 @@ public abstract class HashAggTemplate implements
HashAggregator {
if ( ! earlyOutput ) {
// Update the next partition to return (if needed)
// skip fully returned (or spilled) partitions
- while (nextPartitionToReturn < numPartitions) {
+ while (nextPartitionToReturn < spilledState.getNumPartitions()) {
//
// If this partition was spilled - spill the rest of it and skip it
//
if ( isSpilled(nextPartitionToReturn) ) {
spillAPartition(nextPartitionToReturn); // spill the rest
- SpilledPartition sp = new SpilledPartition();
- sp.spillFile = spillFiles[nextPartitionToReturn];
- sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
- sp.cycleNum = cycleNum; // remember the current cycle
- sp.origPartn = nextPartitionToReturn; // for debugging / filename
- sp.prevOrigPartn = originalPartition; // for debugging / filename
- spilledPartitionsList.add(sp);
+ HashAggSpilledPartition sp = new HashAggSpilledPartition(
+ spilledState.getCycle(),
+ nextPartitionToReturn,
+ originalPartition,
+ spilledBatchesCount[nextPartitionToReturn],
+ spillFiles[nextPartitionToReturn]);
+
+ spilledState.addPartition(sp);
reinitPartition(nextPartitionToReturn); // free the memory
try {
@@ -1155,9 +1183,9 @@ public abstract class HashAggTemplate implements
HashAggregator {
}
// if passed the last partition - either done or need to restart and read spilled partitions
- if (nextPartitionToReturn >= numPartitions) {
+ if (nextPartitionToReturn >= spilledState.getNumPartitions()) {
// The following "if" is probably never used; due to a similar check at the end of this method
- if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
+ if (spilledState.isEmpty()) { // and no spilled partitions
allFlushed = true;
this.outcome = IterOutcome.NONE;
if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
@@ -1170,10 +1198,10 @@ public abstract class HashAggTemplate implements
HashAggregator {
buildComplete = false; // go back and call doWork() again
handlingSpills = true; // beginning to work on the spill files
// pick a spilled partition; set a new incoming ...
- SpilledPartition sp = spilledPartitionsList.remove(0);
+ HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
// Create a new "incoming" out of the spilled partition spill file
- newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet);
- originalPartition = sp.origPartn; // used for the filename
+ newIncoming = new SpilledRecordbatch(sp.getSpillFile(), sp.getSpilledBatches(), context, schema, oContext, spillSet);
+ originalPartition = sp.getOriginPartition(); // used for the filename
logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
// Initialize .... new incoming, new set of partitions
try {
@@ -1181,22 +1209,8 @@ public abstract class HashAggTemplate implements
HashAggregator {
} catch (Exception e) {
throw new RuntimeException(e);
}
- // update the cycle num if needed
- // The current cycle num should always be one larger than in the spilled partition
- if ( cycleNum == sp.cycleNum ) {
- cycleNum = 1 + sp.cycleNum;
- stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
- // report first spill or memory stressful situations
- if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); }
- if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
- if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); }
- if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); }
- if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); }
- }
- if ( EXTRA_DEBUG_SPILL ) {
- logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.",
- sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size());
- }
+
+ spilledState.updateCycle(stats, sp, updater);
return AggIterOutcome.AGG_RESTART;
}
@@ -1265,7 +1279,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
outBatchIndex[partitionToReturn] = 0; // reset, for the next EMIT
return AggIterOutcome.AGG_EMIT;
}
- else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition ?
+ else if ((partitionToReturn + 1 == spilledState.getNumPartitions()) && spilledState.isEmpty()) { // last partition ?
allFlushed = true; // next next() call will return NONE
@@ -1313,7 +1327,7 @@ public abstract class HashAggTemplate implements
HashAggregator {
} else if (!canSpill) { // 2nd phase, with only 1 partition
errmsg = "Too little memory available to operator to facilitate spilling.";
} else { // a bug ?
- errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
+ errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + spilledState.getNumPartitions() +
". Estimated batch size: " + estMaxBatchSize + ". values size: " + estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + plannedBatches; }
if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
@@ -1334,32 +1348,6 @@ public abstract class HashAggTemplate implements
HashAggregator {
assert incomingRowIdx >= 0;
assert ! earlyOutput;
- /** for debugging
- Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
- BigIntVector vv0 = null;
- BigIntHolder holder = null;
-
- if (tmp != null) {
- vv0 = ((BigIntVector) tmp);
- holder = new BigIntHolder();
- holder.value = vv0.getAccessor().get(incomingRowIdx) ;
- }
- */
- /*
- if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
- // for debugging -- show the first row from a spilled batch
- Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
- Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
- Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
-
- if (tmp0 != null && tmp1 != null && tmp2 != null) {
- NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
- NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
- NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2);
- logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
- }
- }
- */
// The hash code is computed once, then its lower bits are used to determine the
// partition to use, and the higher bits determine the location in the hash table.
int hashCode;
@@ -1371,12 +1359,12 @@ public abstract class HashAggTemplate implements
HashAggregator {
}
// right shift hash code for secondary (or tertiary...) spilling
- for (int i = 0; i < cycleNum; i++) {
- hashCode >>>= bitsInMask;
+ for (int i = 0; i < spilledState.getCycle(); i++) {
+ hashCode >>>= spilledState.getBitsInMask();
}
- int currentPartition = hashCode & partitionMask;
- hashCode >>>= bitsInMask;
+ int currentPartition = hashCode & spilledState.getPartitionMask();
+ hashCode >>>= spilledState.getBitsInMask();
HashTable.PutStatus putStatus = null;
long allocatedBeforeHTput = allocator.getAllocatedMemory();
@@ -1498,7 +1486,7 @@ public abstract class HashAggTemplate implements HashAggregator {
maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * (estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4 /* links + hash-values */));
// Add the (max) size of the current hash table, in case it will double
int maxSize = 1;
- for (int insp = 0; insp < numPartitions; insp++) {
+ for (int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
maxSize = Math.max(maxSize, batchHolders[insp].size());
}
maxMemoryNeeded += MAX_BATCH_ROW_COUNT * 2 * 2 * 4 * maxSize; // 2 - double, 2 - max when %50 full, 4 - Uint4
@@ -1577,12 +1565,12 @@ public abstract class HashAggTemplate implements
HashAggregator {
* @param htables
*/
private void updateStats(HashTable[] htables) {
- if ( cycleNum > 0 || // These stats are only for before processing spilled files
+ if (!spilledState.isFirstCycle() || // These stats are only for before processing spilled files
handleEmit ) { return; } // and no stats collecting when handling an EMIT
long numSpilled = 0;
HashTableStats newStats = new HashTableStats();
// sum the stats from all the partitions
- for (int ind = 0; ind < numPartitions; ind++) {
+ for (int ind = 0; ind < spilledState.getNumPartitions(); ind++) {
htables[ind].getStats(newStats);
htStats.addStats(newStats);
if (isSpilled(ind)) {
@@ -1593,8 +1581,8 @@ public abstract class HashAggTemplate implements
HashAggregator {
this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
- this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
- this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
+ this.stats.setLongStat(Metric.NUM_PARTITIONS, spilledState.getNumPartitions());
+ this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
if ( is2ndPhase ) {
this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
}
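
Editor's note, for orientation: the partition-routing arithmetic this file previously kept in numPartitions/partitionMask/bitsInMask now lives in SpilledState. The following standalone Java sketch is not part of the commit (every constant is illustrative); it only mirrors the mask/shift logic visible in the hunks above, showing how the low bits of a hash code pick a partition and how the per-cycle right shift keeps rows of a re-read spilled partition from all hashing back into the same partition.

// Illustrative sketch only -- mirrors the mask/shift logic in the hunks above.
public class PartitionRoutingSketch {
  public static void main(String[] args) {
    int numPartitions = 32;                           // must be a power of two (SpilledState.initialize enforces this)
    int partitionMask = numPartitions - 1;            // e.g. 32 --> 0x1F
    int bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F --> 5
    int cycle = 1;                                    // pretend one spill cycle has already completed
    int hashCode = 0x5D3A9F71;                        // made-up hash value

    // Right-shift once per completed spill cycle so re-partitioned rows spread out again.
    for (int i = 0; i < cycle; i++) {
      hashCode >>>= bitsInMask;
    }
    int currentPartition = hashCode & partitionMask;  // low bits choose the partition
    hashCode >>>= bitsInMask;                         // remaining high bits go to the hash table

    System.out.println("partition = " + currentPartition + ", remaining hash bits = 0x" + Integer.toHexString(hashCode));
  }
}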
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
new file mode 100644
index 0000000..951fa66
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+public abstract class AbstractSpilledPartitionMetadata implements SpilledPartitionMetadata {
+ private final int cycle;
+ private final int originPartition;
+ private final int prevOriginPartition;
+
+ public AbstractSpilledPartitionMetadata(final int cycle,
+ final int originPartition,
+ final int prevOriginPartition) {
+ this.cycle = cycle;
+ this.originPartition = originPartition;
+ this.prevOriginPartition = prevOriginPartition;
+ }
+
+ @Override
+ public int getCycle() {
+ return cycle;
+ }
+
+ @Override
+ public int getOriginPartition() {
+ return originPartition;
+ }
+
+ @Override
+ public int getPrevOriginPartition() {
+ return prevOriginPartition;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
new file mode 100644
index 0000000..4d53be3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+/**
+ * This interface represents the metadata for a spilled partition.
+ */
+public interface SpilledPartitionMetadata {
+ /**
+ * The spill cycle for a partition.
+ * @return The spill cycle for a partition.
+ */
+ int getCycle();
+
+ /**
+ * The parent partition.
+ * @return The parent partition.
+ */
+ int getOriginPartition();
+
+ /**
+ * The parent of parent partition.
+ * @return The parent of parent partition.
+ */
+ int getPrevOriginPartition();
+ String makeDebugString();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
new file mode 100644
index 0000000..a176287
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Manages the spilling information for an operator.
+ * @param <T> The class holding spilled partition metadata.
+ *
+ * <h4>Lifecycle</h4>
+ * <ol>
+ * <li>Create a {@link SpilledState} instance.</li>
+ * <li>Call {@link SpilledState#initialize(int)}</li>
+ * <li>Call {@link SpilledState#addPartition(SpilledPartitionMetadata)}, {@link SpilledState#getNextSpilledPartition()}, or
+ *     {@link SpilledState#updateCycle(OperatorStats, SpilledPartitionMetadata, Updater)}</li>
+ * </ol>
+ *
+ * <p>
+ * <ul>
+ * <li>A user can call {@link SpilledState#getCycle()} at any time.</li>
+ * </ul>
+ * </p>
+ */
+public class SpilledState<T extends SpilledPartitionMetadata> {
+ public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpilledState.class);
+
+ private int numPartitions;
+ private int partitionMask;
+ private int bitsInMask;
+
+ private int cycle = 0;
+ private Queue<T> queue = new LinkedList<>();
+ private boolean initialized = false;
+
+ public SpilledState() {
+ }
+
+ /**
+ * Initializes the number of partitions to use for the spilled state.
+ * @param numPartitions The number of partitions to use for the spilled state.
+ */
+ public void initialize(int numPartitions) {
+ Preconditions.checkState(!initialized);
+ Preconditions.checkArgument(numPartitions >= 1); // numPartitions must be positive
+ Preconditions.checkArgument((numPartitions & (numPartitions - 1)) == 0); // Make sure it's a power of two
+
+ this.numPartitions = numPartitions;
+ initialized = true;
+ partitionMask = numPartitions - 1;
+ bitsInMask = Integer.bitCount(partitionMask);
+ }
+
+ /**
+ * Gets the number of partitions.
+ * @return The number of partitions.
+ */
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ /**
+ * True if this is the first cycle (0).
+ * @return True if this is the first cycle (0).
+ */
+ public boolean isFirstCycle() {
+ return cycle == 0;
+ }
+
+ public int getPartitionMask() {
+ return partitionMask;
+ }
+
+ public int getBitsInMask() {
+ return bitsInMask;
+ }
+
+ /**
+ * Add the partition metadata to the end of the queue to be processed.
+ * @param spilledPartition The partition metadata to process.
+ * @return True if the partition metadata was added to the queue.
+ */
+ public boolean addPartition(T spilledPartition) {
+ Preconditions.checkState(initialized);
+ return queue.offer(spilledPartition);
+ }
+
+ /**
+ * Get the next spilled partition to process.
+ * @return The metadata for the next spilled partition to process.
+ */
+ public T getNextSpilledPartition() {
+ Preconditions.checkState(initialized);
+ return queue.poll();
+ }
+
+ /**
+ * True if there are no spilled partitions.
+ * @return True if there are no spilled partitions.
+ */
+ public boolean isEmpty() {
+ return queue.isEmpty();
+ }
+
+ /**
+ * Update the current spill cycle.
+ * @param operatorStats Current operator stats.
+ * @param spilledPartition The next spilled partition metadata to process.
+ * @param updater The updater implementation that executes custom logic if a spill cycle is incremented.
+ */
+ public void updateCycle(final OperatorStats operatorStats,
+ final T spilledPartition,
+ final Updater updater) {
+ Preconditions.checkState(initialized);
+ Preconditions.checkNotNull(operatorStats);
+ Preconditions.checkNotNull(spilledPartition);
+ Preconditions.checkNotNull(updater);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(spilledPartition.makeDebugString());
+ }
+
+ if (cycle == spilledPartition.getCycle()) {
+ // Update the cycle num if needed.
+ // The current cycle num should always be one larger than in the spilled partition.
+
+ cycle = 1 + spilledPartition.getCycle();
+ operatorStats.setLongStat(HashJoinBatch.Metric.SPILL_CYCLE, cycle); // update stats
+
+ if (logger.isDebugEnabled()) {
+ // report first spill or memory stressful situations
+ if (cycle == 1) {
+ logger.debug("Started reading spilled records ");
+ } else if (cycle == 2) {
+ logger.debug("SECONDARY SPILLING ");
+ } else if (cycle == 3) {
+ logger.debug("TERTIARY SPILLING ");
+ } else if (cycle == 4) {
+ logger.debug("QUATERNARY SPILLING ");
+ } else if (cycle == 5) {
+ logger.debug("QUINARY SPILLING ");
+ }
+ }
+
+ if (updater.hasPartitionLimit() && cycle * bitsInMask > 20) {
+ queue.offer(spilledPartition); // so cleanup() would delete the curr spill files
+ updater.cleanup();
+ throw UserException
+ .unsupportedError()
+ .message("%s.\n On cycle num %d mem available %d num partitions %d.", updater.getFailureMessage(), cycle, updater.getMemLimit(), numPartitions)
+ .build(logger);
+ }
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(spilledPartition.makeDebugString());
+ }
+ }
+
+ /**
+ * Gets the current spill cycle.
+ * @return The current spill cycle.
+ */
+ public int getCycle() {
+ return cycle;
+ }
+
+ /**
+ * A callback implemented by operators so that {@link SpilledState} can run operator-specific logic when the spill cycle is updated.
+ */
+ public interface Updater {
+ /**
+ * Does any necessary cleanup if we've spilled too much and the query has to be aborted.
+ */
+ void cleanup();
+
+ /**
+ * Gets the failure message in the event that we spilled too far.
+ * @return The failure message in the event that we spilled too far.
+ */
+ String getFailureMessage();
+
+ /**
+ * Gets the current memory limit.
+ * @return The current memory limit.
+ */
+ long getMemLimit();
+
+ /**
+ * True if there is a limit to the number of times we can partition.
+ * @return True if there is a limit to the number of times we can partition.
+ */
+ boolean hasPartitionLimit();
+ }
+}
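
Editor's note: to make the intended lifecycle of the new class concrete, here is a hedged usage sketch that is not part of the commit. DemoSpilledPartition, the file name and the partition numbers are invented for illustration, and a real operator would also pass its OperatorStats and an Updater to updateCycle() as HashAggTemplate and HashJoinBatch do above.

// Illustrative sketch only -- exercises the SpilledState API introduced by this commit.
import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
import org.apache.drill.exec.physical.impl.common.SpilledState;

public class SpilledStateSketch {
  // Minimal metadata subclass: just remember which spill file backs the partition.
  static class DemoSpilledPartition extends AbstractSpilledPartitionMetadata {
    final String spillFile;

    DemoSpilledPartition(int cycle, int originPartition, int prevOriginPartition, String spillFile) {
      super(cycle, originPartition, prevOriginPartition);
      this.spillFile = spillFile;
    }

    @Override
    public String makeDebugString() {
      return "Reading spilled partition " + getOriginPartition() + " from cycle " + getCycle();
    }
  }

  public static void main(String[] args) {
    SpilledState<DemoSpilledPartition> spilledState = new SpilledState<>();
    spilledState.initialize(8); // must be a power of two

    // While processing input, remember each partition that had to be spilled.
    spilledState.addPartition(new DemoSpilledPartition(spilledState.getCycle(), 3, -1, "/tmp/spill_3"));

    // Later, drain the queue and re-read each spilled partition as a new "incoming".
    while (!spilledState.isEmpty()) {
      DemoSpilledPartition sp = spilledState.getNextSpilledPartition();
      System.out.println(sp.makeDebugString() + ", file = " + sp.spillFile);
      // In a real operator: spilledState.updateCycle(operatorStats, sp, updater);
    }
  }
}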
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 3d45696..929811c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -23,9 +23,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.drill.common.exceptions.UserException;
@@ -56,12 +55,14 @@ import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
+import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.common.SpilledState;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
@@ -80,6 +81,7 @@ import org.apache.drill.exec.work.filter.BloomFilter;
import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -137,8 +139,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
*/
private int numPartitions = 1; // must be 2 to the power of bitsInMask
- private int partitionMask = 0; // numPartitions - 1
- private int bitsInMask = 0; // number of bits in the MASK
/**
* The master class used to generate {@link HashTable}s.
@@ -182,7 +182,6 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
private SpillSet spillSet;
HashJoinPOP popConfig;
- private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
private int originalPartition = -1; // the partition a secondary reads from
IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
private int maxBatchesInMemory;
@@ -195,21 +194,85 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
/**
* This holds information about the spilled partitions for the build and probe side.
*/
- public static class HJSpilledPartition {
- public int innerSpilledBatches;
- public String innerSpillFile;
- public int outerSpilledBatches;
- public String outerSpillFile;
- int cycleNum;
- int origPartn;
- int prevOrigPartn;
+ public static class HashJoinSpilledPartition extends AbstractSpilledPartitionMetadata {
+ private final int innerSpilledBatches;
+ private final String innerSpillFile;
+ private int outerSpilledBatches;
+ private String outerSpillFile;
+ private boolean updatedOuter = false;
+
+ public HashJoinSpilledPartition(final int cycle,
+ final int originPartition,
+ final int prevOriginPartition,
+ final int innerSpilledBatches,
+ final String innerSpillFile) {
+ super(cycle, originPartition, prevOriginPartition);
+
+ this.innerSpilledBatches = innerSpilledBatches;
+ this.innerSpillFile = innerSpillFile;
+ }
+
+ public int getInnerSpilledBatches() {
+ return innerSpilledBatches;
+ }
+
+ public String getInnerSpillFile() {
+ return innerSpillFile;
+ }
+
+ public int getOuterSpilledBatches() {
+ Preconditions.checkState(updatedOuter);
+ return outerSpilledBatches;
+ }
+
+ public String getOuterSpillFile() {
+ Preconditions.checkState(updatedOuter);
+ return outerSpillFile;
+ }
+
+ public void updateOuter(final int outerSpilledBatches, final String outerSpillFile) {
+ Preconditions.checkState(!updatedOuter);
+ updatedOuter = true;
+
+ this.outerSpilledBatches = outerSpilledBatches;
+ this.outerSpillFile = outerSpillFile;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return String.format("Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).",
+ this.getOriginPartition(), this.getPrevOriginPartition(), this.getCycle(), outerSpilledBatches, innerSpilledBatches);
+ }
+ }
+
+ public class HashJoinUpdater implements SpilledState.Updater {
+ @Override
+ public void cleanup() {
+ HashJoinBatch.this.cleanup();
+ }
+
+ @Override
+ public String getFailureMessage() {
+ return "Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates).";
+ }
+
+ @Override
+ public long getMemLimit() {
+ return HashJoinBatch.this.allocator.getLimit();
+ }
+
+ @Override
+ public boolean hasPartitionLimit() {
+ return true;
+ }
}
/**
* Queue of spilled partitions to process.
*/
- private ArrayList<HJSpilledPartition> spilledPartitionsList = new ArrayList<>();
- private HJSpilledPartition spilledInners[]; // for the outer to find the partition
+ private SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
+ private HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
+ private HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
public enum Metric implements MetricDef {
NUM_BUCKETS,
@@ -352,7 +415,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
state = BatchState.STOP;
} else {
// Got our first batch(es)
- if (cycleNum == 0) {
+ if (spilledState.isFirstCycle()) {
// Only collect stats for the first cylce
memoryManagerUpdate.run();
}
@@ -470,7 +533,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
joinType,
leftUpstream,
partitions,
- cycleNum,
+ spilledState.getCycle(),
container,
spilledInners,
buildSideIsEmpty.booleanValue(),
@@ -514,9 +577,9 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
//
// (recursively) Handle the spilled partitions, if any
//
- if (!buildSideIsEmpty.booleanValue() && !spilledPartitionsList.isEmpty()) {
+ if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
// Get the next (previously) spilled partition to handle as incoming
- HJSpilledPartition currSp = spilledPartitionsList.remove(0);
+ HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
// Create a BUILD-side "incoming" out of the inner spill file of that partition
buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
@@ -534,32 +597,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
hashJoinProbe.changeToFinalProbeState();
}
- // update the cycle num if needed
- // The current cycle num should always be one larger than in the spilled partition
- if (cycleNum == currSp.cycleNum) {
- cycleNum = 1 + currSp.cycleNum;
- stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
- // report first spill or memory stressful situations
- if (cycleNum == 1) { logger.info("Started reading spilled records "); }
- if (cycleNum == 2) { logger.info("SECONDARY SPILLING "); }
- if (cycleNum == 3) { logger.warn("TERTIARY SPILLING "); }
- if (cycleNum == 4) { logger.warn("QUATERNARY SPILLING "); }
- if (cycleNum == 5) { logger.warn("QUINARY SPILLING "); }
- if ( cycleNum * bitsInMask > 20 ) {
- spilledPartitionsList.add(currSp); // so cleanup() would delete the curr spill files
- this.cleanup();
- throw UserException
- .unsupportedError()
- .message("Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates)\n"
- + "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions)
- .build(logger);
- }
- }
- logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches)." +
- " More {} spilled partitions left.",
- currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches,
- currSp.innerSpilledBatches, spilledPartitionsList.size());
-
+ spilledState.updateCycle(stats, currSp, spilledStateUpdater);
state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
prefetchedBuild.setValue(false);
@@ -692,9 +730,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
// See partitionNumTuning()
//
- partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
- bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
-
+ spilledState.initialize(numPartitions);
// Create array for the partitions
partitions = new HashPartition[numPartitions];
}
@@ -707,10 +743,10 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
// Recreate the partitions every time build is initialized
for (int part = 0; part < numPartitions; part++ ) {
partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
- RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions);
+ RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions);
}
- spilledInners = new HJSpilledPartition[numPartitions];
+ spilledInners = new HashJoinSpilledPartition[numPartitions];
}
@@ -836,19 +872,18 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
}
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
- boolean firstCycle = cycleNum == 0;
{
// Initializing build calculator
// Limit scope of these variables to this block
- int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
+ int maxBatchSize = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue();
HashJoinMemoryCalculator calc = getCalculatorImpl();
calc.initialize(doMemoryCalculation);
buildCalc = calc.next();
- buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed
+ buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
buildBatch,
probeBatch,
buildJoinColumns,
@@ -862,13 +897,13 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
- if (firstCycle && doMemoryCalculation) {
+ if (spilledState.isFirstCycle() && doMemoryCalculation) {
// Do auto tuning
buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
}
}
- if (firstCycle) {
+ if (spilledState.isFirstCycle()) {
// Do initial setup only on the first cycle
delayedSetup();
}
@@ -906,11 +941,11 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
}
final int currentRecordCount = buildBatch.getRecordCount();
- if ( cycleNum > 0 ) {
+ if (!spilledState.isFirstCycle()) {
read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
}
//create runtime filter
- if (cycleNum == 0 && enableRuntimeFilter) {
+ if (spilledState.isFirstCycle() && enableRuntimeFilter) {
//create runtime filter and send out async
int condFieldIndex = 0;
for (BloomFilter bloomFilter : bloomFilters) {
@@ -924,10 +959,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// For every record in the build batch, hash the key columns and keep the result
for (int ind = 0; ind < currentRecordCount; ind++) {
- int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
+ int hashCode = spilledState.isFirstCycle() ? partitions[0].getBuildHashCode(ind)
: read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
- int currPart = hashCode & partitionMask;
- hashCode >>>= bitsInMask;
+ int currPart = hashCode & spilledState.getPartitionMask();
+ hashCode >>>= spilledState.getBitsInMask();
// Append the new inner row to the appropriate partition; spill (that partition) if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
}
@@ -942,7 +977,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
}
- if (cycleNum == 0 && enableRuntimeFilter) {
+ if (spilledState.isFirstCycle() && enableRuntimeFilter) {
if (bloomFilters.size() > 0) {
int hashJoinOpId = this.popConfig.getOperatorId();
runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
@@ -1004,14 +1039,13 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
for (HashPartition partn : partitions) {
if ( partn.isSpilled() ) {
- HJSpilledPartition sp = new HJSpilledPartition();
- sp.innerSpillFile = partn.getSpillFile();
- sp.innerSpilledBatches = partn.getPartitionBatchesCount();
- sp.cycleNum = cycleNum; // remember the current cycle
- sp.origPartn = partn.getPartitionNum(); // for debugging / filename
- sp.prevOrigPartn = originalPartition; // for debugging / filename
- spilledPartitionsList.add(sp);
+ HashJoinSpilledPartition sp = new HashJoinSpilledPartition(spilledState.getCycle(),
+ partn.getPartitionNum(),
+ originalPartition,
+ partn.getPartitionBatchesCount(),
+ partn.getSpillFile());
+ spilledState.addPartition(sp);
spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
partn.closeWriter();
@@ -1169,8 +1203,8 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
}
// delete any spill file left in unread spilled partitions
- while ( ! spilledPartitionsList.isEmpty() ) {
- HJSpilledPartition sp = spilledPartitionsList.remove(0);
+ while (!spilledState.isEmpty()) {
+ HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
try {
spillSet.delete(sp.innerSpillFile);
} catch(IOException e) {
@@ -1210,7 +1244,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
*/
private void updateStats() {
if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
- if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+ if (!spilledState.isFirstCycle()) { return; } // These stats are only for before processing spilled files
final HashTableStats htStats = new HashTableStats();
long numSpilled = 0;
@@ -1227,7 +1261,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
- this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
+ this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
}
@@ -1257,7 +1291,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
@Override
public void close() {
- if ( cycleNum > 0 ) { // spilling happened
+ if (!spilledState.isFirstCycle()) { // spilling happened
// In case closing due to cancellation, BaseRootExec.close() does not close the open
// SpilledRecordBatch "scanners" as it only knows about the original left/right ops.
killIncoming(false);
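
Editor's note: the new HashJoinSpilledPartition is populated in two phases. The build side supplies the inner spill at construction time, and the probe side adds the outer spill later through updateOuter() (see the HashJoinProbeTemplate hunk below); the outer-side getters are guarded by a Preconditions check until that happens. The following sketch of that flow is not part of the commit, and the file names and counts are invented.

// Illustrative sketch only -- shows the two-phase population of HashJoinSpilledPartition.
import org.apache.drill.exec.physical.impl.join.HashJoinBatch.HashJoinSpilledPartition;

public class HashJoinSpillMetadataSketch {
  public static void main(String[] args) {
    // Build phase: only the inner (build-side) spill is known.
    HashJoinSpilledPartition sp = new HashJoinSpilledPartition(
        0,                  // cycle
        5,                  // origin partition
        -1,                 // previous origin partition
        12,                 // inner spilled batches
        "/tmp/hj_inner_5"); // inner spill file

    // Calling sp.getOuterSpillFile() here would fail the Preconditions.checkState(updatedOuter) guard.

    // Probe phase: the probe side records the outer spill exactly once.
    sp.updateOuter(7, "/tmp/hj_outer_5");

    System.out.println(sp.makeDebugString()
        + " inner=" + sp.getInnerSpillFile()
        + " outer=" + sp.getOuterSpillFile());
  }
}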
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 5059b18..beddfa6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -39,7 +39,7 @@ public interface HashJoinProbe {
PROBE_PROJECT, PROJECT_RIGHT, DONE
}
- void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
+ void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
int probeAndProject() throws SchemaChangeException;
void changeToFinalProbeState();
void setTargetOutputCount(int targetOutputCount);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 71abeda..a63f63d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -77,7 +77,7 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
private int currRightPartition = 0; // for returning RIGHT/FULL
IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
- private HashJoinBatch.HJSpilledPartition spilledInners[]; // for the outer to find the partition
+ private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
private boolean buildSideIsEmpty = true;
private int numPartitions = 1; // must be 2 to the power of bitsInMask
private int partitionMask = 0; // numPartitions - 1
@@ -110,7 +110,7 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
* @param rightHVColPosition
*/
@Override
- public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
+ public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
this.container = container;
this.spilledInners = spilledInners;
this.probeBatch = probeBatch;
@@ -253,9 +253,8 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
partn.completeAnOuterBatch(false);
// update the partition's spill record with the outer side
- HashJoinBatch.HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
- sp.outerSpillFile = partn.getSpillFile();
- sp.outerSpilledBatches = partn.getPartitionBatchesCount();
+ HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
+ sp.updateOuter(partn.getPartitionBatchesCount(), partn.getSpillFile());
partn.closeWriter();
}