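Fix issue with end of batch: flush() now sends an empty final batch (isLast) instead of returning early when the last outgoing batch has no records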
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/add8c724 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/add8c724 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/add8c724 Branch: refs/heads/master Commit: add8c724b9490feba442ae831d9b9c0d6babe413 Parents: 93121cb Author: Ben Becker <[email protected]> Authored: Sun Aug 11 15:12:32 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 15 18:31:31 2013 -0700 ---------------------------------------------------------------------- .../partitionsender/OutgoingRecordBatch.java | 71 +++++++++++++------- .../PartitionSenderRootExec.java | 13 ++-- 2 files changed, 52 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/add8c724/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java index 6eff778..927cc75 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java @@ -19,7 +19,9 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.util.Iterator; +import java.util.List; +import com.beust.jcommander.internal.Lists; import com.google.common.base.Preconditions; import org.apache.drill.common.expression.SchemaPath; import 
org.apache.drill.exec.exception.SchemaChangeException; @@ -76,34 +78,58 @@ public class OutgoingRecordBatch implements RecordBatch { public void incRecordCount() { ++recordCount; } - - public void flush() throws SchemaChangeException { - if (recordCount == 0) { - // TODO: recordCount of 0 with isLast causes recordLoader to throw an NPE. Probably - // need to send notification rather than an actual batch. - logger.warn("Attempted to flush an empty record batch" + (isLast ? " (last batch)" : "")); - return; - } + /** + * Send the record batch to the target node, then reset the value vectors + * + * @return true if a flush was needed; otherwise false + * @throws SchemaChangeException + */ + public boolean flush() throws SchemaChangeException { + logger.error("Creating FragmentWritableBatch. IsLast? " + (isLast ? " (last batch)" : "")); final ExecProtos.FragmentHandle handle = context.getHandle(); - FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, - handle.getQueryId(), - handle.getMajorFragmentId(), - handle.getMinorFragmentId(), - operator.getOppositeMajorFragmentId(), - 0, - getWritableBatch()); - tunnel.sendRecordBatch(statusHandler, context, writableBatch); - - // reset values and reallocate the buffer for each value vector. NOTE: the value vector is directly - // referenced by generated code and must not be replaced. + + if (recordCount != 0) { + FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, + handle.getQueryId(), + handle.getMajorFragmentId(), + handle.getMinorFragmentId(), + operator.getOppositeMajorFragmentId(), + 0, + getWritableBatch()); + tunnel.sendRecordBatch(statusHandler, context, writableBatch); + } else { + logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : "")); + + if (isLast) { + + // if the last batch is empty, it must not contain any value vectors. 
+ vectorContainer = new VectorContainer(); + + // send final batch + FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, + handle.getQueryId(), + handle.getMajorFragmentId(), + handle.getMinorFragmentId(), + operator.getOppositeMajorFragmentId(), + 0, + getWritableBatch()); + tunnel.sendRecordBatch(statusHandler, context, writableBatch); + return true; + + } + } + + // reset values and reallocate the buffer for each value vector based on the incoming batch. + // NOTE: the value vector is directly referenced by generated code; therefore references + // must remain valid. recordCount = 0; for (VectorWrapper v : vectorContainer) { - logger.debug("Reallocating vv to capacity " + recordCapacity + " after flush. " + v.getValueVector()); - getAllocator(v.getValueVector(), - v.getValueVector()).alloc(recordCapacity); + logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush."); + getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount()); } if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); } + return true; } @@ -141,7 +167,6 @@ public class OutgoingRecordBatch implements RecordBatch { recordCapacity = 0; for (VectorWrapper v : vectorContainer) v.getValueVector().clear(); - initializeBatch(); } public void setIsLast() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/add8c724/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 293a711..b2ca64e 100644 --- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -80,12 +80,8 @@ class PartitionSenderRootExec implements RootExec { RecordBatch.IterOutcome out = incoming.next(); logger.debug("Partitioner.next(): got next record batch with status {}", out); switch(out){ - case STOP: case NONE: - // populate outgoing batches - if (incoming.getRecordCount() > 0) - partitioner.partitionBatch(incoming); - + case STOP: try { // send any pending batches for (OutgoingRecordBatch batch : outgoing) { @@ -96,10 +92,7 @@ class PartitionSenderRootExec implements RootExec { incoming.kill(); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); context.fail(e); - return false; } - context.batchesCompleted.inc(1); - context.recordsCompleted.inc(incoming.getRecordCount()); return false; case OK_NEW_SCHEMA: @@ -249,8 +242,10 @@ class PartitionSenderRootExec implements RootExec { if (isLastBatch) batch.setIsLast(); batch.flush(); - if (schemaChanged) + if (schemaChanged) { batch.resetBatch(); + batch.initializeBatch(); + } } } }
