Repository: incubator-drill
Updated Branches:
  refs/heads/master 5b61b573e -> 807c4766a

DRILL-1435: ensure producer consumer returns the last batch indicator & prevent blocking/leaking producer & consumer threads; fix concurrency issues that drives sqlline & drillbit hanging

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/807c4766
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/807c4766
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/807c4766

Branch: refs/heads/master
Commit: 807c4766ad7acc9c269da49d84d149097a11d1b0
Parents: 5b61b57
Author: Hanifi Gunes <hgu...@maprtech.com>
Authored: Wed Oct 22 15:05:22 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Mon Oct 27 14:22:37 2014 -0700

----------------------------------------------------------------------
 .../impl/producer/ProducerConsumerBatch.java | 31 +++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/807c4766/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index fd2878f..3d991ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.producer;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -153,33 +154,35 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
           IterOutcome upstream = incoming.next();
           switch (upstream) {
             case NONE:
+              stop = true;
               break outer;
             case STOP:
               queue.putFirst(new RecordBatchDataWrapper(null, false, true));
               return;
             case OK_NEW_SCHEMA:
             case OK:
-              try {
-                if (!stop) {
-                  wrapper = new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false);
-                  queue.put(wrapper);
-                }
-              } catch (InterruptedException e) {
-                wrapper.batch.getContainer().zeroVectors();
-                throw e;
-              }
+              wrapper = new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false);
+              queue.put(wrapper);
+              wrapper = null;
               break;
             default:
               throw new UnsupportedOperationException();
           }
         }
-
-        queue.put(new RecordBatchDataWrapper(null, true, false));
       } catch (InterruptedException e) {
-        if (!(context.isCancelled() || context.isFailed())) {
-          context.fail(e);
-        }
+        logger.warn("Producer thread is interrupted.", e);
       } finally {
+        if (stop) {
+          try {
+            clearQueue();
+            queue.put(new RecordBatchDataWrapper(null, true, false));
+          } catch (InterruptedException e) {
+            logger.error("Unable to enqueue the last batch indicator. Something is broken.", e);
+          }
+        }
+        if (wrapper!=null) {
+          wrapper.batch.getContainer().zeroVectors();
+        }
         cleanUpLatch.countDown();
       }
     }