fix NPE in merging receiver
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4cfdb3b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4cfdb3b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4cfdb3b6 Branch: refs/heads/master Commit: 4cfdb3b653ba4db664abc14c4b1d51e4cec5c668 Parents: 0b1df5d Author: Steven Phillips <[email protected]> Authored: Fri Apr 4 02:51:42 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat Apr 19 18:07:11 2014 -0700 ---------------------------------------------------------------------- .../impl/mergereceiver/MergingRecordBatch.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4cfdb3b6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index c5c77a6..dcfe02f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -236,22 +236,27 @@ public class MergingRecordBatch implements RecordBatch { if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) { // reached the end of an incoming record batch + RawFragmentBatch nextBatch = null; try { - incomingBatches[node.batchId] = fragProviders[node.batchId].getNext(); + nextBatch = fragProviders[node.batchId].getNext(); + + while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) { + nextBatch = fragProviders[node.batchId].getNext(); + } } catch (IOException e) { context.fail(e); return IterOutcome.STOP; } - if (incomingBatches[node.batchId].getHeader().getIsLastBatch() || - incomingBatches[node.batchId].getHeader().getDef().getRecordCount() == 0) { + incomingBatches[node.batchId] = nextBatch; + + if (nextBatch == null) { // batch is empty - incomingBatches[node.batchId].release(); boolean allBatchesEmpty = true; for (RawFragmentBatch batch : incomingBatches) { // see if all batches are empty so we can return OK_* or NONE - if (!batch.getHeader().getIsLastBatch()) { + if (batch != null) { allBatchesEmpty = false; break; }
