fix NPE in merging receiver

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4cfdb3b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4cfdb3b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4cfdb3b6

Branch: refs/heads/master
Commit: 4cfdb3b653ba4db664abc14c4b1d51e4cec5c668
Parents: 0b1df5d
Author: Steven Phillips <[email protected]>
Authored: Fri Apr 4 02:51:42 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sat Apr 19 18:07:11 2014 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java       | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4cfdb3b6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index c5c77a6..dcfe02f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -236,22 +236,27 @@ public class MergingRecordBatch implements RecordBatch {
 
       if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
         // reached the end of an incoming record batch
+        RawFragmentBatch nextBatch = null;
         try {
-          incomingBatches[node.batchId] = 
fragProviders[node.batchId].getNext();
+          nextBatch = fragProviders[node.batchId].getNext();
+
+          while (nextBatch != null && 
nextBatch.getHeader().getDef().getRecordCount() == 0) {
+            nextBatch = fragProviders[node.batchId].getNext();
+          }
         } catch (IOException e) {
           context.fail(e);
           return IterOutcome.STOP;
         }
 
-        if (incomingBatches[node.batchId].getHeader().getIsLastBatch() ||
-            
incomingBatches[node.batchId].getHeader().getDef().getRecordCount() == 0) {
+        incomingBatches[node.batchId] = nextBatch;
+
+        if (nextBatch == null) {
           // batch is empty
-          incomingBatches[node.batchId].release();
           boolean allBatchesEmpty = true;
 
           for (RawFragmentBatch batch : incomingBatches) {
             // see if all batches are empty so we can return OK_* or NONE
-            if (!batch.getHeader().getIsLastBatch()) {
+            if (batch != null) {
               allBatchesEmpty = false;
               break;
             }

Reply via email to