Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118812097
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
    @@ -204,24 +293,157 @@ private int getNumPendingOutput() {
     
         @RuntimeOverridden
         public void setupInterior(@Named("incoming") RecordBatch incoming, 
@Named("outgoing") RecordBatch outgoing,
    -        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) 
{
    +        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) 
throws SchemaChangeException {
         }
     
         @RuntimeOverridden
    -    public void updateAggrValuesInternal(@Named("incomingRowIdx") int 
incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
    +    public void updateAggrValuesInternal(@Named("incomingRowIdx") int 
incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{
         }
     
         @RuntimeOverridden
    -    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, 
@Named("outRowIdx") int outRowIdx) {
    +    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, 
@Named("outRowIdx") int outRowIdx) throws SchemaChangeException{
         }
       }
     
    +  /**
    +   * An internal class to replace "incoming" - instead scanning a spilled 
partition file
    +   */
    +  public class SpilledRecordbatch implements CloseableRecordBatch {
    +    private VectorContainer container = null;
    +    private InputStream spillStream;
    +    private int spilledBatches;
    +    private FragmentContext context;
    +    private BatchSchema schema;
    +    private OperatorContext oContext;
    +    // Path spillStreamPath;
    +    private String spillFile;
    +    VectorAccessibleSerializable vas;
    +
    +    public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ 
int spilledBatches, FragmentContext context, BatchSchema schema, 
OperatorContext oContext) {
    +      this.context = context;
    +      this.schema = schema;
    +      this.spilledBatches = spilledBatches;
    +      this.oContext = oContext;
    +      //this.spillStreamPath = spillStreamPath;
    +      this.spillFile = spillFile;
    +      vas = new VectorAccessibleSerializable(allocator);
    +      container = vas.get();
    +
    +      try {
    +        this.spillStream = spillSet.openForInput(spillFile);
    +      } catch (IOException e) { throw new RuntimeException(e);}
    +
    +      next(); // initialize the container
    +    }
    +
    +    @Override
    +    public SelectionVector2 getSelectionVector2() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public SelectionVector4 getSelectionVector4() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public TypedFieldId getValueVectorId(SchemaPath path) {
    +      return container.getValueVectorId(path);
    +    }
    +
    +    @Override
    +    public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids) {
    +      return container.getValueAccessorById(clazz, ids);
    +    }
    +
    +    @Override
    +    public Iterator<VectorWrapper<?>> iterator() {
    +      return container.iterator();
    +    }
    +
    +    @Override
    +    public FragmentContext getContext() { return context; }
    +
    +    @Override
    +    public BatchSchema getSchema() { return schema; }
    +
    +    @Override
    +    public WritableBatch getWritableBatch() {
    +      return WritableBatch.get(this);
    +    }
    +
    +    @Override
    +    public VectorContainer getOutgoingContainer() { return container; }
    +
    +    @Override
    +    public int getRecordCount() { return container.getRecordCount(); }
    +
    +    @Override
    +    public void kill(boolean sendUpstream) {
    +      this.close(); // delete the current spill file
    +    }
    +
    +    /**
    +     * Read the next batch from the spill file
    +     *
    +     * @return IterOutcome
    +     */
    +    @Override
    +    public IterOutcome next() {
    --- End diff --
    
    Ah! It is clear what is happening now. This RecordBatch is a kind of 
operator: one that reads from disk into the attached container. Quite clever, 
actually. But probably overly complex and hard to maintain.
    
    Maybe what you want is to reuse the `SpilledRun` class from the external 
sort. Maybe we can generalize that class a bit. It already has the logic to 
associate a spilled run with a file, iterate over rows and so on. (Though, the 
iterator behavior should be cleaned up...) `SpilledRun` does that without the 
full baggage of the `RecordBatch` protocol.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to