Repository: drill Updated Branches: refs/heads/master be43a9edd -> c16e5f807
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 7cc43ad..21d5a4a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -24,8 +24,8 @@ import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.common.HashTableConfig; @@ -40,13 +40,17 @@ public interface HashAggregator { new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class); public static enum AggOutcome { - RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR + RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN } + // For returning results from outputCurrentBatch + // OK - batch returned, NONE - end of data, RESTART - call again + public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART } + public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, - OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, - LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, - VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException; + OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, + LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, + VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException; public abstract IterOutcome getOutcome(); @@ -60,6 +64,9 @@ public interface HashAggregator { public abstract boolean buildComplete(); - public abstract IterOutcome outputCurrentBatch(); + public abstract AggIterOutcome outputCurrentBatch(); + + public abstract boolean earlyOutput(); + public abstract RecordBatch getNewIncoming(); } http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java new file mode 100644 index 0000000..b05353e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.aggregate; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.cache.VectorAccessibleSerializable; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.spill.SpillSet; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; + +/** + * A class to replace "incoming" - instead scanning a spilled partition file + */ +public class SpilledRecordbatch implements CloseableRecordBatch { + private VectorContainer container; + private InputStream spillStream; + private int spilledBatches; + private FragmentContext context; + private BatchSchema schema; + private OperatorContext oContext; + private SpillSet spillSet; + // Path spillStreamPath; + private String spillFile; + VectorAccessibleSerializable vas; + + public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) { + this.context = context; + this.schema = schema; + this.spilledBatches = spilledBatches; + this.oContext = oContext; + this.spillSet = spillSet; + //this.spillStreamPath = spillStreamPath; + this.spillFile = spillFile; + vas = new VectorAccessibleSerializable(oContext.getAllocator()); + container = vas.get(); + + try { + this.spillStream = this.spillSet.openForInput(spillFile); + } catch (IOException e) { throw new RuntimeException(e);} + + next(); // initialize the container + } + + @Override + public SelectionVector2 getSelectionVector2() { + throw new UnsupportedOperationException(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + throw new UnsupportedOperationException(); + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + return container.getValueVectorId(path); + } + + @Override + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids) { + return container.getValueAccessorById(clazz, ids); + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + return container.iterator(); + } + + @Override + public FragmentContext getContext() { return context; } + + @Override + public BatchSchema getSchema() { return schema; } + + @Override + public WritableBatch getWritableBatch() { + return WritableBatch.get(this); + } + + @Override + public VectorContainer getOutgoingContainer() { return container; } + + @Override + public int getRecordCount() { return container.getRecordCount(); } + + @Override + public void kill(boolean sendUpstream) { + this.close(); // delete the current spill file + } + + /** + * Read the next batch from the spill file + * + * @return IterOutcome + */ + @Override + public IterOutcome next() { + + if ( spilledBatches <= 0 ) { // no more batches to read in this partition + this.close(); + return IterOutcome.NONE; + } + + if ( spillStream == null ) { + throw new IllegalStateException("Spill stream was null"); + }; + + if ( spillSet.getPosition(spillStream) < 0 ) { + HashAggTemplate.logger.warn("Position is {} for stream {}", spillSet.getPosition(spillStream), spillStream.toString()); + } + + try { + if ( container.getNumberOfColumns() > 0 ) { // container already initialized + // Pass our container to the reader because other classes (e.g. HashAggBatch, HashTable) + // may have a reference to this container (as an "incoming") + vas.readFromStreamWithContainer(container, spillStream); + } + else { // first time - create a container + vas.readFromStream(spillStream); + container = vas.get(); + } + } catch (IOException e) { + throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger); + } + + spilledBatches-- ; // one less batch to read + return IterOutcome.OK; + } + + @Override + public void close() { + container.clear(); + try { + if (spillStream != null) { + spillStream.close(); + spillStream = null; + } + + spillSet.delete(spillFile); + } + catch (IOException e) { + throw new RuntimeException(e); + } finally { + spillSet.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 77ebb0d..436480e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -114,7 +114,7 @@ public class ChainedHashTable { private HashTableConfig htConfig; private final FragmentContext context; private final BufferAllocator allocator; - private final RecordBatch incomingBuild; + private RecordBatch incomingBuild; private final RecordBatch incomingProbe; private final RecordBatch outgoing; @@ -129,14 +129,18 @@ public class ChainedHashTable { this.outgoing = outgoing; } - public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException, + public void updateIncoming(RecordBatch incomingBuild) { + this.incomingBuild = incomingBuild; + } + + public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int numPartitions) throws ClassTransformationException, IOException, 
SchemaChangeException { CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); top.plainJavaCapable(true); // Uncomment out this line to debug the generated code. // This code is called from generated code, so to step into this code, // persist the code generated in HashAggBatch also. -// top.saveCodeForDebugging(true); + // top.saveCodeForDebugging(true); ClassGenerator<HashTable> cg = top.getRoot(); ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder"); http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index ef7dadf..9c93c16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.common; import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; @@ -43,7 +44,7 @@ public interface HashTable { */ static final public float DEFAULT_LOAD_FACTOR = 0.75f; - static public enum PutStatus {KEY_PRESENT, KEY_ADDED, PUT_FAILED;} + static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;} /** * The batch size used for internal batch holders @@ -51,30 +52,35 @@ public interface HashTable { static final public int BATCH_SIZE = Character.MAX_VALUE + 1; static final public int BATCH_MASK = 0x0000FFFF; - /** Variable width vector size in bytes */ - public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE; + public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); - public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, - RecordBatch incomingBuild, RecordBatch incomingProbe, - RecordBatch outgoing, VectorContainer htContainerOrig); + public void updateBatches() throws SchemaChangeException; - public void updateBatches(); + public int getHashCode(int incomingRowIdx) throws SchemaChangeException; - public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount); + public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException; - public int containsKey(int incomingRowIdx, boolean isProbe); + public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException; public void getStats(HashTableStats stats); + public long extraMemoryNeededForResize(); + public int size(); public boolean isEmpty(); public void clear(); + public void reinit(RecordBatch newIncoming); + + public void reset(); + + public void setMaxVarcharSize(int size); + public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords); - public void addNewKeyBatch(); + // public void addNewKeyBatch(); } 
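The interface above replaces the old put(rowIdx, holder, retryCount) with a two-step protocol - getHashCode() followed by put(rowIdx, holder, hashCode) - so a caller can compute the hash once, reuse it (for example, to select a spill partition), and only then insert; the richer PutStatus also reports when the table's memory footprint grew, which, together with extraMemoryNeededForResize(), lets an operator anticipate memory pressure. A minimal calling sketch in Java, modeled on the HashJoinBatch hunk further down (the switch over PutStatus and its per-status comments are illustrative only, not taken from the patch):

    IndexPointer htIndex = new IndexPointer();
    for (int i = 0; i < currentRecordCount; i++) {
      int hashCode = hashTable.getHashCode(i);       // step 1: hash the key columns (may throw SchemaChangeException)
      switch (hashTable.put(i, htIndex, hashCode)) { // step 2: find-or-insert the key
        case KEY_PRESENT:     break; // key already present; htIndex now points at the existing entry
        case KEY_ADDED:       break; // a new key was stored
        case KEY_ADDED_LAST:  break; // the new key filled the current internal batch
        case NEW_BATCH_ADDED: break; // a new BatchHolder was allocated, so memory use grew
        case PUT_FAILED:      break; // insertion failed
      }
    }
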
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java index c494c85..7baa9d9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java @@ -26,6 +26,13 @@ public class HashTableStats { public HashTableStats() { } + + public void addStats (HashTableStats newStats) { + this.numBuckets += newStats.numBuckets ; + this.numEntries += newStats.numEntries ; + this.numResizing += newStats.numResizing ; + this.resizingTime += newStats.resizingTime ; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 96f9422..3209c27 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -25,6 +25,7 @@ import javax.inject.Named; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.compile.sig.RuntimeOverridden; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -50,14 +51,19 @@ public abstract class HashTableTemplate implements HashTable { // A hash 'bucket' consists of the start index to indicate start of a hash chain // Array of start indexes. start index is a global index across all batch holders + // This is the "classic hash table", where Hash-Value % size-of-table yields + // the offset/position (in the startIndices) of the beginning of the hash chain. private IntVector startIndices; // Array of batch holders..each batch holder can hold up to BATCH_SIZE entries private ArrayList<BatchHolder> batchHolders; - // Size of the hash table in terms of number of buckets + // Current size of the hash table in terms of number of buckets private int tableSize = 0; + // Original size of the hash table (needed when re-initializing) + private int originalTableSize; + // Threshold after which we rehash; It must be the tableSize * loadFactor private int threshold; @@ -95,6 +101,8 @@ public abstract class HashTableTemplate implements HashTable { private int resizingTime = 0; + private int maxVarcharSize = 8; // for varchar allocation + // This class encapsulates the links, keys and values for up to BATCH_SIZE // *unique* records. 
Thus, suppose there are N incoming record batches, each // of size BATCH_SIZE..but they have M unique keys altogether, the number of @@ -134,7 +142,9 @@ public abstract class HashTableTemplate implements HashTable { if (vv instanceof FixedWidthVector) { ((FixedWidthVector) vv).allocateNew(BATCH_SIZE); } else if (vv instanceof VariableWidthVector) { - ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE); + long beforeMem = allocator.getAllocatedMemory(); + ((VariableWidthVector) vv).allocateNew(maxVarcharSize * BATCH_SIZE, BATCH_SIZE); + logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, maxVarcharSize); } else { vv.allocateNew(); } @@ -166,7 +176,7 @@ public abstract class HashTableTemplate implements HashTable { hashValues.getMutator().setValueCount(size); } - protected void setup() { + protected void setup() throws SchemaChangeException { setupInterior(incomingBuild, incomingProbe, outgoing, htContainer); } @@ -175,7 +185,7 @@ public abstract class HashTableTemplate implements HashTable { // currentIdxHolder with the index of the next link. private boolean isKeyMatch(int incomingRowIdx, IndexPointer currentIdxHolder, - boolean isProbe) { + boolean isProbe) throws SchemaChangeException { int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; boolean match = false; @@ -201,7 +211,7 @@ public abstract class HashTableTemplate implements HashTable { // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table // container at the specified index - private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) { + private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) throws SchemaChangeException { int currentIdxWithinBatch = currentIdx & BATCH_MASK; setValue(incomingRowIdx, currentIdxWithinBatch); @@ -405,36 +415,34 @@ public abstract class HashTableTemplate implements HashTable { @Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe, @Named("outgoing") RecordBatch outgoing, - @Named("htContainer") VectorContainer htContainer) { + @Named("htContainer") VectorContainer htContainer) throws SchemaChangeException { } @RuntimeOverridden protected boolean isKeyMatchInternalBuild( - @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { + @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException { return false; } @RuntimeOverridden protected boolean isKeyMatchInternalProbe( - @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { + @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException { return false; } @RuntimeOverridden - protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { + protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException { } @RuntimeOverridden - protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) { + protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException { } } // class BatchHolder @Override - public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, - 
RecordBatch incomingBuild, RecordBatch incomingProbe, - RecordBatch outgoing, VectorContainer htContainerOrig) { + public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { float loadf = htConfig.getLoadFactor(); int initialCap = htConfig.getInitialCapacity(); @@ -465,6 +473,7 @@ public abstract class HashTableTemplate implements HashTable { if (tableSize > MAXIMUM_CAPACITY) { tableSize = MAXIMUM_CAPACITY; } + originalTableSize = tableSize ; // retain original size threshold = (int) Math.ceil(tableSize * loadf); @@ -476,13 +485,17 @@ public abstract class HashTableTemplate implements HashTable { batchHolders = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received. - doSetup(incomingBuild, incomingProbe); + try { + doSetup(incomingBuild, incomingProbe); + } catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); + } currentIdxHolder = new IndexPointer(); } @Override - public void updateBatches() { + public void updateBatches() throws SchemaChangeException { doSetup(incomingBuild, incomingProbe); for (BatchHolder batchHolder : batchHolders) { batchHolder.setup(); @@ -497,6 +510,21 @@ public abstract class HashTableTemplate implements HashTable { return numResizing; } + /** + * + * @return Size of extra memory needed if the HT (i.e. startIndices) is doubled + */ + @Override + public long extraMemoryNeededForResize() { + if (tableSize == MAXIMUM_CAPACITY) { return 0; } // will not resize + int newSize = roundUpToPowerOf2(2 * tableSize); + + if (newSize > MAXIMUM_CAPACITY) { + newSize = MAXIMUM_CAPACITY; + } + return newSize * 4 /* sizeof(int) */; + } + @Override public int size() { return numEntries; @@ -526,7 +554,7 @@ public abstract class HashTableTemplate implements HashTable { batchHolders = null; } startIndices.clear(); - currentIdxHolder = null; + // currentIdxHolder = null; // keep IndexPointer in case HT is reused numEntries = 0; } @@ -544,86 +572,69 @@ public abstract class HashTableTemplate implements HashTable { return rounded; } - @Override - public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) { - put(incomingRowIdx, htIdxHolder); + public int getHashCode(int incomingRowIdx) throws SchemaChangeException { + return getHashBuild(incomingRowIdx); } - private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) { + /** put() uses the hash code (from getHashCode() above) to insert the key(s) from the incoming + * row into the hash table. The code selects the bucket in the startIndices, then the keys are + * placed into the chained list - by storing the key values into a batch, and updating its + * "links" member. Lastly, it sets the index holder to the batch offset so that the caller + * can store the remaining parts of the row into a matching batch (outside the hash table).
+ * + * @param incomingRowIdx - position of the incoming row + * @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch) + * @param hashCode - computed over the key(s) by calling getHashCode() + * @return Status - whether the key(s) were ADDED or already PRESENT + */ + @Override + public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException { - int hash = getHashBuild(incomingRowIdx); - int i = getBucketIndex(hash, numBuckets()); - int startIdx = startIndices.getAccessor().get(i); + int bucketIndex = getBucketIndex(hashCode, numBuckets()); + int startIdx = startIndices.getAccessor().get(bucketIndex); int currentIdx; - int currentIdxWithinBatch; - BatchHolder bh; BatchHolder lastEntryBatch = null; int lastEntryIdxWithinBatch = EMPTY_SLOT; + // if startIdx is non-empty, follow the hash chain links until we find a matching + // key or reach the end of the chain (and remember the last link there) + for ( currentIdxHolder.value = startIdx; + currentIdxHolder.value != EMPTY_SLOT; + /* isKeyMatch() below also advances the currentIdxHolder to the next link */) { - if (startIdx == EMPTY_SLOT) { - // this is the first entry in this bucket; find the first available slot in the - // container of keys and values - currentIdx = freeIndex++; - addBatchIfNeeded(currentIdx); + // remember the current link, which would be the last when the next link is empty + lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK); + lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; - if (EXTRA_DEBUG) { - logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, - incomingRowIdx, currentIdx); + if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) { + htIdxHolder.value = currentIdxHolder.value; + return PutStatus.KEY_PRESENT; } - - insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch); - // update the start index array - startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx); - htIdxHolder.value = currentIdx; - return PutStatus.KEY_ADDED; } - currentIdx = startIdx; - boolean found = false; - - bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK); - currentIdxHolder.value = currentIdx; - - // if startIdx is non-empty, follow the hash chain links until we find a matching - // key or reach the end of the chain - while (true) { - currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; + // no match was found, so insert a new entry + currentIdx = freeIndex++; + boolean addedBatch = addBatchIfNeeded(currentIdx); - if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) { - htIdxHolder.value = currentIdxHolder.value; - found = true; - break; - } else if (currentIdxHolder.value == EMPTY_SLOT) { - lastEntryBatch = bh; - lastEntryIdxWithinBatch = currentIdxWithinBatch; - break; - } else { - bh = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK); - lastEntryBatch = bh; - } + if (EXTRA_DEBUG) { + logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx); } - if (!found) { - // no match was found, so insert a new entry - currentIdx = freeIndex++; - addBatchIfNeeded(currentIdx); + insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch); - if (EXTRA_DEBUG) { - logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at
currentIdx = {}.", incomingRowIdx, currentIdx); - } - - insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch); - htIdxHolder.value = currentIdx; - return PutStatus.KEY_ADDED; + // if there was no hash chain at this bucket, need to update the start index array + if (startIdx == EMPTY_SLOT) { + startIndices.getMutator().setSafe(getBucketIndex(hashCode, numBuckets()), currentIdx); } - - return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED; + htIdxHolder.value = currentIdx; + return addedBatch ? PutStatus.NEW_BATCH_ADDED : + ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ? + PutStatus.KEY_ADDED_LAST : // the last key in the batch + PutStatus.KEY_ADDED; // otherwise } - private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) { - - addBatchIfNeeded(currentIdx); + private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) throws SchemaChangeException { BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK); @@ -640,60 +651,39 @@ public abstract class HashTableTemplate implements HashTable { // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key @Override - public int containsKey(int incomingRowIdx, boolean isProbe) { + public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException { int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx); - int i = getBucketIndex(hash, numBuckets()); - - int currentIdx = startIndices.getAccessor().get(i); - - if (currentIdx == EMPTY_SLOT) { - return -1; - } - - BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK); - currentIdxHolder.value = currentIdx; + int bucketIndex = getBucketIndex(hash, numBuckets()); - boolean found = false; - - while (true) { + for ( currentIdxHolder.value = startIndices.getAccessor().get(bucketIndex); + currentIdxHolder.value != EMPTY_SLOT; ) { + BatchHolder bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK); if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) { - found = true; - break; - } else if (currentIdxHolder.value == EMPTY_SLOT) { - break; - } else { - bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK); + return currentIdxHolder.value; } } - - return found ? currentIdxHolder.value : -1; + return -1; } // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds - // the capacity, we will add a new BatchHolder. - private BatchHolder addBatchIfNeeded(int currentIdx) { + // the capacity, we will add a new BatchHolder. Return true if a new batch was added. + private boolean addBatchIfNeeded(int currentIdx) throws SchemaChangeException { int totalBatchSize = batchHolders.size() * BATCH_SIZE; if (currentIdx >= totalBatchSize) { - BatchHolder bh = addBatchHolder(); + BatchHolder bh = newBatchHolder(batchHolders.size()); + batchHolders.add(bh); + bh.setup(); if (EXTRA_DEBUG) { logger.debug("HashTable: Added new batch. 
Num batches = {}.", batchHolders.size()); } - return bh; - } else { - return batchHolders.get(batchHolders.size() - 1); + return true; } + return false; } - private BatchHolder addBatchHolder() { - BatchHolder bh = newBatchHolder(batchHolders.size()); - batchHolders.add(bh); - bh.setup(); - return bh; - } - - protected BatchHolder newBatchHolder(int index) { + protected BatchHolder newBatchHolder(int index) { // special method to allow debugging of gen code return new BatchHolder(index); } @@ -755,6 +745,34 @@ public abstract class HashTableTemplate implements HashTable { numResizing++; } + /** + * Reinit the hash table to its original size, and clear up all its prior batch holder + * + */ + public void reset() { + // long before = allocator.getAllocatedMemory(); + this.clear(); // Clear all current batch holders and hash table (i.e. free their memory) + // long after = allocator.getAllocatedMemory(); + + // logger.debug("Reinit Hash Table: Memory before {} After {} Percent after: {}",before,after, (100 * after ) / before); + + freeIndex = 0; // all batch holders are gone + // reallocate batch holders, and the hash table to the original size + batchHolders = new ArrayList<BatchHolder>(); + startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT); + } + public void reinit(RecordBatch newIncoming) { + incomingBuild = newIncoming; + reset(); + try { + updateBatches(); // Needed ? (to update the new incoming?) + } catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); + } catch(IndexOutOfBoundsException ioob) { + throw new IllegalStateException("reinit update batches", ioob); + } + } + @Override public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) { assert batchIdx < batchHolders.size(); @@ -775,17 +793,20 @@ public abstract class HashTableTemplate implements HashTable { } @Override + public void setMaxVarcharSize(int size) { maxVarcharSize = size; } + +/* @Override public void addNewKeyBatch() { int numberOfBatches = batchHolders.size(); this.addBatchHolder(); freeIndex = numberOfBatches * BATCH_SIZE; } - +*/ // These methods will be code-generated in the context of the outer class - protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe); + protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException; - protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx); + protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException; - protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx); + protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index e2c016b..4af1664 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java 
@@ -315,7 +315,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // Create the chained hash table final ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null); - hashTable = ht.createAndSetupHashTable(null); + hashTable = ht.createAndSetupHashTable(null, 1); } public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException { @@ -374,7 +374,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // For every record in the build batch , hash the key columns for (int i = 0; i < currentRecordCount; i++) { - hashTable.put(i, htIndex, 1 /* retry count */); + int hashCode = hashTable.getHashCode(i); + hashTable.put(i, htIndex, hashCode); /* Use the global index returned by the hash table, to store * the current record index and batch index. This will be used http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java index 4cb2bae..a1b8169 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.VariableWidthVector; /** * Given a record batch or vector container, determines the actual memory @@ -68,14 +69,14 @@ public class RecordBatchSizer { public int capacity; public int density; public int dataSize; + public boolean variableWidth; - public ColumnSize(ValueVector v) { - metadata = v.getField(); + public ColumnSize(ValueVector vv) { + metadata = vv.getField(); stdSize = TypeHelper.getSize(metadata.getType()); // Can't get size estimates if this is an empty batch. - - int rowCount = v.getAccessor().getValueCount(); + int rowCount = vv.getAccessor().getValueCount(); if (rowCount == 0) { estSize = stdSize; return; @@ -84,17 +85,17 @@ public class RecordBatchSizer { // Total size taken by all vectors (and underlying buffers) // associated with this vector. - totalSize = v.getAllocatedByteCount(); + totalSize = vv.getAllocatedByteCount(); // Capacity is the number of values that the vector could // contain. This is useful only for fixed-length vectors. - capacity = v.getValueCapacity(); + capacity = vv.getValueCapacity(); // The amount of memory consumed by the payload: the actual // data stored in the vectors. - dataSize = v.getPayloadByteCount(); + dataSize = vv.getPayloadByteCount(); // Determine "density" the number of rows compared to potential // capacity. Low-density batches occur at block boundaries, ends @@ -105,6 +106,7 @@ public class RecordBatchSizer { density = roundUp(dataSize * 100, totalSize); estSize = roundUp(dataSize, rowCount); + variableWidth = vv instanceof VariableWidthVector ; } @Override @@ -155,6 +157,7 @@ public class RecordBatchSizer { * vectors are partially full; prevents overestimating row width. 
*/ private int netRowWidth; + private int netRowWidthCap50; private boolean hasSv2; private int sv2Size; private int avgDensity; @@ -167,6 +170,18 @@ public class RecordBatchSizer { batch.getSelectionVector2() : null); } + /** + * Maximum width of a column; used for memory estimation in case of Varchars + */ + public int maxSize; + /** + * Count the nullable columns; used for memory estimation + */ + public int numNullables; + /** + * + * @param va + */ public RecordBatchSizer(VectorAccessible va) { this(va, null); } @@ -174,7 +189,9 @@ public class RecordBatchSizer { public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) { rowCount = va.getRecordCount(); for (VectorWrapper<?> vw : va) { - measureColumn(vw); + int size = measureColumn(vw.getValueVector()); + if ( size > maxSize ) { maxSize = size; } + if ( vw.getField().isNullable() ) { numNullables++; } } if (rowCount > 0) { @@ -208,32 +225,45 @@ public class RecordBatchSizer { totalBatchSize += sv2Size; } - private void measureColumn(VectorWrapper<?> vw) { - measureColumn(vw.getValueVector()); + /** + * Round up (if needed) to the next power of 2 (only up to 64) + * @param arg Number to round up (must be < 64) + * @return power of 2 result + */ + private int roundUpToPowerOf2(int arg) { + if ( arg <= 2 ) { return 2; } + if ( arg <= 4 ) { return 4; } + if ( arg <= 8 ) { return 8; } + if ( arg <= 16 ) { return 16; } + if ( arg <= 32 ) { return 32; } + return 64; } - - private void measureColumn(ValueVector v) { - + private int measureColumn(ValueVector vv) { // Maps consume no size themselves. However, their contained // vectors do consume space, so visit columns recursively. - - if (v.getField().getType().getMinorType() == MinorType.MAP) { - expandMap((AbstractMapVector) v); - return; + if (vv.getField().getType().getMinorType() == MinorType.MAP) { + return expandMap((AbstractMapVector) vv); } - ColumnSize colSize = new ColumnSize(v); + + ColumnSize colSize = new ColumnSize(vv); columnSizes.add(colSize); stdRowWidth += colSize.stdSize; totalBatchSize += colSize.totalSize; netBatchSize += colSize.dataSize; netRowWidth += colSize.estSize; + netRowWidthCap50 += ! colSize.variableWidth ? colSize.estSize : + 8 /* offset vector */ + roundUpToPowerOf2( Math.min(colSize.estSize,50) ); + // above change 8 to 4 after DRILL-5446 is fixed + return colSize.estSize; } - private void expandMap(AbstractMapVector mapVector) { + private int expandMap(AbstractMapVector mapVector) { + int accum = 0; for (ValueVector vector : mapVector) { - measureColumn(vector); + accum += measureColumn(vector); } + return accum; } public static int roundUp(int num, int denom) { @@ -247,10 +277,18 @@ public class RecordBatchSizer { public int stdRowWidth() { return stdRowWidth; } public int grossRowWidth() { return grossRowWidth; } public int netRowWidth() { return netRowWidth; } + /** + * Compute the "real" width of the row, taking into account each varchar column size + * (historically capped at 50, and rounded up to power of 2 to match drill buf allocation) + * and null marking columns. 
+ * @return "real" width of the row + */ + public int netRowWidthCap50() { return netRowWidthCap50 + numNullables; } public int actualSize() { return totalBatchSize; } public boolean hasSv2() { return hasSv2; } public int avgDensity() { return avgDensity; } public int netSize() { return netBatchSize; } + public int maxSize() { return maxSize; } public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java index 74e1fb5..87eebc6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java @@ -30,11 +30,13 @@ import java.util.List; import java.util.Set; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -84,7 +86,7 @@ public class SpillSet { * Given a manager-specific input stream, return the current read position. * Used to report total read bytes. * - * @param outputStream input stream created by the file manager + * @param inputStream input stream created by the file manager * @return */ long getReadBytes(InputStream inputStream); @@ -346,7 +348,6 @@ public class SpillSet { */ private final String spillDirName; - private final String spillFileName; private int fileCount = 0; @@ -356,16 +357,34 @@ public class SpillSet { private long writeBytes; - public SpillSet(FragmentContext context, PhysicalOperator popConfig) { - this(context, popConfig, null, "spill"); - } - - public SpillSet(FragmentContext context, PhysicalOperator popConfig, - String opName, String fileName) { + public SpillSet(FragmentContext context, PhysicalOperator popConfig, UserBitShared.CoreOperatorType optype) { FragmentHandle handle = context.getHandle(); + String operName = "Unknown"; + + // Set the spill options from the configuration DrillConfig config = context.getConfig(); - spillFileName = fileName; - List<String> dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS); + String spillFs; + List<String> dirList; + + // Set the operator name (used as part of the spill file name), + // and set oper. 
specific options (the config file defaults to using the + // common options; users may override those - per operator) + switch (optype) { + case EXTERNAL_SORT: + operName = "Sort"; + spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM); + dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS); + break; + case HASH_AGGREGATE: + operName = "HashAgg"; + spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM); + dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS); + break; + default: // just use the common ones + spillFs = config.getString(ExecConstants.SPILL_FILESYSTEM); + dirList = config.getStringList(ExecConstants.SPILL_DIRS); + } + dirs = Iterators.cycle(dirList); // If more than one directory, semi-randomly choose an offset into @@ -386,23 +405,18 @@ public class SpillSet { // system is selected and impersonation is off. (We use that // as a proxy for a non-production Drill setup.) - String spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM); boolean impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED); if (spillFs.startsWith("file:///") && ! impersonationEnabled) { fileManager = new LocalFileManager(spillFs); } else { fileManager = new HadoopFileManager(spillFs); } - spillDirName = String.format( - "%s_major%d_minor%d_op%d%s", - QueryIdHelper.getQueryId(handle.getQueryId()), - handle.getMajorFragmentId(), - handle.getMinorFragmentId(), - popConfig.getOperatorId(), - (opName == null) ? "" : "_" + opName); + + spillDirName = String.format("%s_%s_%s-%s_minor%s", QueryIdHelper.getQueryId(handle.getQueryId()), + operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId()); } - public String getNextSpillFile() { + public String getNextSpillFile(String extraName) { // Identify the next directory from the round-robin list to // the file created from this round of spilling. 
The directory must @@ -411,7 +425,12 @@ public class SpillSet { String spillDir = dirs.next(); String currSpillPath = Joiner.on("/").join(spillDir, spillDirName); currSpillDirs.add(currSpillPath); - String outputFile = Joiner.on("/").join(currSpillPath, spillFileName + ++fileCount); + + String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++fileCount); + if ( extraName != null ) { + outputFile += "_" + extraName; + } + try { fileManager.deleteOnExit(currSpillPath); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index 69e9b4c..4d5f290 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -39,6 +39,8 @@ import org.apache.drill.exec.physical.impl.xsort.MSortTemplate; import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter; import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch; import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun; + +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -399,7 +401,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { allocator = oContext.getAllocator(); opCodeGen = new OperatorCodeGenerator(context, popConfig); - spillSet = new SpillSet(context, popConfig, "sort", "run"); + spillSet = new SpillSet(context, popConfig, UserBitShared.CoreOperatorType.EXTERNAL_SORT); copierHolder = new CopierHolder(context, allocator, opCodeGen); configure(context.getConfig()); } @@ -1390,7 +1392,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // spill file. After each write, we release the memory associated // with the just-written batch. 
- String outputFile = spillSet.getNextSpillFile(); + String outputFile = spillSet.getNextSpillFile(null); stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount()); BatchGroup.SpilledRun newGroup = null; try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill); http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java index 732ff15..8c69930 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java @@ -51,7 +51,7 @@ import java.util.List; public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel { - protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}; + public static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}; protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase protected List<NamedExpression> keys = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java index b911f6b..460ee8a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java @@ -61,6 +61,9 @@ public abstract class AggPruleBase extends Prule { // currently won't generate a 2 phase plan. protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) { PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); + if ( settings.isForce2phaseAggr() ) { // for testing - force 2 phase aggr + return true; + } RelNode child = call.rel(0).getInputs().get(0); boolean smallInput = child.getRows() < settings.getSliceTarget(); if (! 
settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) { http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java index c382af6..09d33fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java @@ -73,7 +73,7 @@ public class HashAggPrel extends AggPrelBase implements Prel{ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { Prel child = (Prel) this.getInput(); - HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), keys, aggExprs, 1.0f); + HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), operPhase, keys, aggExprs, 1.0f); return creator.addMetadata(this, g); http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 648adb7..15314ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -133,6 +133,9 @@ public class PlannerSettings implements Context{ the need to turn off join optimization may go away. */ public static final BooleanValidator JOIN_OPTIMIZATION = new BooleanValidator("planner.enable_join_optimization", true); + // for testing purpose + public static final String FORCE_2PHASE_AGGR_KEY = "planner.force_2phase_aggr"; + public static final BooleanValidator FORCE_2PHASE_AGGR = new BooleanValidator(FORCE_2PHASE_AGGR_KEY, false); public OptionManager options = null; public FunctionImplementationRegistry functionImplementationRegistry = null; @@ -274,6 +277,8 @@ public class PlannerSettings implements Context{ return options.getOption(TYPE_INFERENCE); } + public boolean isForce2phaseAggr() { return options.getOption(FORCE_2PHASE_AGGR);} // for testing + public long getInSubqueryThreshold() { return options.getOption(IN_SUBQUERY_THRESHOLD); } http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 04cf8fc..0daa6b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -34,7 +34,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; * <p> * A key thing to know is that the Iterator provided by a record batch must * align with the rank positions of the field IDs provided using - * {@link getValueVectorId}. + * {@link #getValueVectorId}. 
* </p> */ public interface RecordBatch extends VectorAccessible { http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 8492f36..c2a4d65 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -93,6 +93,10 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD, PlannerSettings.QUOTING_IDENTIFIERS, PlannerSettings.JOIN_OPTIMIZATION, + PlannerSettings.FORCE_2PHASE_AGGR, // for testing + ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR, + ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR, + ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR, // for tuning ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java index d06424e..79b49e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java @@ -26,7 +26,6 @@ import org.apache.drill.exec.memory.RootAllocatorFactory; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.server.options.OptionManager; public class MemoryAllocationUtilities { @@ -40,7 +39,7 @@ public class MemoryAllocationUtilities { * @param plan * @param queryContext */ - public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) { + public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) { // Test plans may already have a pre-defined memory plan. // Otherwise, determine memory allocation. 
@@ -49,30 +48,30 @@ public class MemoryAllocationUtilities { return; } // look for external sorts - final List<ExternalSort> sortList = new LinkedList<>(); + final List<PhysicalOperator> bufferedOpList = new LinkedList<>(); for (final PhysicalOperator op : plan.getSortedOperators()) { - if (op instanceof ExternalSort) { - sortList.add((ExternalSort) op); + if ( op.isBufferedOperator() ) { + bufferedOpList.add(op); } } // if there are any sorts, compute the maximum allocation, and set it on them - if (sortList.size() > 0) { + if (bufferedOpList.size() > 0) { final OptionManager optionManager = queryContext.getOptions(); final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val; long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC)); maxAllocPerNode = Math.min(maxAllocPerNode, optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val); - final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode); - logger.debug("Max sort alloc: {}", maxSortAlloc); + final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode); + logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc); - for(final ExternalSort externalSort : sortList) { + for(final PhysicalOperator op : bufferedOpList) { // Ensure that the sort receives the minimum memory needed to make progress. // Without this, the math might work out to allocate too little memory. - long alloc = Math.max(maxSortAlloc, externalSort.getInitialAllocation()); - externalSort.setMaxAllocation(alloc); + long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation()); + op.setMaxAllocation(alloc); } } plan.getProperties().hasResourcePlan = true; http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 5e5fef0..62c2307 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -433,7 +433,7 @@ public class Foreman implements Runnable { private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException { validatePlan(plan); - MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext); + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); //Marking endTime of Planning queryManager.markPlanningEndTime(); http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java index 7ffb224..2f945d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java @@ -97,7 +97,7 @@ public class PlanSplitter { throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType"); } - 
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index 7ffb224..2f945d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -97,7 +97,7 @@ public class PlanSplitter {
         throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType");
     }
 
-    MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
+    MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
 
     final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c2a2bf0..8aedaf6 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -207,6 +207,33 @@ drill.exec: {
     // java ... -ea -Ddrill.exec.debug.validate_vectors=true ...
     validate_vectors: false
   },
+  spill: {
+    // *** Options common to all the operators that may spill
+    // File system to use. Local file system by default.
+    fs: "file:///",
+    // List of directories to use. Directories are created
+    // if they do not exist.
+    directories: [ "/tmp/drill/spill" ]
+  },
+  hashagg: {
+    // An internal tuning; should not be changed
+    min_batches_per_partition: 3,
+    // An option for testing - force a memory limit
+    mem_limit: 0,
+    // The max number of partitions in each hashagg operator
+    // This number is tuned down when memory is limited
+    // Setting it to 1 means: No spilling
+    num_partitions: 32,
+    spill: {
+      // -- The 2 options below can be used to override the common ones
+      // -- (common to all spilling operators)
+      // File system to use. Local file system by default.
+      fs: ${drill.exec.spill.fs},
+      // List of directories to use. Directories are created
+      // if they do not exist.
+      directories: ${drill.exec.spill.directories},
+    }
+  },
   sort: {
     purge.threshold : 1000,
     external: {
@@ -232,11 +259,15 @@ drill.exec: {
       group.size: 40000,
       // Deprecated for managed xsort; used only by legacy xsort
       threshold: 40000,
+      // -- The two options below can be used to override the options common
+      // -- for all spilling operators (see "spill" above).
+      // -- This is done for backward compatibility; in the future they
+      // -- would be deprecated (you should be using only the common ones)
       // File system to use. Local file system by default.
-      fs: "file:///"
+      fs: ${drill.exec.spill.fs},
       // List of directories to use. Directories are created
       // if they do not exist.
-      directories: [ "/tmp/drill/spill" ],
+      directories: ${drill.exec.spill.directories},
       // Size of the batches written to, and read from, the spill files.
       // Determines the ratio of memory to input data size for a single-
       // generation sort. Smaller values give larger ratios, but at a
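The new per-operator spill blocks (hashagg.spill, and sort.external for backward compatibility) default to the common drill.exec.spill options through HOCON substitution, so overriding the common value redirects every spilling operator at once. A minimal sketch of that fallback behavior using the Typesafe Config library that parses drill-module.conf; the two-line config and the override value are a toy excerpt, not the full file:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class SpillConfigFallbackDemo {
  public static void main(String[] args) {
    // hashagg.spill.fs refers back to the common spill.fs, so changing
    // the common value changes the operator-level value too.
    String hocon =
        "drill.exec.spill.fs: \"hdfs:///drill/spill\"\n" +   // hypothetical override
        "drill.exec.hashagg.spill.fs: ${drill.exec.spill.fs}\n";
    Config config = ConfigFactory.parseString(hocon).resolve(); // resolve() expands ${...}
    System.out.println(config.getString("drill.exec.hashagg.spill.fs")); // hdfs:///drill/spill
  }
}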
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index 27df710..1a4d63b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -211,12 +211,13 @@ public class TestBugFixes extends BaseTestQuery {
     int limit = 65536;
     ImmutableList.Builder<Map<String, Object>> baselineBuilder = ImmutableList.builder();
     for (int i = 0; i < limit; i++) {
-      baselineBuilder.add(Collections.<String, Object>singletonMap("`id`", String.valueOf(i + 1)));
+      baselineBuilder.add(Collections.<String, Object>singletonMap("`id`", /*String.valueOf */ (i + 1)));
     }
     List<Map<String, Object>> baseline = baselineBuilder.build();
 
     testBuilder()
-        .sqlQuery(String.format("select id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id limit %s", TEST_RES_PATH, limit))
+        .sqlQuery(String.format("select cast(id as int) as id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id order by 1 limit %s",
+            TEST_RES_PATH, limit))
         .unOrdered()
         .baselineRecords(baseline)
         .go();

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index 66b7571..f15e757 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertNull;
  * any particular order of execution. We ignore the results.
  */
 public class TestTpchDistributedConcurrent extends BaseTestQuery {
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual.
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // Longer timeout than usual.
 
   /*
    * Valid test names taken from TestTpchDistributed. Fuller path prefixes are

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
new file mode 100644
index 0000000..fe6fcbc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.agg;
+
+import ch.qos.logback.classic.Level;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.FixtureBuilder;
+import org.apache.drill.test.LogFixture;
+import org.apache.drill.test.ProfileParser;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test spilling for the Hash Aggr operator (using the mock reader)
+ */
+public class TestHashAggrSpill extends BaseTestQuery {
+
+  private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long spilledPartitions) throws Exception {
+    String plan = client.queryBuilder().sql(sql).explainJson();
+
+    QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
+    if ( expectedRows > 0 ) {
+      assertEquals(expectedRows, summary.recordCount());
+    }
+    // System.out.println(String.format("======== \n Results: %,d records, %d batches, %,d ms\n ========", summary.recordCount(), summary.batchCount(), summary.runTimeMs() ) );
+
+    //System.out.println("Query ID: " + summary.queryIdString());
+    ProfileParser profile = client.parseProfile(summary.queryIdString());
+    //profile.print();
+    List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.HASH_AGGREGATE_VALUE);
+
+    assertTrue( ! ops.isEmpty() );
+    // check for the first op only
+    ProfileParser.OperatorProfile hag0 = ops.get(0);
+    long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
+    assertEquals(spillCycle, opCycle);
+    long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
+    assertEquals(spilledPartitions, op_spilled_partitions);
+    /* assertEquals(3, ops.size());
+    for ( int i = 0; i < ops.size(); i++ ) {
+      ProfileParser.OperatorProfile hag = ops.get(i);
+      long cycle = hag.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
+      long num_partitions = hag.getMetric(HashAggTemplate.Metric.NUM_PARTITIONS.ordinal());
+      long spilled_partitions = hag.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
+      long mb_spilled = hag.getMetric(HashAggTemplate.Metric.SPILL_MB.ordinal());
+      System.out.println(String.format("(%d) Spill cycle: %d, num partitions: %d, spilled partitions: %d, MB spilled: %d", i, cycle, num_partitions, spilled_partitions,
+          mb_spilled));
+    } */
+  }
+
+  /**
+   * Test "normal" spilling: Only 2 partitions (out of 4) would require spilling
+   * ("normal spill" means spill-cycle = 1 )
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testHashAggrSpill() throws Exception {
+    LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+        .toConsole()
+        .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+        ;
+
+    FixtureBuilder builder = ClusterFixture.builder()
+        .configProperty(ExecConstants.HASHAGG_MAX_MEMORY, 74_000_000)
+        .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS, 16)
+        .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION, 3)
+        .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY, true)
+        // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+        .maxParallelization(2)
+        .saveProfiles()
+        //.keepLocalFiles()
+        ;
+    try (LogFixture logs = logBuilder.build();
+         ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
+      runAndDump(client, sql, 1_200_000, 1, 1);
+    }
+  }
+
+  /**
+   * Test "secondary" spilling -- Some of the spilled partitions cause more spilling as they are read back
+   * (Hence spill-cycle = 2 )
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testHashAggrSecondaryTertiarySpill() throws Exception {
+    LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+        .toConsole()
+        .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+        .logger("org.apache.drill.exec.cache", Level.INFO)
+        ;
+
+    FixtureBuilder builder = ClusterFixture.builder()
+        .configProperty(ExecConstants.HASHAGG_MAX_MEMORY, 58_000_000)
+        .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS, 16)
+        .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION, 3)
+        .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY, true)
+        .sessionOption(PlannerSettings.STREAMAGG.getOptionName(), false)
+        // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+        .maxParallelization(1)
+        .saveProfiles()
+        //.keepLocalFiles()
+        ;
+    try (LogFixture logs = logBuilder.build();
+         ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String sql = "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i";
+      runAndDump(client, sql, 1_100_000, 3, 2);
+    }
+  }
+}
\ No newline at end of file
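The spillCycle and spilledPartitions arguments passed to runAndDump mirror the javadocs above: spill-cycle 1 means only the original input spilled ("normal" spilling), while cycle 2 means a spilled partition itself had to spill again while being read back. A toy model of how such a counter advances, purely illustrative and not Drill's actual implementation:

public class SpillCycleDemo {
  private int spillCycle = 0; // 0 = no spill at all

  // A partition that spills while processing cycle-N data produces
  // spill files belonging to cycle N + 1.
  void onSpill(int readBackCycle) {
    spillCycle = Math.max(spillCycle, readBackCycle + 1);
  }

  public static void main(String[] args) {
    SpillCycleDemo d = new SpillCycleDemo();
    d.onSpill(0); // original input spills -> cycle 1 ("normal" spill)
    d.onSpill(1); // a cycle-1 partition spills during read-back -> cycle 2
    System.out.println("spill cycle = " + d.spillCycle); // prints 2
  }
}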
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index e39a644..66588b1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -125,7 +126,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
 
   @Test
   public void testSimpleHashAgg() {
-    HashAggregate aggConf = new HashAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+    HashAggregate aggConf = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml
index cb0c517..eecbdfa 100644
--- a/exec/jdbc/pom.xml
+++ b/exec/jdbc/pom.xml
@@ -119,6 +119,7 @@
             <exclude>**/.checkstyle</exclude>
             <exclude>**/.buildpath</exclude>
             <exclude>**/*.json</exclude>
+            <exclude>**/*.iml</exclude>
             <exclude>**/git.properties</exclude>
             <exclude>**/donuts-output-data.txt</exclude>
             <exclude>**/*.tbl</exclude>

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index aa713f8..deed7a7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -192,8 +192,8 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
           r.pBody, r.dBodies);
       if (RpcConstants.EXTRA_DEBUGGING) {
{}", outMessage); + logger.debug("Sending response with Sender {}", System.identityHashCode(this)); } - logger.debug("Sending response with Sender {}", System.identityHashCode(this)); connection.getChannel().writeAndFlush(outMessage); } http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/vector/src/main/codegen/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java index 105ea47..581a9f8 100644 --- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java @@ -377,6 +377,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached."); } + logger.trace("Reallocating VarChar, new size {}",newAllocationSize); final DrillBuf newBuf = allocator.buffer((int)newAllocationSize); newBuf.setBytes(0, data, 0, data.capacity()); data.release();
