DRILL-1202: Fix memory leak issues: i) ProducerConsumerBatch should clean up its resources; ii) FragmentExecutor should clean up gracefully on both faulty and non-faulty runs
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/28fbdad8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/28fbdad8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/28fbdad8

Branch: refs/heads/master
Commit: 28fbdad886583bc7193833abb06deea327ec8fbe
Parents: 90c4ef6
Author: Hanifi Gunes <hgu...@maprtech.com>
Authored: Tue Jul 29 18:39:11 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Aug 6 16:44:22 2014 -0700

----------------------------------------------------------------------
 .../impl/producer/ProducerConsumerBatch.java | 75 ++++++++++----------
 .../exec/work/fragment/FragmentExecutor.java | 68 ++++++------------
 2 files changed, 58 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28fbdad8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index f091aa9..91d3647 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -40,6 +40,7 @@ import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 
 public class ProducerConsumerBatch extends AbstractRecordBatch {
@@ -52,6 +53,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   private int recordCount;
   private BatchSchema schema;
   private boolean stop = false;
+  private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used to wait producer to clean up
 
   protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -124,50 +126,40 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
 
     @Override
     public void run() {
       try {
-        if (stop) return;
-        outer: while (true) {
-          IterOutcome upstream = incoming.next();
-          switch (upstream) {
-            case NONE:
-              break outer;
-            case STOP:
-              try {
+        if (stop) return;
+        outer:
+        while (true) {
+          IterOutcome upstream = incoming.next();
+          switch (upstream) {
+            case NONE:
+              break outer;
+            case STOP:
              queue.putFirst(new RecordBatchDataWrapper(null, false, true));
-              } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-              return;
-            case OK_NEW_SCHEMA:
-            case OK:
-              try {
-                if (!stop) {
-                  wrapper = new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false);
-                  queue.put(wrapper);
-                }
-              } catch (InterruptedException e) {
-                if (!(context.isCancelled() || context.isFailed())) {
-                  context.fail(e);
+              return;
+            case OK_NEW_SCHEMA:
+            case OK:
+              try {
+                if (!stop) {
+                  wrapper = new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false);
+                  queue.put(wrapper);
+                }
+              } catch (InterruptedException e) {
+                wrapper.batch.getContainer().zeroVectors();
+                throw e;
               }
-                wrapper.batch.getContainer().zeroVectors();
-                incoming.cleanup();
-                break outer;
-              }
-              break;
-            default:
-              throw new UnsupportedOperationException();
+              break;
+            default:
+              throw new UnsupportedOperationException();
+          }
         }
-      }
-      try {
+        queue.put(new RecordBatchDataWrapper(null, true, false));
       } catch (InterruptedException e) {
         if (!(context.isCancelled() || context.isFailed())) {
           context.fail(e);
         }
-
-      }
       } finally {
-        incoming.cleanup();
-        logger.debug("Producer thread finished");
+        cleanUpLatch.countDown();
       }
     }
   }
@@ -183,8 +175,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
-    producer.interrupt();
     stop = true;
+    producer.interrupt();
     try {
       producer.join();
     } catch (InterruptedException e) {
@@ -195,8 +187,15 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   @Override
   public void cleanup() {
     stop = true;
-    clearQueue();
-    super.cleanup();
+    try {
+      cleanUpLatch.await();
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
+    } finally {
+      super.cleanup();
+      clearQueue();
+      incoming.cleanup();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28fbdad8/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index c5c08e2..e4941d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -82,8 +82,8 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
   @Override
   public void run() {
+    final String originalThread = Thread.currentThread().getName();
     try {
-      final String originalThread = Thread.currentThread().getName();
       String newThreadName = String.format("%s:frag:%s:%s", //
           QueryIdHelper.getQueryId(context.getHandle().getQueryId()), //
           context.getHandle().getMajorFragmentId(),
@@ -92,64 +92,38 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       Thread.currentThread().setName(newThreadName);
       executionThread = Thread.currentThread();
 
-      boolean closed = false;
-      try {
-        root = ImplCreator.getExec(context, rootOperator);
-      } catch (AssertionError | Exception e) {
-        context.fail(e);
-        logger.debug("Failure while initializing operator tree", e);
-        internalFail(e);
-        return;
-      }
+      root = ImplCreator.getExec(context, rootOperator);
 
       logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
-      if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
+      if (!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)) {
         internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
         return;
       }
-
-      // run the query until root.next returns false.
-      try{
-        while(state.get() == FragmentState.RUNNING_VALUE){
-          if(!root.next()){
-            if(context.isFailed()){
-              updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
-            }else{
-              updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
-            }
-
-          }
-        }
-
-        root.stop();
-        if(context.isFailed()) {
-          internalFail(context.getFailureCause());
-        }
-
-        closed = true;
-
-        context.close();
-      }catch(AssertionError | Exception ex){
-        logger.debug("Caught exception while running fragment", ex);
-        internalFail(ex);
-      }finally{
-        Thread.currentThread().setName(originalThread);
-        if(!closed) {
-          try {
-            if(context.isFailed()) {
-              internalFail(context.getFailureCause());
-            }
-            context.close();
-          } catch (RuntimeException e) {
-            logger.warn("Failure while closing context in failed state.", e);
+      while (state.get() == FragmentState.RUNNING_VALUE) {
+        if (!root.next()) {
+          if (context.isFailed()){
+            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+          } else {
+            updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
           }
         }
       }
+    } catch (AssertionError | Exception e) {
+      logger.debug("Failure while initializing operator tree", e);
+      context.fail(e);
+      internalFail(e);
    } finally {
-      logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
      bee.removeFragment(context.getHandle());
+      if (context.isFailed()) {
+        internalFail(context.getFailureCause());
+      }
+      root.stop(); // stop root executor & clean-up resources
+      context.close();
+
+      logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
+      Thread.currentThread().setName(originalThread);
    }
  }
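
For readers skimming the diff: the ProducerConsumerBatch change boils down to a cleanup handshake. The producer thread always counts down a latch in its finally block, and the consumer's cleanup() waits on that latch before releasing shared state, so record batches queued by the producer are neither freed twice nor left behind. Below is a minimal standalone sketch of that pattern only; Buffer, start(), and the class name are illustrative stand-ins, not Drill's actual types.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

// Sketch of the latch-based cleanup handshake; names are hypothetical, not Drill's.
public class CleanupHandshakeSketch {
  static class Buffer {
    void release() { System.out.println("released " + this); }
  }

  private final BlockingQueue<Buffer> queue = new ArrayBlockingQueue<>(10);
  private final CountDownLatch cleanUpLatch = new CountDownLatch(1);
  private volatile boolean stop = false;

  private final Thread producer = new Thread(() -> {
    try {
      while (!stop) {
        queue.put(new Buffer());        // may block; interruption breaks us out
      }
    } catch (InterruptedException e) {
      // interrupted by cleanup(): nothing more will be enqueued
    } finally {
      cleanUpLatch.countDown();         // signal the consumer that the producer is done
    }
  });

  public void start() { producer.start(); }

  // Consumer-side cleanup: stop the producer, wait for its signal, then drain the queue.
  public void cleanup() throws InterruptedException {
    stop = true;
    producer.interrupt();
    cleanUpLatch.await();               // don't free shared state while the producer still runs
    Buffer b;
    while ((b = queue.poll()) != null) {
      b.release();                      // release whatever the producer left behind
    }
  }

  public static void main(String[] args) throws InterruptedException {
    CleanupHandshakeSketch s = new CleanupHandshakeSketch();
    s.start();
    Thread.sleep(50);
    s.cleanup();
  }
}

Waiting on the latch before clearQueue() is what closes the race the commit message calls out: without it, the consumer could drain and free buffers while the producer is still putting new ones in.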
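The FragmentExecutor change has a similar shape: the nested try blocks collapse into a single try/catch/finally, so fragment deregistration, root.stop(), context.close(), and the thread-name restore run on every exit path, failed or not. A rough standalone sketch of that structure follows; runRoot, closeResources, and the failure flag are hypothetical placeholders rather than Drill's API.

// Minimal sketch of "clean up in finally regardless of outcome"; names are hypothetical.
public class FragmentRunSketch implements Runnable {
  private volatile boolean failed = false;

  @Override
  public void run() {
    final String originalThread = Thread.currentThread().getName();
    try {
      Thread.currentThread().setName("frag:demo");
      runRoot();                        // operator-tree init and the next() loop would live here
    } catch (AssertionError | Exception e) {
      failed = true;                    // record the failure; a real executor reports it upstream
    } finally {
      // Runs on both faulty and non-faulty runs, mirroring the commit's intent.
      closeResources();                 // stop the root operator and release buffers
      Thread.currentThread().setName(originalThread);
    }
  }

  private void runRoot() { /* fragment execution loop */ }

  private void closeResources() {
    System.out.println(failed ? "cleaned up after failure" : "cleaned up after success");
  }

  public static void main(String[] args) {
    new FragmentRunSketch().run();
  }
}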