[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051133#comment-16051133
 ] 

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122322345
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
    @@ -512,122 +509,122 @@ private void updateEstMaxBatchSize(RecordBatch 
incoming) {
         }
       }
     
    +  /**
    +   *  Read and process (i.e., insert into the hash table and aggregate) 
records from the current batch.
    +   *  Once complete, get the incoming NEXT batch and process it as well, 
etc.
    +   *  For 1st phase, may return when an early output needs to be performed.
    +   *
    +   * @return Agg outcome status
    +   */
       @Override
       public AggOutcome doWork() {
    -    try {
    -      // Note: Keeping the outer and inner try blocks here to maintain 
some similarity with
    -      // StreamingAggregate which does somethings conditionally in the 
outer try block.
    -      // In the future HashAggregate may also need to perform some actions 
conditionally
    -      // in the outer try block.
    -
    -      assert ! handlingSpills || currentIndex < Integer.MAX_VALUE;
     
    -      outside:
    -      while (true) {
    +    while (true) {
     
    -        // This would be called only once - after actual data arrives on 
incoming
    -        if ( schema == null && incoming.getRecordCount() > 0 ) {
    -          this.schema = incoming.getSchema();
    -          // Calculate the number of partitions based on actual incoming 
data
    -          delayedSetup();
    -        }
    +      // This would be called only once - first time actual data arrives 
on incoming
    +      if ( schema == null && incoming.getRecordCount() > 0 ) {
    +        this.schema = incoming.getSchema();
    +        currentBatchRecordCount = incoming.getRecordCount(); // initialize 
for first non empty batch
    +        // Calculate the number of partitions based on actual incoming data
    +        delayedSetup();
    +      }
     
    -        // loop through existing records, aggregating the values as 
necessary.
    -        if (EXTRA_DEBUG_1) {
    -          logger.debug("Starting outer loop of doWork()...");
    +      //
    +      //  loop through existing records in this batch, aggregating the 
values as necessary.
    +      //
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Starting outer loop of doWork()...");
    +      }
    +      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
    +        if (EXTRA_DEBUG_2) {
    +          logger.debug("Doing loop with values underlying {}, current {}", 
underlyingIndex, currentIndex);
             }
    -        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
    -          if (EXTRA_DEBUG_2) {
    -            logger.debug("Doing loop with values underlying {}, current 
{}", underlyingIndex, currentIndex);
    -          }
    -          checkGroupAndAggrValues(currentIndex);
    -          // If adding a group discovered a memory pressure during 1st 
phase, then start
    -          // outputing some partition to free memory.
    -          if ( earlyOutput ) {
    -            outputCurrentBatch();
    -            incIndex(); // next time continue with the next incoming row
    -            return AggOutcome.RETURN_OUTCOME;
    -          }
    +        checkGroupAndAggrValues(currentIndex);
    +        // If adding a group discovered a memory pressure during 1st 
phase, then start
    +        // outputing some partition downstream in order to free memory.
    +        if ( earlyOutput ) {
    +          outputCurrentBatch();
    +          incIndex(); // next time continue with the next incoming row
    +          return AggOutcome.RETURN_OUTCOME;
             }
    +      }
    +
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Processed {} records", underlyingIndex);
    +      }
     
    -        if (EXTRA_DEBUG_1) {
    -          logger.debug("Processed {} records", underlyingIndex);
    +      // Cleanup the previous batch since we are done processing it.
    +      for (VectorWrapper<?> v : incoming) {
    --- End diff --
    
    Mmmm... this is the original Hash Agg code. Hence probably the two 
invariants are met.


> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradual spilling memory to disk as the available memory gets too 
> small to allow in memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to