[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16032210#comment-16032210 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r119500755
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
           }
         }
     
    -    ChainedHashTable ht =
    +    spillSet = new SpillSet(context, hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
    +    baseHashTable =
             new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
    -    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
    -
    +    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
         numGroupByOutFields = groupByOutFieldIds.length;
    -    batchHolders = new ArrayList<BatchHolder>();
    -    // First BatchHolder is created when the first put request is received.
     
         doSetup(incoming);
       }
     
    +  /**
    +   *  Delayed setup covers the parts of setup() that can only run after actual data arrives in incoming.
    +   *  This data is used to compute the number of partitions.
    +   */
    +  private void delayedSetup() {
    +
    +    // Set the number of partitions from the configuration (raise to a power of two, if needed)
    +    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
    +    if ( numPartitions == 1 ) {
    +      canSpill = false;
    +      logger.warn("Spilling was disabled");
    +    }
    +    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
    +      numPartitions++;
    +    }
    +    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
    +    else {
    +      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
    +      updateEstMaxBatchSize(incoming);
    +    }
    +    long memAvail = memoryLimit - allocator.getAllocatedMemory();
    +    if ( !canSpill ) { // single phase, or spill disabled by configuration
    +      numPartitions = 1; // single phase should use only a single partition (to save memory)
    +    } else { // two phase
    +      // Adjust the number of partitions down if needed - when the available memory cannot hold as
    +      // many batches (a configurable option), plus overhead (e.g. hash table, links, hash values)
    +      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
    +        numPartitions /= 2;
    +        if ( numPartitions < 2) {
    +          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
    +          break;
    +        }
    +      }
    +    }
    +    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase ? (is2ndPhase ? "2nd" : "1st") : "Single",
    +        numPartitions, canSpill ? "Can" : "Cannot");
    +
    +    // The following initial safety check should be revisited once we can lower the number of rows in a batch
    +    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
    +    if ( numPartitions == 1 ) {
    +      // if too little memory - behave like the old code -- no memory limit for hash aggregate
    +      allocator.setLimit(10_000_000_000L);
    +    }
    +    // Based on the number of partitions: Set the mask and bit count
    +    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
    +    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
    +
    +    // Create arrays (one entry per partition)
    +    htables = new HashTable[numPartitions];
    +    batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions];
    +    outBatchIndex = new int[numPartitions];
    +    outputStream = new OutputStream[numPartitions];
    +    spilledBatchesCount = new int[numPartitions];
    +    // spilledPaths = new Path[numPartitions];
    +    spillFiles = new String[numPartitions];
    +    spilledPartitionsList = new ArrayList<SpilledPartition>();
    +
    +    plannedBatches = numPartitions; // each partition should allocate its first batch
    +
    +    // initialize every (per partition) entry in the arrays
    +    for (int i = 0; i < numPartitions; i++ ) {
    +      try {
    +        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
    +        this.htables[i].setMaxVarcharSize(maxColumnWidth);
    +      } catch (IllegalStateException ise) {} // ignore
    +      catch (Exception e) { throw new DrillRuntimeException(e); }
    +      this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
    +    }
    +  }
    +  /**
    +   * Get the new incoming (when reading spilled files, the reader serves as the "incoming")
    +   * @return The (newly replaced) incoming
    +   */
    +  @Override
    +  public RecordBatch getNewIncoming() { return incoming; }
    +
    +  private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, ClassTransformationException, IOException {
    --- End diff --
    
    All "throw ClassTransformationException" clauses were removed (how come 
Intellij did not mark them as redundant ?)
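
For background on that question: a checked exception listed in a method's throws clause is legal Java even when the body can never throw it, so javac compiles it without a warning; only an IDE inspection (IntelliJ's "Redundant 'throws' clause", which may not be enabled in the active inspection profile) would flag it. A minimal sketch:

    import java.io.IOException;

    public class RedundantThrowsDemo {
      // Legal Java: IOException is declared but never thrown by the body.
      // javac accepts this silently; an IDE inspection has to catch it.
      static String readName() throws IOException {
        return "drill";
      }

      public static void main(String[] args) throws IOException {
        // Callers must still catch or declare the declared-but-phantom exception.
        System.out.println(readName());
      }
    }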
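
On the sizing logic in the diff itself: below is a minimal standalone sketch of how delayedSetup() picks the partition count - round the configured value up to a power of two, then halve it until the per-partition estimate fits in the available memory - and how the partition mask and bit count follow. The parameter names and the numbers in main() are illustrative assumptions, not Drill APIs:

    public class PartitionCountSketch {

      // Per-partition overhead assumed in the diff (hash table, links, hash values)
      static final long PARTITION_OVERHEAD = 8L * 1024 * 1024;

      /** Round up to a power of two, then halve until the memory estimate fits. */
      static int choosePartitions(int configured, long estMaxBatchSize,
                                  int minBatchesPerPartition, long memAvail) {
        int numPartitions = configured;
        while (Integer.bitCount(numPartitions) > 1) { numPartitions++; } // not a power of 2 yet
        while (numPartitions * (estMaxBatchSize * minBatchesPerPartition + PARTITION_OVERHEAD) > memAvail) {
          numPartitions /= 2;
          if (numPartitions < 2) { break; } // below 2 a spilling 2nd phase cannot make progress
        }
        return numPartitions;
      }

      public static void main(String[] args) {
        // 32 partitions requested, ~4 MB batches, 3 batches per partition, 256 MB available
        int n = choosePartitions(32, 4L << 20, 3, 256L << 20);  // 640 MB demand -> halved to 8
        int partitionMask = n - 1;                              // e.g. 8 -> 0x07
        int bitsInMask = Integer.bitCount(partitionMask);       // e.g. 0x07 -> 3
        System.out.println(n + " partitions, mask=0x" + Integer.toHexString(partitionMask)
            + ", bitsInMask=" + bitsInMask);
      }
    }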



> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradual spilling of memory to disk as the available memory becomes too small to allow in-memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
