Repository: incubator-drill
Updated Branches:
  refs/heads/master 5b61b573e -> 807c4766a


DRILL-1435: ensure the producer-consumer returns the last-batch indicator & 
prevent blocking/leaking of producer & consumer threads; fix concurrency issues 
that cause sqlline & drillbit to hang


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/807c4766
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/807c4766
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/807c4766

Branch: refs/heads/master
Commit: 807c4766ad7acc9c269da49d84d149097a11d1b0
Parents: 5b61b57
Author: Hanifi Gunes <hgu...@maprtech.com>
Authored: Wed Oct 22 15:05:22 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Mon Oct 27 14:22:37 2014 -0700

----------------------------------------------------------------------
 .../impl/producer/ProducerConsumerBatch.java    | 31 +++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/807c4766/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index fd2878f..3d991ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.producer;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -153,33 +154,35 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
           IterOutcome upstream = incoming.next();
           switch (upstream) {
             case NONE:
+              stop = true;
               break outer;
             case STOP:
               queue.putFirst(new RecordBatchDataWrapper(null, false, true));
               return;
             case OK_NEW_SCHEMA:
             case OK:
-              try {
-                if (!stop) {
-                  wrapper = new RecordBatchDataWrapper(new 
RecordBatchData(incoming), false, false);
-                  queue.put(wrapper);
-                }
-              } catch (InterruptedException e) {
-                wrapper.batch.getContainer().zeroVectors();
-                throw e;
-              }
+              wrapper = new RecordBatchDataWrapper(new 
RecordBatchData(incoming), false, false);
+              queue.put(wrapper);
+              wrapper = null;
               break;
             default:
               throw new UnsupportedOperationException();
           }
         }
-
-        queue.put(new RecordBatchDataWrapper(null, true, false));
       } catch (InterruptedException e) {
-        if (!(context.isCancelled() || context.isFailed())) {
-          context.fail(e);
-        }
+        logger.warn("Producer thread is interrupted.", e);
       } finally {
+        if (stop) {
+          try {
+            clearQueue();
+            queue.put(new RecordBatchDataWrapper(null, true, false));
+          } catch (InterruptedException e) {
+            logger.error("Unable to enqueue the last batch indicator. 
Something is broken.", e);
+          }
+        }
+        if (wrapper!=null) {
+          wrapper.batch.getContainer().zeroVectors();
+        }
         cleanUpLatch.countDown();
       }
     }

Reply via email to