Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r119258161
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
@@ -204,24 +293,157 @@ private int getNumPendingOutput() {
@RuntimeOverridden
public void setupInterior(@Named("incoming") RecordBatch incoming,
@Named("outgoing") RecordBatch outgoing,
- @Named("aggrValuesContainer") VectorContainer aggrValuesContainer)
{
+ @Named("aggrValuesContainer") VectorContainer aggrValuesContainer)
throws SchemaChangeException {
}
@RuntimeOverridden
- public void updateAggrValuesInternal(@Named("incomingRowIdx") int
incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+ public void updateAggrValuesInternal(@Named("incomingRowIdx") int
incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{
}
@RuntimeOverridden
- public void outputRecordValues(@Named("htRowIdx") int htRowIdx,
@Named("outRowIdx") int outRowIdx) {
+ public void outputRecordValues(@Named("htRowIdx") int htRowIdx,
@Named("outRowIdx") int outRowIdx) throws SchemaChangeException{
}
}
+ /**
+ * An internal class to replace "incoming" - instead scanning a spilled
partition file
+ */
+ public class SpilledRecordbatch implements CloseableRecordBatch {
+ private VectorContainer container = null;
+ private InputStream spillStream;
+ private int spilledBatches;
+ private FragmentContext context;
+ private BatchSchema schema;
+ private OperatorContext oContext;
+ // Path spillStreamPath;
+ private String spillFile;
+ VectorAccessibleSerializable vas;
+
+ public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/
int spilledBatches, FragmentContext context, BatchSchema schema,
OperatorContext oContext) {
+ this.context = context;
+ this.schema = schema;
+ this.spilledBatches = spilledBatches;
+ this.oContext = oContext;
+ //this.spillStreamPath = spillStreamPath;
+ this.spillFile = spillFile;
+ vas = new VectorAccessibleSerializable(allocator);
+ container = vas.get();
+
+ try {
+ this.spillStream = spillSet.openForInput(spillFile);
+ } catch (IOException e) { throw new RuntimeException(e);}
+
+ next(); // initialize the container
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVectorId(path);
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int...
ids) {
+ return container.getValueAccessorById(clazz, ids);
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return container.iterator();
+ }
+
+ @Override
+ public FragmentContext getContext() { return context; }
+
+ @Override
+ public BatchSchema getSchema() { return schema; }
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ return WritableBatch.get(this);
+ }
+
+ @Override
+ public VectorContainer getOutgoingContainer() { return container; }
+
+ @Override
+ public int getRecordCount() { return container.getRecordCount(); }
+
+ @Override
+ public void kill(boolean sendUpstream) {
+ this.close(); // delete the current spill file
+ }
+
+ /**
+ * Read the next batch from the spill file
+ *
+ * @return IterOutcome
+ */
+ @Override
+ public IterOutcome next() {
--- End diff --
HashAgg is unique in that it reads (and processes) the spilled batches
exactly as it reads (and processes) the incoming batches. The actual code
footprint of this class is quite small (mostly in the next() method).
SpilledRun extends BatchGroup, and seems to have more logic. The two seem
to have too many differences to be worth combining.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---