Github user amansinha100 commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r117074811
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
@@ -546,44 +1254,204 @@ private void checkGroupAndAggrValues(int
incomingRowIdx) {
holder.value = vv0.getAccessor().get(incomingRowIdx) ;
}
*/
+ /*
+ if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
+ // for debugging -- show the first row from a spilled batch
+ Object tmp0 =
(incoming).getValueAccessorById(NullableVarCharVector.class,
0).getValueVector();
+ Object tmp1 =
(incoming).getValueAccessorById(NullableVarCharVector.class,
1).getValueVector();
+ Object tmp2 =
(incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
+
+ if (tmp0 != null && tmp1 != null && tmp2 != null) {
+ NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
+ NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
+ NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2);
+ logger.debug("The first row = {} , {} , {}",
vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx),
vv2.getAccessor().get(incomingRowIdx));
+ }
+ }
+ */
+ // The hash code is computed once, then its lower bits are used to
determine the
+ // partition to use, and the higher bits determine the location in the
hash table.
+ int hashCode;
+ try {
+ htables[0].updateBatches();
+ hashCode = htables[0].getHashCode(incomingRowIdx);
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException("Unexpected schema change", e);
+ }
- htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
+ // right shift hash code for secondary (or tertiary...) spilling
+ for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }
+ int currentPartition = hashCode & partitionMask ;
+ hashCode >>>= bitsInMask;
+ HashTable.PutStatus putStatus = null;
+ long allocatedBefore = allocator.getAllocatedMemory();
+
+ // Insert the key columns into the hash table
+ try {
+ putStatus = htables[currentPartition].put(incomingRowIdx,
htIdxHolder, hashCode);
+ } catch (OutOfMemoryException exc) {
+ throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen
when can not spill
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException("Unexpected schema change", e);
+ }
int currentIdx = htIdxHolder.value;
- // get the batch index and index within the batch
- if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
- addBatchHolder();
+ long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
+ if ( addedMem > 0 ) {
+ logger.trace("MEMORY CHECK HT: allocated {} added {} partition
{}",allocatedBefore,addedMem,currentPartition);
}
- BatchHolder bh = batchHolders.get((currentIdx >>> 16) &
HashTable.BATCH_MASK);
+
+ // Check if put() added a new batch (for the keys) inside the hash
table, hence a matching batch
+ // (for the aggregate columns) needs to be created
+ if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
+ try {
+ long allocatedBeforeAggCol = allocator.getAllocatedMemory();
+
+ addBatchHolder(currentPartition);
+
+ if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated
a planned batch
+ long totalAddedMem = allocator.getAllocatedMemory() -
allocatedBefore;
+ logger.trace("MEMORY CHECK AGG: added {} total (with HT) added
{}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem);
+ // resize the batch estimate if needed (e.g., varchars may take
more memory than estimated)
+ if ( totalAddedMem > estMaxBatchSize ) {
+ logger.trace("Adjusting Batch size estimate from {} to
{}",estMaxBatchSize,totalAddedMem);
+ estMaxBatchSize = totalAddedMem;
+ }
+ } catch (OutOfMemoryException exc) {
+ throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may
happen when can not spill
+ }
+ }
+ BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>>
16) & HashTable.BATCH_MASK);
int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
+ if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
+ numGroupedRecords++;
+ }
+
+ //
===================================================================================
+ // If the last batch just became full - that is the time to check the
memory limits !!
+ // If exceeded, then need to spill (if 2nd phase) or output early (1st)
+ // (Skip this if cannot spill; in such case an OOM may be encountered
later)
+ //
===================================================================================
+ if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST && canSpill ) {
+
+ plannedBatches++; // planning to allocate one more batch
+
+ // calculate the (max) new memory needed now
+ long hashTableDoublingSizeNeeded = 0; // in case the hash table(s)
would resize
+ for ( HashTable ht : htables ) {
--- End diff --
It is not immediately obvious why every partition needs to be asked for the
extra memory required for resizing. If a new row causes a hash table to double,
only one partition is affected. But if you are considering all the rows of an
incoming batch, then yes, in the worst case it could affect all partitions.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---