prevent NPE in recordLoader. still need to handle the last batch correctly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/93121cbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/93121cbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/93121cbf Branch: refs/heads/master Commit: 93121cbf168f63881ad93d126e2fd9306a51f64a Parents: a136a5b Author: Ben Becker <[email protected]> Authored: Sat Aug 10 12:44:59 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 15 18:31:31 2013 -0700 ---------------------------------------------------------------------- .../impl/partitionsender/OutgoingRecordBatch.java | 9 ++++++--- .../impl/partitionsender/PartitionSenderRootExec.java | 11 +++++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93121cbf/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java index 6847e5a..6eff778 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java @@ -79,9 +79,12 @@ public class OutgoingRecordBatch implements RecordBatch { public void flush() throws SchemaChangeException { if (recordCount == 0) { - logger.warn("Attempted to flush an empty record batch"); + // TODO: recordCount of 0 with isLast causes recordLoader to throw an NPE. Probably + // need to send notification rather than an actual batch. + logger.warn("Attempted to flush an empty record batch" + (isLast ? " (last batch)" : "")); + return; } - logger.debug("Flushing record batch. count is: " + recordCount + ", capacity is " + recordCapacity); + final ExecProtos.FragmentHandle handle = context.getHandle(); FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, handle.getQueryId(), @@ -90,7 +93,7 @@ public class OutgoingRecordBatch implements RecordBatch { operator.getOppositeMajorFragmentId(), 0, getWritableBatch()); - tunnel.sendRecordBatch(statusHandler, context, writableBatch); + tunnel.sendRecordBatch(statusHandler, context, writableBatch); // reset values and reallocate the buffer for each value vector. NOTE: the value vector is directly // referenced by generated code and must not be replaced. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93121cbf/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index aa25c96..293a711 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -86,15 +86,20 @@ class PartitionSenderRootExec implements RootExec { if (incoming.getRecordCount() > 0) partitioner.partitionBatch(incoming); - // send all pending batches try { - flushOutgoingBatches(true, false); + // send any pending batches + for (OutgoingRecordBatch batch : outgoing) { + batch.setIsLast(); + batch.flush(); + } } catch (SchemaChangeException e) { incoming.kill(); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); context.fail(e); return false; } + context.batchesCompleted.inc(1); + context.recordsCompleted.inc(incoming.getRecordCount()); return false; case OK_NEW_SCHEMA: @@ -111,6 +116,8 @@ class PartitionSenderRootExec implements RootExec { } case OK: partitioner.partitionBatch(incoming); + context.batchesCompleted.inc(1); + context.recordsCompleted.inc(incoming.getRecordCount()); return true; case NOT_YET: default:
