DRILL-1202: fixes memory leak issues: (i) ProducerConsumerBatch should clean up
its resources; (ii) FragmentExecutor should clean up gracefully on both faulty
and non-faulty runs


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/28fbdad8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/28fbdad8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/28fbdad8

Branch: refs/heads/master
Commit: 28fbdad886583bc7193833abb06deea327ec8fbe
Parents: 90c4ef6
Author: Hanifi Gunes <hgu...@maprtech.com>
Authored: Tue Jul 29 18:39:11 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Aug 6 16:44:22 2014 -0700

----------------------------------------------------------------------
 .../impl/producer/ProducerConsumerBatch.java    | 75 ++++++++++----------
 .../exec/work/fragment/FragmentExecutor.java    | 68 ++++++------------
 2 files changed, 58 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28fbdad8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index f091aa9..91d3647 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -40,6 +40,7 @@ import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 
 public class ProducerConsumerBatch extends AbstractRecordBatch {
@@ -52,6 +53,7 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch {
   private int recordCount;
   private BatchSchema schema;
   private boolean stop = false;
+  private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used 
to wait producer to clean up
 
   protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext 
context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -124,50 +126,40 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch {
     @Override
     public void run() {
       try {
-      if (stop) return;
-      outer: while (true) {
-        IterOutcome upstream = incoming.next();
-        switch (upstream) {
-          case NONE:
-            break outer;
-          case STOP:
-            try {
+        if (stop) return;
+        outer:
+        while (true) {
+          IterOutcome upstream = incoming.next();
+          switch (upstream) {
+            case NONE:
+              break outer;
+            case STOP:
               queue.putFirst(new RecordBatchDataWrapper(null, false, true));
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-            return;
-          case OK_NEW_SCHEMA:
-          case OK:
-            try {
-              if (!stop) {
-                wrapper = new RecordBatchDataWrapper(new 
RecordBatchData(incoming), false, false);
-                queue.put(wrapper);
-              }
-            } catch (InterruptedException e) {
-              if (!(context.isCancelled() || context.isFailed())) {
-                context.fail(e);
+              return;
+            case OK_NEW_SCHEMA:
+            case OK:
+              try {
+                if (!stop) {
+                  wrapper = new RecordBatchDataWrapper(new 
RecordBatchData(incoming), false, false);
+                  queue.put(wrapper);
+                }
+              } catch (InterruptedException e) {
+                wrapper.batch.getContainer().zeroVectors();
+                throw e;
               }
-              wrapper.batch.getContainer().zeroVectors();
-              incoming.cleanup();
-              break outer;
-            }
-            break;
-          default:
-            throw new UnsupportedOperationException();
+              break;
+            default:
+              throw new UnsupportedOperationException();
+          }
         }
-      }
-      try {
+
         queue.put(new RecordBatchDataWrapper(null, true, false));
       } catch (InterruptedException e) {
         if (!(context.isCancelled() || context.isFailed())) {
           context.fail(e);
         }
-
-      }
       } finally {
-        incoming.cleanup();
-        logger.debug("Producer thread finished");
+        cleanUpLatch.countDown();
       }
     }
   }
@@ -183,8 +175,8 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch {
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
-    producer.interrupt();
     stop = true;
+    producer.interrupt();
     try {
       producer.join();
     } catch (InterruptedException e) {
@@ -195,8 +187,15 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch {
   @Override
   public void cleanup() {
     stop = true;
-    clearQueue();
-    super.cleanup();
+    try {
+      cleanUpLatch.await();
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted while waiting for producer to clean up first. I 
will try to clean up now...", e);
+    } finally {
+      super.cleanup();
+      clearQueue();
+      incoming.cleanup();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28fbdad8/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index c5c08e2..e4941d0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -82,8 +82,8 @@ public class FragmentExecutor implements Runnable, 
CancelableQuery, StatusProvid
 
   @Override
   public void run() {
+    final String originalThread = Thread.currentThread().getName();
     try {
-      final String originalThread = Thread.currentThread().getName();
       String newThreadName = String.format("%s:frag:%s:%s", //
           QueryIdHelper.getQueryId(context.getHandle().getQueryId()), //
           context.getHandle().getMajorFragmentId(),
@@ -92,64 +92,38 @@ public class FragmentExecutor implements Runnable, 
CancelableQuery, StatusProvid
       Thread.currentThread().setName(newThreadName);
       executionThread = Thread.currentThread();
 
-      boolean closed = false;
-      try {
-        root = ImplCreator.getExec(context, rootOperator);
-      } catch (AssertionError | Exception e) {
-        context.fail(e);
-        logger.debug("Failure while initializing operator tree", e);
-        internalFail(e);
-        return;
-      }
+      root = ImplCreator.getExec(context, rootOperator);
 
       logger.debug("Starting fragment runner. {}:{}", 
context.getHandle().getMajorFragmentId(), 
context.getHandle().getMinorFragmentId());
-      if(!updateState(FragmentState.AWAITING_ALLOCATION, 
FragmentState.RUNNING, false)){
+      if (!updateState(FragmentState.AWAITING_ALLOCATION, 
FragmentState.RUNNING, false)) {
         internalFail(new RuntimeException(String.format("Run was called when 
fragment was in %s state.  FragmentRunnables should only be started when they 
are currently in awaiting allocation state.", 
FragmentState.valueOf(state.get()))));
         return;
       }
 
-
-
       // run the query until root.next returns false.
-      try{
-        while(state.get() == FragmentState.RUNNING_VALUE){
-          if(!root.next()){
-            if(context.isFailed()){
-              updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
-            }else{
-              updateState(FragmentState.RUNNING, FragmentState.FINISHED, 
false);
-            }
-
-          }
-        }
-
-        root.stop();
-        if(context.isFailed()) {
-          internalFail(context.getFailureCause());
-        }
-
-        closed = true;
-
-        context.close();
-      }catch(AssertionError | Exception ex){
-        logger.debug("Caught exception while running fragment", ex);
-        internalFail(ex);
-      }finally{
-        Thread.currentThread().setName(originalThread);
-        if(!closed) {
-          try {
-            if(context.isFailed()) {
-              internalFail(context.getFailureCause());
-            }
-            context.close();
-          } catch (RuntimeException e) {
-            logger.warn("Failure while closing context in failed state.", e);
+      while (state.get() == FragmentState.RUNNING_VALUE) {
+        if (!root.next()) {
+          if (context.isFailed()){
+            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+          } else {
+            updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
           }
         }
       }
+    } catch (AssertionError | Exception e) {
+      logger.debug("Failure while initializing operator tree", e);
+      context.fail(e);
+      internalFail(e);
     } finally {
-      logger.debug("Fragment runner complete. {}:{}", 
context.getHandle().getMajorFragmentId(), 
context.getHandle().getMinorFragmentId());
       bee.removeFragment(context.getHandle());
+      if (context.isFailed()) {
+        internalFail(context.getFailureCause());
+      }
+      root.stop(); // stop root executor & clean-up resources
+      context.close();
+
+      logger.debug("Fragment runner complete. {}:{}", 
context.getHandle().getMajorFragmentId(), 
context.getHandle().getMinorFragmentId());
+      Thread.currentThread().setName(originalThread);
     }
   }
 

Reply via email to