This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 89e0fe6b34259a2f51a7c45070935a2a2400eca4
Author: Ben-Zvi <[email protected]>
AuthorDate: Fri Apr 27 21:59:25 2018 -0700

    DRILL-6027:
    - Added fallback option for HashJoin.
    - No copy of incoming for single partition, and avoid HT resize.
    - Fix memory leak when cancelling while spill file is read
    - Get correct schema when probe side is empty
    - Re-create the HashJoinProbe
---
 .../java/org/apache/drill/exec/ExecConstants.java  |  18 +-
 .../drill/exec/physical/base/AbstractBase.java     |   4 +-
 .../drill/exec/physical/base/PhysicalOperator.java |   4 +-
 .../drill/exec/physical/config/ExternalSort.java   |   4 +-
 .../drill/exec/physical/config/HashAggregate.java  |  11 +-
 .../drill/exec/physical/config/HashJoinPOP.java    |  66 +--
 .../drill/exec/physical/impl/BaseRootExec.java     |   2 -
 .../physical/impl/aggregate/HashAggTemplate.java   |   2 +-
 .../exec/physical/impl/common/HashPartition.java   | 137 ++++--
 .../physical/impl/common/HashTableTemplate.java    |  31 +-
 .../exec/physical/impl/join/HashJoinBatch.java     | 468 ++++++---------
 .../impl/join/HashJoinHelperSizeCalculator.java    |   2 +-
 .../join/HashJoinHelperSizeCalculatorImpl.java     |   2 +-
 ...ava => HashJoinMechanicalMemoryCalculator.java} |   5 +-
 .../impl/join/HashJoinMemoryCalculator.java        |   2 +-
 .../impl/join/HashJoinMemoryCalculatorImpl.java    |   3 +-
 .../exec/physical/impl/join/HashJoinProbe.java     |  45 ++
 .../physical/impl/join/HashJoinProbeTemplate.java  | 425 +++++++++++++++++++
 .../exec/physical/impl/join/HashJoinState.java     |   2 +-
 .../impl/join/HashJoinStateCalculator.java         |   2 +-
 .../impl/join/HashTableSizeCalculator.java         |   2 +-
 .../HashTableSizeCalculatorConservativeImpl.java   |   2 +-
 .../impl/join/HashTableSizeCalculatorLeanImpl.java |   2 +-
 .../impl/protocol/OperatorRecordBatch.java         |   5 +
 .../apache/drill/exec/record/VectorContainer.java  | 101 +----
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../drill/exec/util/MemoryAllocationUtilities.java |   2 +-
 .../work/foreman/rm/ThrottledResourceManager.java  |   2 +-
 .../java-exec/src/main/resources/drill-module.conf |   4 +
 .../drill/exec/physical/impl/MockRecordBatch.java  |   3 +
 .../physical/impl/common/HashPartitionTest.java    |   8 +-
 .../impl/join/TestBuildSidePartitioningImpl.java   |   4 +-
 .../join/TestHashJoinHelperSizeCalculatorImpl.java |   2 +-
 .../join/TestHashJoinMemoryCalculatorImpl.java     |   2 +-
 .../exec/physical/impl/join/TestHashJoinSpill.java |  15 +-
 ...estHashTableSizeCalculatorConservativeImpl.java |   2 +-
 .../join/TestHashTableSizeCalculatorLeanImpl.java  |   2 +-
 .../exec/physical/impl/join/TestPartitionStat.java |   2 +-
 .../impl/join/TestPostBuildCalculationsImpl.java   |   2 +-
 .../physical/impl/unnest/MockLateralJoinBatch.java |   2 +
 .../xsort/managed/TestExternalSortInternals.java   |  14 +-
 .../physical/impl/xsort/managed/TestSorter.java    |   1 -
 42 files changed, 868 insertions(+), 547 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c48f414..4510511 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -104,14 +104,14 @@ public final class ExecConstants {
   public static final BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed");

   // Hash Join Options
-  public static String HASHJOIN_HASHTABLE_CALC_TYPE_KEY = "exec.hashjoin.hash_table_calc_type";
-  public static StringValidator HASHJOIN_HASHTABLE_CALC_TYPE = new
StringValidator(HASHJOIN_HASHTABLE_CALC_TYPE_KEY); - public static String HASHJOIN_SAFETY_FACTOR_KEY = "exec.hashjoin.safety_factor"; - public static DoubleValidator HASHJOIN_SAFETY_FACTOR = new RangeDoubleValidator(HASHJOIN_SAFETY_FACTOR_KEY, 1.0, Double.MAX_VALUE); - public static String HASHJOIN_HASH_DOUBLE_FACTOR_KEY = "exec.hashjoin.hash_double_factor"; - public static DoubleValidator HASHJOIN_HASH_DOUBLE_FACTOR = new RangeDoubleValidator(HASHJOIN_HASH_DOUBLE_FACTOR_KEY, 1.0, Double.MAX_VALUE); - public static String HASHJOIN_FRAGMENTATION_FACTOR_KEY = "exec.hashjoin.fragmentation_factor"; - public static DoubleValidator HASHJOIN_FRAGMENTATION_FACTOR = new RangeDoubleValidator(HASHJOIN_FRAGMENTATION_FACTOR_KEY, 1.0, Double.MAX_VALUE); + public static final String HASHJOIN_HASHTABLE_CALC_TYPE_KEY = "exec.hashjoin.hash_table_calc_type"; + public static final StringValidator HASHJOIN_HASHTABLE_CALC_TYPE = new StringValidator(HASHJOIN_HASHTABLE_CALC_TYPE_KEY); + public static final String HASHJOIN_SAFETY_FACTOR_KEY = "exec.hashjoin.safety_factor"; + public static final DoubleValidator HASHJOIN_SAFETY_FACTOR = new RangeDoubleValidator(HASHJOIN_SAFETY_FACTOR_KEY, 1.0, Double.MAX_VALUE); + public static final String HASHJOIN_HASH_DOUBLE_FACTOR_KEY = "exec.hashjoin.hash_double_factor"; + public static final DoubleValidator HASHJOIN_HASH_DOUBLE_FACTOR = new RangeDoubleValidator(HASHJOIN_HASH_DOUBLE_FACTOR_KEY, 1.0, Double.MAX_VALUE); + public static final String HASHJOIN_FRAGMENTATION_FACTOR_KEY = "exec.hashjoin.fragmentation_factor"; + public static final DoubleValidator HASHJOIN_FRAGMENTATION_FACTOR = new RangeDoubleValidator(HASHJOIN_FRAGMENTATION_FACTOR_KEY, 1.0, Double.MAX_VALUE); public static final String HASHJOIN_NUM_ROWS_IN_BATCH_KEY = "exec.hashjoin.num_rows_in_batch"; public static final LongValidator HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_ROWS_IN_BATCH_KEY, 1, 65536); public static final String HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY = "exec.hashjoin.max_batches_in_memory"; @@ -122,6 +122,8 @@ public final class ExecConstants { public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0L, Long.MAX_VALUE); public static final String HASHJOIN_SPILL_DIRS = "drill.exec.hashjoin.spill.directories"; public static final String HASHJOIN_SPILL_FILESYSTEM = "drill.exec.hashjoin.spill.fs"; + public static final String HASHJOIN_FALLBACK_ENABLED_KEY = "drill.exec.hashjoin.fallback.enabled"; + public static final BooleanValidator HASHJOIN_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHJOIN_FALLBACK_ENABLED_KEY); // Hash Aggregate Options public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index b93237e..8cf79d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base; import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.common.graph.GraphVisitor; +import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import com.google.common.base.Preconditions; @@ -114,9 +115,10 @@ public abstract class AbstractBase implements 
PhysicalOperator { /** * Any operator that supports spilling should override this method (and return true) * @return false + * @param queryContext */ @Override @JsonIgnore - public boolean isBufferedOperator() { return false; } + public boolean isBufferedOperator(QueryContext queryContext) { return false; } @Override public String getUserName() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index d9fcd3a..35138c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.graph.GraphValue; +import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import com.fasterxml.jackson.annotation.JsonIdentityInfo; @@ -91,9 +92,10 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { /** * * @return True iff this operator manages its memory (including disk spilling) + * @param queryContext */ @JsonIgnore - boolean isBufferedOperator(); + boolean isBufferedOperator(QueryContext queryContext); // public void setBufferedOperator(boolean bo); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java index f0e88b4..9ead21c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.config; import java.util.List; import org.apache.drill.common.logical.data.Order.Ordering; +import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; @@ -63,7 +64,8 @@ public class ExternalSort extends Sort { /** * The External Sort operator supports spilling * @return true + * @param queryContext */ @Override - public boolean isBufferedOperator() { return true; } + public boolean isBufferedOperator(QueryContext queryContext) { return true; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java index f8e6d8e..51f34a0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.physical.config; import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.base.AbstractSingle; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -95,8 +97,13 @@ public class HashAggregate extends AbstractSingle { } /** * The Hash Aggregate operator supports spilling - * @return true + * @return true (unless a single partition is forced) + * @param queryContext */ @Override - public boolean isBufferedOperator() { return true; } + 
public boolean isBufferedOperator(QueryContext queryContext) { + // In case forced to use a single partition - do not consider this a buffered op (when memory is divided) + return queryContext == null || + 1 < (int)queryContext.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR) ; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java index b56950a..48d977e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java @@ -17,63 +17,77 @@ */ package org.apache.drill.exec.physical.config; +import java.util.List; + +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import org.apache.calcite.rel.core.JoinRelType; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.exec.physical.base.AbstractJoinPop; -import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; - -import java.util.List; @JsonTypeName("hash-join") public class HashJoinPOP extends AbstractJoinPop { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class); + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class); - @JsonCreator - public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right, + @JsonCreator + public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right, @JsonProperty("conditions") List<JoinCondition> conditions, @JsonProperty("joinType") JoinRelType joinType) { super(left, right, joinType, null, conditions); Preconditions.checkArgument(joinType != null, "Join type is missing for HashJoin Pop"); - } + } - @Override - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.size() == 2); HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0), children.get(1), conditions, joinType); newHashJoin.setMaxAllocation(getMaxAllocation()); return newHashJoin; - } + } - public HashJoinPOP flipIfRight() { - if (joinType == JoinRelType.RIGHT) { + public HashJoinPOP flipIfRight() { + if (joinType == JoinRelType.RIGHT) { List<JoinCondition> flippedConditions = Lists.newArrayList(); for (JoinCondition c : conditions) { flippedConditions.add(c.flip()); } return new HashJoinPOP(right, left, flippedConditions, JoinRelType.LEFT); - } else { + } else { return this; - } - } + } + } - @Override - public int getOperatorType() { + @Override + public int getOperatorType() { return CoreOperatorType.HASH_JOIN_VALUE; } - @Override - public void setMaxAllocation(long maxAllocation) { + /** + * + * @param maxAllocation The max memory 
allocation to be set
+   */
+  @Override
+  public void setMaxAllocation(long maxAllocation) {
     this.maxAllocation = maxAllocation;
   }

-  @Override
-  public boolean isBufferedOperator() {
-    return true;
-  }
+  /**
+   * The Hash Join operator supports spilling
+   * @return true (unless a single partition is forced)
+   * @param queryContext
+   */
+  @Override
+  public boolean isBufferedOperator(QueryContext queryContext) {
+    // In case forced to use a single partition - do not consider this a buffered op (when memory is divided)
+    return queryContext == null ||
+      1 < (int)queryContext.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR) ;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index b18a78e..e148278 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -32,8 +32,6 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;

-import com.google.common.base.Supplier;
-
 public abstract class BaseRootExec implements RootExec {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRootExec.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 4f6a117..258e8d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -541,7 +541,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
     // multiply by the max number of rows in a batch to get the final estimated max size
     estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
-    // (When there are no aggr functions, use '1' as later code relies on this siisDebze being non-zero)
+    // (When there are no aggr functions, use '1' as later code relies on this size being non-zero)
     estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
     estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5d0197a..e525530 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.common;

-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
@@ -35,6 +34,8 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
 import
org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; @@ -68,7 +69,7 @@ import java.util.concurrent.TimeUnit; public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class); - public static final String HASH_VALUE_COLUMN_NAME = "Hash_Values"; + public static final String HASH_VALUE_COLUMN_NAME = "$Hash_Values$"; private int partitionNum = -1; // the current number of this partition, as used by the operator @@ -107,30 +108,30 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { private String spillFile; private BufferAllocator allocator; - private int RECORDS_PER_BATCH; - ChainedHashTable baseHashTable; + private int recordsPerBatch; private SpillSet spillSet; private boolean isSpilled; // is this partition spilled ? private boolean processingOuter; // is (inner done spilling and) now the outer is processed? - private boolean outerBatchNotNeeded; // when the inner is whole in memory + private boolean outerBatchAllocNotNeeded; // when the inner is whole in memory private RecordBatch buildBatch; private RecordBatch probeBatch; private int cycleNum; + private int numPartitions; private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList(); private long partitionInMemorySize; private long numInMemoryRecords; public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable, RecordBatch buildBatch, RecordBatch probeBatch, - int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum) { + int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum, int numPartitions) { this.allocator = allocator; - this.baseHashTable = baseHashTable; this.buildBatch = buildBatch; this.probeBatch = probeBatch; - this.RECORDS_PER_BATCH = recordsPerBatch; + this.recordsPerBatch = recordsPerBatch; this.spillSet = spillSet; this.partitionNum = partNum; this.cycleNum = cycleNum; + this.numPartitions = numPartitions; try { this.hashTable = baseHashTable.createAndSetupHashTable(null); @@ -147,7 +148,9 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { } this.hjHelper = new HashJoinHelper(context, allocator); tmpBatchesList = new ArrayList<>(); - allocateNewCurrentBatchAndHV(); + if ( numPartitions > 1 ) { + allocateNewCurrentBatchAndHV(); + } } /** @@ -173,11 +176,11 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { newVC.add(newVV); // add first to allow dealloc in case of an OOM if (newVV instanceof FixedWidthVector) { - ((FixedWidthVector) newVV).allocateNew(RECORDS_PER_BATCH); + ((FixedWidthVector) newVV).allocateNew(recordsPerBatch); } else if (newVV instanceof VariableWidthVector) { - ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * RECORDS_PER_BATCH, RECORDS_PER_BATCH); + ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * recordsPerBatch, recordsPerBatch); } else if (newVV instanceof ObjectVector) { - ((ObjectVector) newVV).allocateNew(RECORDS_PER_BATCH); + ((ObjectVector) newVV).allocateNew(recordsPerBatch); } else { newVV.allocateNew(); } @@ -197,10 +200,10 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { * Allocate a new current Vector Container and current HV vector */ public void allocateNewCurrentBatchAndHV() { - if ( outerBatchNotNeeded ) { return; } // skip when the inner is whole in 
memory + if (outerBatchAllocNotNeeded) { return; } // skip when the inner is whole in memory currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch); currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator); - currHVVector.allocateNew(RECORDS_PER_BATCH); + currHVVector.allocateNew(recordsPerBatch); } /** @@ -209,8 +212,8 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) { int pos = currentBatch.appendRow(buildContainer,ind); - currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column - if ( pos + 1 == RECORDS_PER_BATCH ) { + currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column + if ( pos == recordsPerBatch ) { boolean needsSpill = isSpilled || calc.shouldSpill(); completeAnInnerBatch(true, needsSpill); } @@ -222,8 +225,8 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { */ public void appendOuterRow(int hashCode, int recordsProcessed) { int pos = currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed); - currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column - if ( pos + 1 == RECORDS_PER_BATCH ) { + currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column + if ( pos == recordsPerBatch ) { completeAnOuterBatch(true); } } @@ -265,6 +268,42 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { } } + /** + * Append the incoming batch (actually only the vectors of that batch) into the tmp list + */ + public void appendBatch(VectorAccessible batch) { + assert numPartitions == 1; + int recordCount = batch.getRecordCount(); + currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator); + currHVVector.allocateNew(recordCount /* recordsPerBatch */); + try { + // For every record in the build batch, hash the key columns and keep the result + for (int ind = 0; ind < recordCount; ind++) { + int hashCode = getBuildHashCode(ind); + currHVVector.getMutator().set(ind, hashCode); // store the hash value in the new HV column + } + } catch(SchemaChangeException sce) {} + + VectorContainer container = new VectorContainer(); + List<ValueVector> vectors = Lists.newArrayList(); + + for (VectorWrapper<?> v : batch) { + TransferPair tp = v.getValueVector().getTransferPair(allocator); + tp.transfer(); + vectors.add(tp.getTo()); + } + + container.addCollection(vectors); + container.add(currHVVector); // the HV vector is added as an extra "column" + container.setRecordCount(recordCount); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + + tmpBatchesList.add(container); + partitionBatchesCount++; + currHVVector = null; + numInMemoryRecords += recordCount; + } + public void spillThisPartition() { if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing to spill logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size()); @@ -300,15 +339,15 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { v.getValueVector().getMutator().setValueCount(numRecords); } - WritableBatch batch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false); + WritableBatch wBatch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false); try { 
- writer.write(batch, null); + writer.write(wBatch, null); } catch (IOException ioe) { throw UserException.dataWriteError(ioe) .message("Hash Join failed to write to output file: " + spillFile) .build(logger); } finally { - batch.clear(); + wBatch.clear(); } vc.zeroVectors(); logger.trace("HASH JOIN: Took {} us to spill {} records", writer.time(TimeUnit.MICROSECONDS), numRecords); @@ -398,20 +437,9 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { return partitionNum; } - private void freeCurrentBatchAndHVVector() { - if ( currentBatch != null ) { - currentBatch.clear(); - currentBatch = null; - } - if ( currHVVector != null ) { - currHVVector.clear(); - currHVVector = null; - } - } - - public void closeWriterAndDeleteFile() { - closeWriterInternal(true); - } + /** + * Close the writer without deleting the spill file + */ public void closeWriter() { // no deletion !! closeWriterInternal(false); processingOuter = true; // After the spill file was closed @@ -442,7 +470,8 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { } /** - * Creates the hash table and join helper for this partition. This method should only be called after all the build side records + * Creates the hash table and join helper for this partition. + * This method should only be called after all the build side records * have been consumed. */ public void buildContainersHashTableAndHelper() throws SchemaChangeException { @@ -464,7 +493,6 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { hashTable.updateIncoming(nextBatch, probeBatch ); - // IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector(); IntVector HV_vector = (IntVector) nextBatch.getLast(); for (int recInd = 0; recInd < currentRecordCount; recInd++) { @@ -473,7 +501,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { hashTable.put(recInd, htIndex, hashCode); } catch (RetryAfterSpillException RE) { throw new OutOfMemoryException("HT put"); - } // Hash Join can not retry yet + } // Hash Join does not retry /* Use the global index returned by the hash table, to store * the current record index and batch index. This will be used * later when we probe and find a match. @@ -483,7 +511,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { containers.add(nextBatch); } - outerBatchNotNeeded = true; // the inner is whole in memory, no need for an outer batch + outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need for an outer batch } public void getStats(HashTableStats newStats) { @@ -493,7 +521,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { /** * Frees memory allocated to the {@link HashTable} and {@link HashJoinHelper}. */ - public void clearHashTableAndHelper() { + private void clearHashTableAndHelper() { if (hashTable != null) { hashTable.clear(); hashTable = null; @@ -504,7 +532,22 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { } } - public void close() { + private void freeCurrentBatchAndHVVector() { + if ( currentBatch != null ) { + currentBatch.clear(); + currentBatch = null; + } + if ( currHVVector != null ) { + currHVVector.clear(); + currHVVector = null; + } + } + + /** + * Free all in-memory allocated structures. 
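+   * (Usage note, inferred from this commit's call sites: innerNext() calls
+   * cleanup(false) so the spill files survive for the recursive pass over the
+   * spilled partitions, while close() calls cleanup(true) to delete them.)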
+ * @param deleteFile - whether to delete the spill file or not + */ + public void cleanup(boolean deleteFile) { freeCurrentBatchAndHVVector(); if (containers != null && !containers.isEmpty()) { for (VectorContainer vc : containers) { @@ -515,11 +558,15 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { VectorContainer vc = tmpBatchesList.remove(0); vc.clear(); } - closeWriter(); - partitionBatchesCount = 0; - spillFile = null; + closeWriterInternal(deleteFile); clearHashTableAndHelper(); - if ( containers != null ) { containers.clear(); } + if ( containers != null ) { + containers.clear(); + } + } + + public void close() { + cleanup(true); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 06e3bcd..bb0b1ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -65,6 +65,8 @@ public abstract class HashTableTemplate implements HashTable { // Array of batch holders..each batch holder can hold up to BATCH_SIZE entries private ArrayList<BatchHolder> batchHolders; + private int totalBatchHoldersSize; // the size of all batchHolders + // Current size of the hash table in terms of number of buckets private int tableSize = 0; @@ -483,6 +485,7 @@ public abstract class HashTableTemplate implements HashTable { // Create the first batch holder batchHolders = new ArrayList<BatchHolder>(); + totalBatchHoldersSize = 0; // First BatchHolder is created when the first put request is received. try { @@ -498,6 +501,7 @@ public abstract class HashTableTemplate implements HashTable { public void updateInitialCapacity(int initialCapacity) { htConfig = htConfig.withInitialCapacity(initialCapacity); allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE); + enlargeEmptyHashTableIfNeeded(initialCapacity); } @Override @@ -543,6 +547,7 @@ public abstract class HashTableTemplate implements HashTable { } batchHolders.clear(); batchHolders = null; + totalBatchHoldersSize = 0; } startIndices.clear(); // currentIdxHolder = null; // keep IndexPointer in case HT is reused @@ -568,6 +573,7 @@ public abstract class HashTableTemplate implements HashTable { if ( batchAdded ) { logger.trace("OOM - Removing index {} from the batch holders list",batchHolders.size() - 1); BatchHolder bh = batchHolders.remove(batchHolders.size() - 1); + totalBatchHoldersSize -= BATCH_SIZE; bh.clear(); } freeIndex--; @@ -677,7 +683,7 @@ public abstract class HashTableTemplate implements HashTable { } htIdxHolder.value = currentIdx; return addedBatch ? PutStatus.NEW_BATCH_ADDED : - ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ? + ( freeIndex + 1 > totalBatchHoldersSize /* batchHolders.size() * BATCH_SIZE */ ) ? PutStatus.KEY_ADDED_LAST : // the last key in the batch PutStatus.KEY_ADDED; // otherwise } @@ -710,9 +716,9 @@ public abstract class HashTableTemplate implements HashTable { // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds // the capacity, we will add a new BatchHolder. Return true if a new batch was added. 
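  // (Worked example, assuming the default BATCH_SIZE of 65536: with one BatchHolder
  // allocated, totalBatchHoldersSize == 65536, so the first put that reaches
  // currentIdx == 65536 adds a second holder and grows the total to 131072.)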
private boolean addBatchIfNeeded(int currentIdx) throws SchemaChangeException { - int totalBatchSize = batchHolders.size() * BATCH_SIZE; + // int totalBatchSize = batchHolders.size() * BATCH_SIZE; - if (currentIdx >= totalBatchSize) { + if (currentIdx >= totalBatchHoldersSize) { BatchHolder bh = newBatchHolder(batchHolders.size(), allocationTracker.getNextBatchHolderSize()); batchHolders.add(bh); bh.setup(); @@ -721,6 +727,8 @@ public abstract class HashTableTemplate implements HashTable { } allocationTracker.commit(); + + totalBatchHoldersSize += BATCH_SIZE; // total increased by 1 batch return true; } return false; @@ -797,6 +805,22 @@ public abstract class HashTableTemplate implements HashTable { } /** + * Resize up the Hash Table if needed (to hold newNum entries) + */ + public void enlargeEmptyHashTableIfNeeded(int newNum) { + assert numEntries == 0; + if ( newNum < threshold ) { return; } // no need to resize + + while ( tableSize * 2 < MAXIMUM_CAPACITY && newNum > threshold ) { + tableSize *= 2; + threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor()); + } + startIndices.clear(); + startIndices = allocMetadataVector(tableSize, EMPTY_SLOT); + } + + + /** * Reinit the hash table to its original size, and clear up all its prior batch holder * */ @@ -806,6 +830,7 @@ public abstract class HashTableTemplate implements HashTable { freeIndex = 0; // all batch holders are gone // reallocate batch holders, and the hash table to the original size batchHolders = new ArrayList<BatchHolder>(); + totalBatchHoldersSize = 0; startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT); } public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 0c46e36..ee7a8a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -37,8 +37,10 @@ import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -66,7 +68,24 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.calcite.rel.core.JoinRelType; /** + * This class implements the runtime execution for the Hash-Join operator + * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins * + * This implementation splits the incoming Build side rows into multiple Partitions, thus allowing spilling of + * some of these partitions to disk if memory gets tight. Each partition is implemented as a {@link HashPartition}. + * After the build phase is over, in the most general case, some of the partitions were spilled, and the others + * are in memory. Each of the partitions in memory would get a {@link HashTable} built. + * Next the Probe side is read, and each row is key matched with a Build partition. 
If that partition is in
+ * memory, then the key is used to probe and perform the join, and the results are added to the outgoing batch.
+ * But if that build side partition was spilled, then the matching Probe side partition is spilled as well.
+ * After all the Probe side was processed, we are left with pairs of spilled partitions. Then each pair is
+ * processed individually (that Build partition should be smaller than the original, hence likely to fit whole into
+ * memory to allow probing; if not -- see below).
+ * Processing of each spilled pair is EXACTLY like processing the original Build/Probe incoming batches. (In fact,
+ * the {@link #innerNext() innerNext} method calls itself recursively!) Thus the spilled build partition is
+ * read and divided into new partitions, which in turn may spill again (and again...).
+ * The code tracks these spilling "cycles". Normally any such "again" (i.e. a cycle of 2 or greater) is a waste,
+ * indicating that the number of partitions chosen was too small.
+ */
 public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
@@ -86,6 +105,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {

   // Join conditions
   private final List<JoinCondition> conditions;
+
+  // Runtime generated class implementing HashJoinProbe interface
+  private HashJoinProbe hashJoinProbe = null;
+
   private final List<NamedExpression> rightExpr;

   /**
@@ -120,6 +143,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   // Schema of the build side
   private BatchSchema rightSchema;
+  // Schema of the probe side
+  private BatchSchema probeSchema;
+
   private int rightHVColPosition;
   private BufferAllocator allocator;
@@ -131,15 +157,15 @@
   private SpillSet spillSet;
   HashJoinPOP popConfig;

-  private int cycleNum = 0; // primary, secondary, tertiary, etc.
+  private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from

-  IntVector read_HV_vector; // HV vector that was read from the spilled batch
+  IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
   private int maxBatchesInMemory;

   /**
    * This holds information about the spilled partitions for the build and probe side.
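   * (Illustration: if build partition 3 spills during cycle 1, its inner spill file and
   * batch count are recorded here; the matching probe-side rows are later spilled and
   * recorded in the outer fields, and the pair is re-read together as the next cycle's input.)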
*/ - private static class HJSpilledPartition { + public static class HJSpilledPartition { public int innerSpilledBatches; public String innerSpillFile; public int outerSpilledBatches; @@ -189,6 +215,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { setupHashTable(); } setupOutputContainerSchema(); + try { + hashJoinProbe = setupHashJoinProbe(); + } catch (IOException | ClassTransformationException e) { + throw new SchemaChangeException(e); + } + // Build the container schema and set the counts for (final VectorWrapper<?> w : container) { w.getValueVector().allocateNew(); @@ -212,7 +244,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { return false; } - if (checkForEarlyFinish()) { + if (checkForEarlyFinish(leftUpstream, rightUpstream)) { state = BatchState.DONE; return false; } @@ -234,11 +266,16 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { switch (outcome) { case OK_NEW_SCHEMA: - // We need to have the schema of the build side even when the build side is empty - rightSchema = buildBatch.getSchema(); - // position of the new "column" for keeping the hash values (after the real columns) - rightHVColPosition = buildBatch.getContainer().getNumberOfColumns(); - // new schema can also contain records + if ( inputIndex == 0 ) { + // Indicate that a schema was seen (in case probe side is empty) + probeSchema = probeBatch.getSchema(); + } else { + // We need to have the schema of the build side even when the build side is empty + rightSchema = buildBatch.getSchema(); + // position of the new "column" for keeping the hash values (after the real columns) + rightHVColPosition = buildBatch.getContainer().getNumberOfColumns(); + // new schema can also contain records + } case OK: if (recordBatch.getRecordCount() == 0) { continue; @@ -265,12 +302,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType); } else { - return new MechanicalHashJoinMemoryCalculator(maxBatchesInMemory); + return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory); } } @Override public IterOutcome innerNext() { + // In case incoming was killed before, just cleanup and return + if ( wasKilled ) { + this.cleanup(); + super.close(); + return IterOutcome.NONE; + } + try { /* If we are here for the first time, execute the build phase of the * hash join and setup the run time generated class for the probe side @@ -280,19 +324,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { executeBuildPhase(); // Update the hash table related stats for the operator updateStats(); - // - setupProbe(); + // Initialize various settings for the probe side + hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType, leftUpstream, partitions, cycleNum, container, spilledInners, buildSideIsEmpty, numPartitions, rightHVColPosition); } - // Store the number of records projected - + // Try to probe and project, or recursively handle a spilled partition if ( ! buildSideIsEmpty || // If there are build-side rows joinType != JoinRelType.INNER) { // or if this is a left/full outer join // Allocate the memory for the vectors in the output container allocateVectors(); - outputRecords = probeAndProject(); + outputRecords = hashJoinProbe.probeAndProject(); /* We are here because of one the following * 1. 
Completed processing of all the records and we are done @@ -314,13 +357,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Free all partitions' in-memory data structures // (In case need to start processing spilled partitions) for ( HashPartition partn : partitions ) { - partn.close(); + partn.cleanup(false); // clean, but do not delete the spill files !! } // // (recursively) Handle the spilled partitions, if any // - if ( !buildSideIsEmpty && !wasKilled && !spilledPartitionsList.isEmpty()) { + if ( !buildSideIsEmpty && !spilledPartitionsList.isEmpty()) { // Get the next (previously) spilled partition to handle as incoming HJSpilledPartition currSp = spilledPartitionsList.remove(0); @@ -337,7 +380,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } else { probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming() leftUpstream = IterOutcome.NONE; - changeToFinalProbeState(); + hashJoinProbe.changeToFinalProbeState(); } // update the cycle num if needed @@ -356,12 +399,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { this.cleanup(); throw UserException .unsupportedError() - .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)\n" + .message("Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates)\n" + "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions) .build(logger); } } - logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size()); + logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches)." 
+ + " More {} spilled partitions left.", + currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, + currSp.innerSpilledBatches, spilledPartitionsList.size()); state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this @@ -400,12 +446,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { private void setupHashTable() throws SchemaChangeException { final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size()); - // When DRILL supports Java 8, use the following instead of the for() loop - // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond))); - for (int i=0; i<conditions.size(); i++) { - JoinCondition cond = conditions.get(i); - comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)); - } + conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond))); // Setup the hash table configuration object List<NamedExpression> leftExpr = new ArrayList<>(conditions.size()); @@ -441,16 +482,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // // Find out the estimated max batch size, etc // and compute the max numPartitions possible + // See partitionNumTuning() // - // numPartitions = 8; // just for initial work; change later - // partitionMask = 7; - // bitsInMask = 3; - - // SET FROM CONFIGURATION OPTIONS : - // ================================ - // Set the number of partitions from the configuration (raise to a power of two, if needed) - // Based on the number of partitions: Set the mask and bit count partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5 @@ -471,7 +505,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Recreate the partitions every time build is initialized for (int part = 0; part < numPartitions; part++ ) { partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, - RECORDS_PER_BATCH, spillSet, part, cycleNum); + RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions); } spilledInners = new HJSpilledPartition[numPartitions]; @@ -526,15 +560,38 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { TARGET_RECORDS_PER_BATCH, HashTable.DEFAULT_LOAD_FACTOR); - numPartitions = 1; // We are only using one partition - canSpill = false; // We cannot spill - allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory + disableSpilling(null); } return buildCalc; } /** + * Disable spilling - use only a single partition and set the memory limit to the max ( 10GB ) + * @param reason If not null - log this as warning, else check fallback setting to either warn or fail. + */ + private void disableSpilling(String reason) { + // Fail, or just issue a warning if a reason was given, or a fallback option is enabled + if ( reason == null ) { + final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val; + if (fallbackEnabled) { + logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" + + " to use unbounded memory"); + } else { + throw UserException.resourceError().message(String.format("Not enough memory for internal partitioning and fallback mechanism for " + + "HashJoin to use unbounded memory is disabled. 
Either enable fallback config %s using Alter " + + "session/system command or increase memory limit for Drillbit", ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger); + } + } else { + logger.warn(reason); + } + + numPartitions = 1; // We are only using one partition + canSpill = false; // We cannot spill + allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory + } + + /** * Execute the BUILD phase; first read incoming and split rows into partitions; * may decide to spill some of the partitions * @@ -560,7 +617,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { calc.initialize(doMemoryCalculation); buildCalc = calc.next(); - // We've sniffed first non empty build and probe batches so we have enough information to createa calculator + // We've sniffed first non empty build and probe batches so we have enough information to create a calculator buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed buildBatch, probeBatch, @@ -608,25 +665,30 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { for (HashPartition partn : partitions) { partn.updateBatches(); } // Fall through case OK: + // Special treatment (when no spill, and single partition) -- use the incoming vectors as they are (no row copy) + if ( numPartitions == 1 ) { + partitions[0].appendBatch(buildBatch); + break; + } final int currentRecordCount = buildBatch.getRecordCount(); if ( cycleNum > 0 ) { - read_HV_vector = (IntVector) buildBatch.getContainer().getLast(); + read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast(); } // For every record in the build batch, hash the key columns and keep the result for (int ind = 0; ind < currentRecordCount; ind++) { int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind) - : read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column + : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column int currPart = hashCode & partitionMask ; hashCode >>>= bitsInMask; // Append the new inner row to the appropriate partition; spill (that partition) if needed partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed } - if ( read_HV_vector != null ) { - read_HV_vector.clear(); - read_HV_vector = null; + if ( read_right_HV_vector != null ) { + read_right_HV_vector.clear(); + read_right_HV_vector = null; } break; } @@ -637,8 +699,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Move the remaining current batches into their temp lists, or spill // them if the partition is spilled. 
Add the spilled partitions into
     // the spilled partitions list
-    for (HashPartition partn : partitions) {
-      partn.completeAnInnerBatch(false, partn.isSpilled() );
+    if ( numPartitions > 1 ) { // a single partition needs no completion
+      for (HashPartition partn : partitions) {
+        partn.completeAnInnerBatch(false, partn.isSpilled());
+      }
     }

     HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
@@ -715,7 +779,7 @@
       }
     }

-    if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+    if (probeSchema != null) { // a probe schema was seen (even though the probe may have had no rows)
       for (final VectorWrapper<?> vv : probeBatch) {
         final MajorType inputType = vv.getField().getType();
         final MajorType outputType;
@@ -761,6 +825,15 @@
     return spilledInners[part] != null;
   }

+  /**
+   * The constructor
+   *
+   * @param popConfig
+   * @param context
+   * @param left -- probe/outer side incoming input
+   * @param right -- build/inner side incoming input
+   * @throws OutOfMemoryException
+   */
   public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
       RecordBatch left, /*Probe side record batch*/
       RecordBatch right /*Build side record batch*/
@@ -783,16 +856,15 @@
       rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
     }

+    this.allocator = oContext.getAllocator();
+
     numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
     if ( numPartitions == 1 ) { //
-      canSpill = false;
-      logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
+      disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
     }
     numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2

-    this.allocator = oContext.getAllocator();
-
     final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);

     if (memLimit != 0) {
@@ -802,6 +874,7 @@
     RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
     maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);

+    logger.info("Memory limit {} bytes", FileUtils.byteCountToDisplaySize(allocator.getLimit()));
     spillSet = new SpillSet(context, popConfig);

     // Create empty partitions (in the ctor - covers the case where right side is empty)
@@ -818,10 +891,9 @@
       stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
           (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
-    // clean (and deallocate) each partition
+    // clean (and deallocate) each partition, and delete its spill file
     for (HashPartition partn : partitions) {
-      partn.clearHashTableAndHelper();
-      partn.closeWriterAndDeleteFile();
+      partn.close();
     }

     // delete any spill file left in unread spilled partitions
@@ -896,288 +968,24 @@
   @Override
   public void close() {
-    for ( HashPartition partn : partitions ) {
-      partn.close();
+    if ( cycleNum > 0 ) { // spilling happened
+      // In case closing due to cancellation, BaseRootExec.close()
does not close the open + // SpilledRecordBatch "scanners" as it only knows about the original left/right ops. + killIncoming(false); } - cleanup(); + this.cleanup(); super.close(); } - // ============================================================== - // - // Methods used for the probe - // - // ============================================================ - private BatchSchema probeSchema; - - enum ProbeState { - PROBE_PROJECT, PROJECT_RIGHT, DONE - } - - private int currRightPartition = 0; // for returning RIGHT/FULL - - // Number of records to process on the probe side - private int recordsToProcess = 0; - - // Number of records processed on the probe side - private int recordsProcessed = 0; - - // Indicate if we should drain the next record from the probe side - private boolean getNextRecord = true; - - // Contains both batch idx and record idx of the matching record in the build side - private int currentCompositeIdx = -1; - - // Current state the hash join algorithm is in - private ProbeState probeState = ProbeState.PROBE_PROJECT; - - // For outer or right joins, this is a list of unmatched records that needs to be projected - private List<Integer> unmatchedBuildIndexes = null; + public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { + final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions()); + cg.plainJavaCapable(true); + // cg.saveCodeForDebugging(true); - // While probing duplicates, retain current build-side partition in case need to continue - // probing later on the same chain of duplicates - private HashPartition currPartition; + // No real code generation !! - /** - * Various initialization needed to perform the probe - * Must be called AFTER the build completes - */ - private void setupProbe() { - currRightPartition = 0; // In case it's a Right/Full outer join - recordsProcessed = 0; - recordsToProcess = 0; - - probeSchema = probeBatch.getSchema(); - probeState = ProbeState.PROBE_PROJECT; - - // A special case - if the left was an empty file - if ( leftUpstream == IterOutcome.NONE ){ - changeToFinalProbeState(); - } else { - this.recordsToProcess = probeBatch.getRecordCount(); - } - - // for those outer partitions that need spilling (cause their matching inners spilled) - // initialize those partitions' current batches and hash-value vectors - for ( HashPartition partn : partitions ) { - partn.allocateNewCurrentBatchAndHV(); - } - - if ( cycleNum > 0 ) { - if ( read_HV_vector != null ) { read_HV_vector.clear();} - if ( leftUpstream != IterOutcome.NONE ) { // Skip when outer spill was empty - read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); - } - } + final HashJoinProbe hj = context.getImplementationClass(cg); + return hj; } - private void executeProjectRightPhase(int currBuildPart) { - while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) { - outputRecords = - container.appendRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed), - null /* no probeBatch */, 0 /* no probe index */ ); - recordsProcessed++; - } - } - - private void executeProbePhase() throws SchemaChangeException { - - while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) { - - // Check if we have processed all records in this batch we need to invoke next - if (recordsProcessed == recordsToProcess) { - - // Done processing all records in the previous batch, 
clean up! - for (VectorWrapper<?> wrapper : probeBatch) { - wrapper.getValueVector().clear(); - } - - IterOutcome leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); - - switch (leftUpstream) { - case NONE: - case NOT_YET: - case STOP: - recordsProcessed = 0; - recordsToProcess = 0; - changeToFinalProbeState(); - // in case some outer partitions were spilled, need to spill their last batches - for ( HashPartition partn : partitions ) { - if ( ! partn.isSpilled() ) { continue; } // skip non-spilled - partn.completeAnOuterBatch(false); - // update the partition's spill record with the outer side - HJSpilledPartition sp = spilledInners[partn.getPartitionNum()]; - sp.outerSpillFile = partn.getSpillFile(); - sp.outerSpilledBatches = partn.getPartitionBatchesCount(); - - partn.closeWriter(); - } - - continue; - - case OK_NEW_SCHEMA: - if (probeBatch.getSchema().equals(probeSchema)) { - for ( HashPartition partn : partitions ) { partn.updateBatches(); } - - } else { - throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.", - probeSchema, - probeBatch.getSchema()); - } - case OK: - recordsToProcess = probeBatch.getRecordCount(); - recordsProcessed = 0; - // If we received an empty batch do nothing - if (recordsToProcess == 0) { - continue; - } - if ( cycleNum > 0 ) { - read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ? - } - } - } - int probeIndex = -1; - // Check if we need to drain the next row in the probe side - if (getNextRecord) { - - if ( !buildSideIsEmpty ) { - int hashCode = ( cycleNum == 0 ) ? - partitions[0].getProbeHashCode(recordsProcessed) - : read_HV_vector.getAccessor().get(recordsProcessed); - int currBuildPart = hashCode & partitionMask ; - hashCode >>>= bitsInMask; - - // Set and keep the current partition (may be used again on subsequent probe calls as - // inner rows of duplicate key are processed) - currPartition = partitions[currBuildPart]; // inner if not spilled, else outer - - // If the matching inner partition was spilled - if ( isSpilledInner(currBuildPart) ) { - // add this row to its outer partition (may cause a spill, when the batch is full) - - currPartition.appendOuterRow(hashCode, recordsProcessed); - - recordsProcessed++; // done with this outer record - continue; // on to the next outer record - } - - probeIndex = currPartition.probeForKey(recordsProcessed, hashCode); - - } - - if (probeIndex != -1) { - - /* The current probe record has a key that matches. Get the index - * of the first row in the build side that matches the current key - * (and record this match in the bitmap, in case of a FULL/RIGHT join) - */ - currentCompositeIdx = currPartition.getStartIndex(probeIndex); - - outputRecords = - container.appendRow(currPartition.getContainers(), currentCompositeIdx, - probeBatch.getContainer(), recordsProcessed); - - /* Projected single row from the build side with matching key but there - * may be more rows with the same key. Check if that's the case - */ - currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); - if (currentCompositeIdx == -1) { - /* We only had one row in the build side that matched the current key - * from the probe side. Drain the next row in the probe side. 
- */ - recordsProcessed++; - } else { - /* There is more than one row with the same key on the build side - * don't drain more records from the probe side till we have projected - * all the rows with this key - */ - getNextRecord = false; - } - } else { // No matching key - - // If we have a left outer join, project the outer side - if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) { - - outputRecords = - container.appendOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition); - } - recordsProcessed++; - } - } - else { // match the next inner row with the same key - - currPartition.setRecordMatched(currentCompositeIdx); - - outputRecords = - container.appendRow(currPartition.getContainers(), currentCompositeIdx, - probeBatch.getContainer(), recordsProcessed); - - currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); - - if (currentCompositeIdx == -1) { - // We don't have any more rows matching the current key on the build side, move on to the next probe row - getNextRecord = true; - recordsProcessed++; - } - } - } - } - - /** - * Perform the probe and project the results - * - * @return number of output records - * @throws SchemaChangeException - */ - private int probeAndProject() throws SchemaChangeException { - - outputRecords = 0; - - // When handling spilled partitions, the state becomes DONE at the end of each partition - if ( probeState == ProbeState.DONE ) { - return outputRecords; // that is zero - } - - if (probeState == ProbeState.PROBE_PROJECT) { - executeProbePhase(); - } - - if (probeState == ProbeState.PROJECT_RIGHT) { - // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join - - do { - - if (unmatchedBuildIndexes == null) { // first time for this partition ? - if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right - // Get this partition's list of build indexes that didn't match any record on the probe side - unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex(); - recordsProcessed = 0; - recordsToProcess = unmatchedBuildIndexes.size(); - } - - // Project the list of unmatched records on the build side - executeProjectRightPhase(currRightPartition); - - if ( recordsProcessed < recordsToProcess ) { // more records in this partition? - return outputRecords; // outgoing is full; report and come back later - } else { - currRightPartition++; // on to the next right partition - unmatchedBuildIndexes = null; - } - - } while ( currRightPartition < numPartitions ); - - probeState = ProbeState.DONE; // last right partition was handled; we are done now - } - - return outputRecords; - } - - private void changeToFinalProbeState() { - // We are done with the (left) probe phase. - // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side - probeState = - (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? 
ProbeState.PROJECT_RIGHT : - ProbeState.DONE; // else we're done - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java index f5c826a..6ddd05e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java index 728fde5..a17ea2f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java similarity index 97% rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java index 8b367dd..618e80e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.drill.exec.physical.impl.join; import com.google.common.base.Preconditions; @@ -24,12 +23,12 @@ import org.apache.drill.exec.record.RecordBatch; import javax.annotation.Nullable; import java.util.Set; -public class MechanicalHashJoinMemoryCalculator implements HashJoinMemoryCalculator { +public class HashJoinMechanicalMemoryCalculator implements HashJoinMemoryCalculator { private final int maxNumInMemBatches; private boolean doMemoryCalc; - public MechanicalHashJoinMemoryCalculator(int maxNumInMemBatches) { + public HashJoinMechanicalMemoryCalculator(int maxNumInMemBatches) { this.maxNumInMemBatches = maxNumInMemBatches; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java index f2de0fe..71292a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java index 5890a42..ed0adc5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.join; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.commons.io.FileUtils; import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchSizer; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java new file mode 100644 index 0000000..f212605 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.record.VectorContainer;
+
+public interface HashJoinProbe {
+  TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+
+  /* The probe side of the hash join can be in one of the following three states:
+   * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
+   *    key match and if we do we project the record
+   * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
+   *    from the build side that did not match any records on the probe side. For the Left Outer
+   *    case we handle it internally by projecting the record if there isn't a match on the build side
+   * 3. DONE: Once we have projected all possible records we are done
+   */
+  enum ProbeState {
+    PROBE_PROJECT, PROJECT_RIGHT, DONE
+  }
+
+  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
+  int probeAndProject() throws SchemaChangeException;
+  void changeToFinalProbeState();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
new file mode 100644
index 0000000..75c3073
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class HashJoinProbeTemplate implements HashJoinProbe {
+
+  VectorContainer container; // the outgoing container
+
+  // Probe side record batch
+  private RecordBatch probeBatch;
+
+  private BatchSchema probeSchema;
+
+  // Join type: INNER, LEFT, RIGHT or FULL
+  private JoinRelType joinType;
+
+  private HashJoinBatch outgoingJoinBatch = null;
+
+  private static final int TARGET_RECORDS_PER_BATCH = 4000;
+
+  // Number of records to process on the probe side
+  private int recordsToProcess = 0;
+
+  // Number of records processed on the probe side
+  private int recordsProcessed = 0;
+
+  // Number of records in the output container
+  private int outputRecords;
+
+  // Indicate if we should drain the next record from the probe side
+  private boolean getNextRecord = true;
+
+  // Contains both batch idx and record idx of the matching record in the build side
+  private int currentCompositeIdx = -1;
+
+  // Current state the hash join algorithm is in
+  private ProbeState probeState = ProbeState.PROBE_PROJECT;
+
+  // For outer or right joins, this is a list of unmatched records that need to be projected
+  private List<Integer> unmatchedBuildIndexes = null;
+
+  private HashPartition partitions[];
+
+  // While probing duplicates, retain the current build-side partition in case we need to continue
+  // probing later on the same chain of duplicates
+  private HashPartition currPartition;
+
+  private int currRightPartition = 0; // for returning RIGHT/FULL
+  IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
+  private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
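  // Editorial sketch -- not part of this patch. It shows, with the example values
  // used in this class's own comments, how the mask fields declared just below
  // (partitionMask, bitsInMask) split a probe-row hash code into a partition
  // selector plus a residual hash, and how the "composite" build index (see
  // currentCompositeIdx above and outputRow() further down) packs a batch number
  // and an in-batch offset into one int.
  private static void indexArithmeticSketch() {
    int numPartitions = 32;                            // must be a power of 2
    int partitionMask = numPartitions - 1;             // e.g. 32 --> 0x1F
    int bitsInMask = Integer.bitCount(partitionMask);  // e.g. 0x1F --> 5

    int hashCode = 0xCAFEBABE;                         // some probe-row hash value
    int currBuildPart = hashCode & partitionMask;      // low bits pick the partition
    hashCode >>>= bitsInMask;                          // remaining bits probe within it

    // Composite build index: the high 16 bits select the build batch, the low
    // 16 bits the row within it -- hence a build batch holds at most 65536 rows.
    int composite = (3 << 16) | 42;                    // batch 3, row 42
    int buildBatchIndex = composite >>> 16;            // 3
    int buildOffset = composite & 65535;               // 42
    assert currBuildPart == 30 && buildBatchIndex == 3 && buildOffset == 42;
  }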
+ private HashJoinBatch.HJSpilledPartition spilledInners[]; // for the outer to find the partition + private boolean buildSideIsEmpty = true; + private int numPartitions = 1; // must be 2 to the power of bitsInMask + private int partitionMask = 0; // numPartitions - 1 + private int bitsInMask = 0; // number of bits in the MASK + private int rightHVColPosition; + + /** + * Setup the Hash Join Probe object + * + * @param probeBatch + * @param outgoing + * @param joinRelType + * @param leftStartState + * @param partitions + * @param cycleNum + * @param container + * @param spilledInners + * @param buildSideIsEmpty + * @param numPartitions + * @param rightHVColPosition + */ + @Override + public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) { + this.container = container; + this.spilledInners = spilledInners; + this.probeBatch = probeBatch; + this.probeSchema = probeBatch.getSchema(); + this.joinType = joinRelType; + this.outgoingJoinBatch = outgoing; + this.partitions = partitions; + this.cycleNum = cycleNum; + this.buildSideIsEmpty = buildSideIsEmpty; + this.numPartitions = numPartitions; + this.rightHVColPosition = rightHVColPosition; + + partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F + bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5 + + probeState = ProbeState.PROBE_PROJECT; + this.recordsToProcess = 0; + this.recordsProcessed = 0; + + // A special case - if the left was an empty file + if ( leftStartState == IterOutcome.NONE ){ + changeToFinalProbeState(); + } else { + this.recordsToProcess = probeBatch.getRecordCount(); + } + + // for those outer partitions that need spilling (cause their matching inners spilled) + // initialize those partitions' current batches and hash-value vectors + for ( HashPartition partn : this.partitions) { + partn.allocateNewCurrentBatchAndHV(); + } + + currRightPartition = 0; // In case it's a Right/Full outer join + + // Initialize the HV vector for the first (already read) left batch + if ( this.cycleNum > 0 ) { + if ( read_left_HV_vector != null ) { read_left_HV_vector.clear();} + if ( leftStartState != IterOutcome.NONE ) { // Skip when outer spill was empty + read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast(); + } + } + } + + /** + * Append the given build side row into the outgoing container + * @param buildSrcContainer The container for the right/inner side + * @param buildSrcIndex build side index + * @return The index for the last column (where the probe side would continue copying) + */ + private int appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) { + // "- 1" to skip the last "hash values" added column + int lastColIndex = buildSrcContainer.getNumberOfColumns() - 1 ; + for (int vectorIndex = 0; vectorIndex < lastColIndex; vectorIndex++) { + ValueVector destVector = container.getValueVector(vectorIndex).getValueVector(); + ValueVector srcVector = buildSrcContainer.getValueVector(vectorIndex).getValueVector(); + destVector.copyEntry(container.getRecordCount(), srcVector, buildSrcIndex); + } + return lastColIndex; + } + /** + * Append the given probe side row into the outgoing container, following the build side part + * @param probeSrcContainer The container for the left/outer side + * @param probeSrcIndex probe side index + * 
@param baseIndex The column index to start copying into (following the build columns) + */ + private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex, int baseIndex) { + for (int vectorIndex = baseIndex; vectorIndex < container.getNumberOfColumns(); vectorIndex++) { + ValueVector destVector = container.getValueVector(vectorIndex).getValueVector(); + ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex - baseIndex).getValueVector(); + destVector.copyEntry(container.getRecordCount(), srcVector, probeSrcIndex); + } + } + /** + * A special version of the VectorContainer's appendRow for the HashJoin; (following a probe) it + * copies the build and probe sides into the outgoing container. (It uses a composite + * index for the build side) + * @param buildSrcContainers The containers list for the right/inner side + * @param compositeBuildSrcIndex Composite build index + * @param probeSrcContainer The single container for the left/outer side + * @param probeSrcIndex Index in the outer container + * @return Number of rows in this container (after the append) + */ + private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex, + VectorContainer probeSrcContainer, int probeSrcIndex) { + int buildBatchIndex = compositeBuildSrcIndex >>> 16; + int buildOffset = compositeBuildSrcIndex & 65535; + int baseInd = 0; + if ( buildSrcContainers != null ) { baseInd = appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset); } + if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex, baseInd); } + return container.incRecordCount(); + } + + /** + * A customised version of the VectorContainer's appendRow for HashJoin - used for Left + * Outer Join when there is no build side match - hence need a base index in + * this container's wrappers from where to start appending + * @param probeSrcContainer + * @param probeSrcIndex + * @param baseInd - index of this container's wrapper to start at + * @return + */ + private int outputOuterRow(VectorContainer probeSrcContainer, int probeSrcIndex, int baseInd) { + appendProbe(probeSrcContainer, probeSrcIndex, baseInd); + return container.incRecordCount(); + } + + + private void executeProjectRightPhase(int currBuildPart) { + while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) { + outputRecords = + outputRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed), + null /* no probeBatch */, 0 /* no probe index */ ); + recordsProcessed++; + } + } + + private void executeProbePhase() throws SchemaChangeException { + + while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) { + + // Check if we have processed all records in this batch we need to invoke next + if (recordsProcessed == recordsToProcess) { + + // Done processing all records in the previous batch, clean up! + for (VectorWrapper<?> wrapper : probeBatch) { + wrapper.getValueVector().clear(); + } + + IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch); + + switch (leftUpstream) { + case NONE: + case NOT_YET: + case STOP: + recordsProcessed = 0; + recordsToProcess = 0; + changeToFinalProbeState(); + // in case some outer partitions were spilled, need to spill their last batches + for ( HashPartition partn : partitions ) { + if ( ! 
partn.isSpilled() ) { continue; } // skip non-spilled + partn.completeAnOuterBatch(false); + // update the partition's spill record with the outer side + HashJoinBatch.HJSpilledPartition sp = spilledInners[partn.getPartitionNum()]; + sp.outerSpillFile = partn.getSpillFile(); + sp.outerSpilledBatches = partn.getPartitionBatchesCount(); + + partn.closeWriter(); + } + + continue; + + case OK_NEW_SCHEMA: + if (probeBatch.getSchema().equals(probeSchema)) { + for ( HashPartition partn : partitions ) { partn.updateBatches(); } + + } else { + throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.", + probeSchema, + probeBatch.getSchema()); + } + case OK: + recordsToProcess = probeBatch.getRecordCount(); + recordsProcessed = 0; + // If we received an empty batch do nothing + if (recordsToProcess == 0) { + continue; + } + if ( cycleNum > 0 ) { + read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ? + } + } + } + + int probeIndex = -1; + // Check if we need to drain the next row in the probe side + if (getNextRecord) { + + if ( !buildSideIsEmpty ) { + int hashCode = ( cycleNum == 0 ) ? + partitions[0].getProbeHashCode(recordsProcessed) + : read_left_HV_vector.getAccessor().get(recordsProcessed); + int currBuildPart = hashCode & partitionMask ; + hashCode >>>= bitsInMask; + + // Set and keep the current partition (may be used again on subsequent probe calls as + // inner rows of duplicate key are processed) + currPartition = partitions[currBuildPart]; // inner if not spilled, else outer + + // If the matching inner partition was spilled + if ( outgoingJoinBatch.isSpilledInner(currBuildPart) ) { + // add this row to its outer partition (may cause a spill, when the batch is full) + + currPartition.appendOuterRow(hashCode, recordsProcessed); + + recordsProcessed++; // done with this outer record + continue; // on to the next outer record + } + + probeIndex = currPartition.probeForKey(recordsProcessed, hashCode); + + } + + if (probeIndex != -1) { + + /* The current probe record has a key that matches. Get the index + * of the first row in the build side that matches the current key + * (and record this match in the bitmap, in case of a FULL/RIGHT join) + */ + currentCompositeIdx = currPartition.getStartIndex(probeIndex); + + outputRecords = + outputRow(currPartition.getContainers(), currentCompositeIdx, + probeBatch.getContainer(), recordsProcessed); + + /* Projected single row from the build side with matching key but there + * may be more rows with the same key. Check if that's the case + */ + currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); + if (currentCompositeIdx == -1) { + /* We only had one row in the build side that matched the current key + * from the probe side. Drain the next row in the probe side. 
+ */ + recordsProcessed++; + } else { + /* There is more than one row with the same key on the build side + * don't drain more records from the probe side till we have projected + * all the rows with this key + */ + getNextRecord = false; + } + } else { // No matching key + + // If we have a left outer join, project the outer side + if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) { + + outputRecords = + outputOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition); + } + recordsProcessed++; + } + } + else { // match the next inner row with the same key + + currPartition.setRecordMatched(currentCompositeIdx); + + outputRecords = + outputRow(currPartition.getContainers(), currentCompositeIdx, + probeBatch.getContainer(), recordsProcessed); + + currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); + + if (currentCompositeIdx == -1) { + // We don't have any more rows matching the current key on the build side, move on to the next probe row + getNextRecord = true; + recordsProcessed++; + } + } + } + + } + + /** + * Perform the probe, till the outgoing is full, or no more rows to probe. + * Performs the inner or left-outer join while there are left rows, + * when done, continue with right-outer, if appropriate. + * @return Num of output records + * @throws SchemaChangeException + */ + @Override + public int probeAndProject() throws SchemaChangeException { + + outputRecords = 0; + + // When handling spilled partitions, the state becomes DONE at the end of each partition + if ( probeState == ProbeState.DONE ) { + return outputRecords; // that is zero + } + + if (probeState == ProbeState.PROBE_PROJECT) { + executeProbePhase(); + } + + if (probeState == ProbeState.PROJECT_RIGHT) { + // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join + + do { + + if (unmatchedBuildIndexes == null) { // first time for this partition ? + if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right + // Get this partition's list of build indexes that didn't match any record on the probe side + unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex(); + recordsProcessed = 0; + recordsToProcess = unmatchedBuildIndexes.size(); + } + + // Project the list of unmatched records on the build side + executeProjectRightPhase(currRightPartition); + + if ( recordsProcessed < recordsToProcess ) { // more records in this partition? + return outputRecords; // outgoing is full; report and come back later + } else { + currRightPartition++; // on to the next right partition + unmatchedBuildIndexes = null; + } + + } while ( currRightPartition < numPartitions ); + + probeState = ProbeState.DONE; // last right partition was handled; we are done now + } + + return outputRecords; + } + + @Override + public void changeToFinalProbeState() { + // We are done with the (left) probe phase. + // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side + probeState = + (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? 
ProbeState.PROJECT_RIGHT : + ProbeState.DONE; // else we're done + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java index 33f22be..7fe9b5f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java index 695a68c..4036438 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java index 04d15ff..0bc0a7d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java index 05354d5..8575021 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java index 87d8d1b..4f9e585 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java index 4f0cff8..620f150 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java @@ -153,4 +153,9 @@ public class OperatorRecordBatch implements CloseableRecordBatch { public void close() { driver.close(); } + + @Override + public VectorContainer getContainer() { + return batchAccessor.getOutgoingContainer(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index d966f50..e35bb5f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.record; import java.lang.reflect.Array; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -218,7 +217,7 @@ public class VectorContainer implements VectorAccessible { * Appends a row taken from a source {@link VectorContainer} to this {@link VectorContainer}. * @param srcContainer The {@link VectorContainer} to copy a row from. * @param srcIndex The index of the row to copy from the source {@link VectorContainer}. - * @return Position where the row was appended + * @return Position one above where the row was appended */ public int appendRow(VectorContainer srcContainer, int srcIndex) { for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) { @@ -226,95 +225,8 @@ public class VectorContainer implements VectorAccessible { ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector(); destVector.copyEntry(recordCount, srcVector, srcIndex); } - int pos = recordCount++; - initialized = true; - return pos; - } - - /** - * This method currently is only used by the Hash Join to return a row composed of build+probe rows - * - * This works with non-hyper {@link VectorContainer}s which have no selection vectors. - * Appends a row taken from two source {@link VectorContainer}s to this {@link VectorContainer}. - * @param buildSrcContainer The {@link VectorContainer} to copy the first columns of a row from. - * @param buildSrcIndex The index of the row to copy from the build side source {@link VectorContainer}. - * @param probeSrcContainer The {@link VectorContainer} to copy the last columns of a row from. - * @param probeSrcIndex The index of the row to copy from the probe side source {@link VectorContainer}. 
- * @return Number of records in the container after appending - */ - public int appendRowXXX(VectorContainer buildSrcContainer, int buildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) { - if ( buildSrcContainer != null ) { - for (int vectorIndex = 0; vectorIndex < buildSrcContainer.wrappers.size(); vectorIndex++) { - ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); - ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector(); - destVector.copyEntry(recordCount, srcVector, buildSrcIndex); - } - } - if ( probeSrcContainer != null ) { - int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size(); - for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) { - ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); - ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex).getValueVector(); - destVector.copyEntry(recordCount, srcVector, probeSrcIndex); - } + return incRecordCount(); } - recordCount++; - initialized = true; - return recordCount; - } - - private int appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) { - // "- 1" to skip the last "hash values" added column - int lastIndex = buildSrcContainer.wrappers.size() - 1 ; - for (int vectorIndex = 0; vectorIndex < lastIndex; vectorIndex++) { - ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); - ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector(); - destVector.copyEntry(recordCount, srcVector, buildSrcIndex); - } - return lastIndex; - } - private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex, int baseIndex) { - // int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size(); - for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) { - ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); - ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex - baseIndex).getValueVector(); - destVector.copyEntry(recordCount, srcVector, probeSrcIndex); - } - } - /** - * A special version of appendRow for the HashJoin; uses a composite index for the build side - * @param buildSrcContainers The containers list for the right side - * @param compositeBuildSrcIndex Composite build index - * @param probeSrcContainer The single container for the left/outer side - * @param probeSrcIndex Index in the outer container - * @return Number of rows in this container (after the append) - */ - public int appendRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) { - int buildBatch = compositeBuildSrcIndex >>> 16; - int buildOffset = compositeBuildSrcIndex & 65535; - int baseInd = 0; - if ( buildSrcContainers != null ) { baseInd = appendBuild(buildSrcContainers.get(buildBatch), buildOffset); } - if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex, baseInd); } - recordCount++; - initialized = true; - return recordCount; - } - - /** - * A customised version of the special appendRow for HashJoin - used for Left - * Outer Join when there is no build side match - hence need a base index in - * this container's wrappers from where to start appending - * @param probeSrcContainer - * @param probeSrcIndex - * @param baseInd - index of this container's wrapper to start at - * @return - */ - public int appendOuterRow(VectorContainer probeSrcContainer, int probeSrcIndex, int baseInd) { - appendProbe(probeSrcContainer, 
probeSrcIndex, baseInd); - recordCount++; - initialized = true; - return recordCount; - } public TypedFieldId add(ValueVector vv) { schemaChanged = true; @@ -459,6 +371,15 @@ public class VectorContainer implements VectorAccessible { initialized = true; } + /** + * Increment the record count + * @return the new record count + */ + public int incRecordCount() { + initialized = true; + return ++recordCount; + } + @Override public int getRecordCount() { Preconditions.checkState(hasRecordCount(), "Record count not set for this vector container"); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 183c1f1..663def1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -121,6 +121,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)), new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR), new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)), + new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashJoin new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR), new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR), new OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // for tuning diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java index 3189535..20ef79d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java @@ -68,7 +68,7 @@ public class MemoryAllocationUtilities { // look for external sorts final List<PhysicalOperator> bufferedOpList = new LinkedList<>(); for (final PhysicalOperator op : plan.getSortedOperators()) { - if (op.isBufferedOperator()) { + if (op.isBufferedOperator(queryContext)) { bufferedOpList.add(op); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java index bbbd88c..9c2a2d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java @@ -229,7 +229,7 @@ public class ThrottledResourceManager extends AbstractResourceManager { @Override public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value) throws RuntimeException { - if (op.isBufferedOperator()) { + if (op.isBufferedOperator(null)) { value.add(op); } visitChildren(op, value); diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index b4969c0..7d92570 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -421,6 +421,10 @@ 
drill.exec.options: {
     drill.exec.functions.cast_empty_string_to_null: false,
     drill.exec.rpc.fragrunner.timeout: 10000,
     drill.exec.hashjoin.mem_limit: 0,
+    # Setting to control if HashJoin should fall back to the older behavior of
+    # consuming unbounded memory. When set to false, a query will fail if there
+    # is not enough memory for the hash join to proceed.
+    drill.exec.hashjoin.fallback.enabled: true, # should soon be changed to false !!
     # Setting to control if HashAgg should fallback to older behavior of consuming
     # unbounded memory. In case of 2 phase Agg when available memory is not enough
     # to start at least 2 partitions then HashAgg fallbacks to this case. It can be
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 5463974..0c43ab2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -182,6 +182,9 @@ public class MockRecordBatch implements CloseableRecordBatch {
     return container.iterator();
   }
+  @Override
+  public VectorContainer getContainer() { return container; }
+
   public boolean isCompleted() {
     return isDone;
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index b57329c..6d06434 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -43,8 +43,6 @@ import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
@@ -111,7 +109,8 @@ public class HashPartitionTest {
         10,
         spillSet,
         0,
-        0);
+        0,
+        2); // only '1' has a special treatment
       final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
@@ -206,7 +205,8 @@ public class HashPartitionTest {
         10,
         spillSet,
         0,
-        0);
+        0,
+        2);
       final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index dc05a70..30c0c73 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.
You may obtain a copy of the License at - * <p/> + * * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java index bd34df4..ed25d78 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java index 4944c87..4fe1fa4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java index 37a6a33..52e6707 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java @@ -6,18 +6,18 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> + * * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.drill.exec.physical.impl.join; +import ch.qos.logback.classic.Level; import com.google.common.collect.Lists; import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.categories.OperatorTest; @@ -25,7 +25,7 @@ import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase; -import org.junit.Ignore; +import org.apache.drill.test.LogFixture; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -33,10 +33,16 @@ import java.util.List; @Category({SlowTest.class, OperatorTest.class}) public class TestHashJoinSpill extends PhysicalOpUnitTestBase { + @SuppressWarnings("unchecked") @Test // Should spill, including recursive spill public void testSimpleHashJoinSpill() { + LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder() + .toConsole() + .logger("org.apache.drill", Level.WARN); + + HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER); operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4); @@ -53,6 +59,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase { rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]"); } + LogFixture logs = logBuilder.build(); opTestBuilder() .physicalOperator(joinConf) .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable)) diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java index 4e9f1c7..3f01bca 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java index 5cdf524..1bd51fc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java index 40eec6a..627d737 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java index 1e9bb8a..5cf7eca 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java index 0bf995a..36d004c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java @@ -230,6 +230,8 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat return count; } + @Override + public VectorContainer getContainer() { return null; } @Override public Iterator<VectorWrapper<?>> iterator() { return null; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java index b8f9563..9c2d5d8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java @@ -27,8 +27,10 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction; import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask; +import org.apache.drill.test.BaseDirTestWatcher; import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.SubOperatorTest; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -36,6 +38,8 @@ import org.junit.experimental.categories.Category; public class TestExternalSortInternals extends SubOperatorTest { private static final int ONE_MEG = 1024 * 1024; + @Rule + public final BaseDirTestWatcher watcher = new BaseDirTestWatcher(); /** * Verify defaults configured in drill-override.conf. 
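The hunks above switch TestExternalSortInternals from the no-arg OperatorFixture.Builder to one that takes a JUnit BaseDirTestWatcher rule. A minimal sketch of the resulting test pattern (the rule, builder constructor, and config key are taken from the diff; the try-with-resources shape of build()/close() is an assumption):

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.test.BaseDirTestWatcher;
    import org.apache.drill.test.OperatorFixture;
    import org.junit.Rule;
    import org.junit.Test;

    public class ExampleSortConfigTest {
      @Rule
      public final BaseDirTestWatcher watcher = new BaseDirTestWatcher();

      @Test
      public void exampleConfigOverride() throws Exception {
        OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
        builder.configBuilder()
            .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K")
            .build();
        try (OperatorFixture fixture = builder.build()) {
          // exercise the operator with the overridden config, as the tests above do
        }
      }
    }

The watcher presumably gives each test an isolated base directory, which matters here because the external sort and hash join tests spill to disk.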
@@ -66,7 +70,7 @@ public class TestExternalSortInternals extends SubOperatorTest { @Test public void testConfigOverride() { // Verify the various HOCON ways of setting memory - OperatorFixture.Builder builder = new OperatorFixture.Builder(); + OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher); builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K") .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 10) @@ -92,7 +96,7 @@ public class TestExternalSortInternals extends SubOperatorTest { */ @Test public void testConfigLimits() { - OperatorFixture.Builder builder = new OperatorFixture.Builder(); + OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher); builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, SortConfig.MIN_MERGE_LIMIT - 1) .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, SortConfig.MIN_SPILL_FILE_SIZE - 1) @@ -414,7 +418,7 @@ public class TestExternalSortInternals extends SubOperatorTest { int batchSizeConstraint = ONE_MEG / 2; int mergeSizeConstraint = ONE_MEG; - OperatorFixture.Builder builder = new OperatorFixture.Builder(); + OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher); builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstraint) .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, batchSizeConstraint) @@ -470,7 +474,7 @@ public class TestExternalSortInternals extends SubOperatorTest { // No artificial merge limit int mergeLimitConstraint = 100; - OperatorFixture.Builder builder = new OperatorFixture.Builder(); + OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher); builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint) .build(); @@ -599,7 +603,7 @@ public class TestExternalSortInternals extends SubOperatorTest { public void testMergeLimit() { // Constrain merge width int mergeLimitConstraint = 5; - OperatorFixture.Builder builder = new OperatorFixture.Builder(); + OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher); builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint) .build(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java index b09b865..0a6d7f9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java @@ -49,7 +49,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import com.google.common.collect.Lists; -- To stop receiving notification emails like this one, please contact [email protected].
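A closing sketch of how the new pieces fit together at runtime. The HashJoinProbe instance created by setupHashJoinProbe() (shown near the top of this diff) is driven from HashJoinBatch, whose innerNext() hunks are not included in this excerpt; the loop below is therefore an assumption based on the interface, and the method name outputProbeBatch and the hashJoinProbe field are hypothetical:

    // Hypothetical driver -- the real call sites live in HashJoinBatch.java.
    private IterOutcome outputProbeBatch() throws SchemaChangeException {
      // probeAndProject() fills `container` with up to TARGET_RECORDS_PER_BATCH rows
      int outputRecords = hashJoinProbe.probeAndProject();
      if (outputRecords > 0) {
        container.setRecordCount(outputRecords);
        return IterOutcome.OK;   // ship this batch downstream, come back for more
      }
      // probeAndProject() returned 0 with the probe in the DONE state: either
      // finish, or (when partitions spilled) rebuild and re-probe the next
      // spilled partition with cycleNum incremented.
      return IterOutcome.NONE;
    }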
