Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118812287
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
@@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig,
HashTableConfig htConfig, Fragme
}
}
- ChainedHashTable ht =
+ spillSet = new SpillSet(context,hashAggrConfig,
UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+ baseHashTable =
new ChainedHashTable(htConfig, context, allocator, incoming, null
/* no incoming probe */, outgoing);
- this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+ this.groupByOutFieldIds = groupByOutFieldIds; // retain these for
delayedSetup, and to allow recreating hash tables (after a spill)
numGroupByOutFields = groupByOutFieldIds.length;
- batchHolders = new ArrayList<BatchHolder>();
- // First BatchHolder is created when the first put request is received.
doSetup(incoming);
}
+ /**
+ * Delayed setup covers the parts of setup() that can only run after
actual data arrives in incoming.
+ */
+ private void delayedSetup() {
+
+ // Set the number of partitions from the configuration (raise to a
power of two, if needed)
+ numPartitions =
context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
+ if ( numPartitions == 1 ) {
+ canSpill = false;
+ logger.warn("Spilling was disabled");
+ }
+ while (Integer.bitCount(numPartitions) > 1) { // in case not a power
of 2
+ numPartitions++;
+ }
+ if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an
empty batch
+ else {
+ // Estimate the max batch size; should use actual data (e.g. lengths
of varchars)
+ updateEstMaxBatchSize(incoming);
+ }
+ long memAvail = memoryLimit - allocator.getAllocatedMemory();
+ if ( !canSpill ) { // single phase, or spill disabled by configuration
+ numPartitions = 1; // single phase should use only a single
partition (to save memory)
+ } else { // two phase
+ // Adjust down the number of partitions if needed - when the memory
available can not hold as
+ // many batches (configurable option), plus overhead (e.g. hash
table, links, hash values))
+ while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition +
8 * 1024 * 1024) > memAvail ) {
+ numPartitions /= 2;
+ if ( numPartitions < 2) {
+ if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at
least 2 to make progress
--- End diff --
Maybe log (e.g., at WARN level) when spilling gets disabled here, to help track down memory issues on a production system?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---