Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.0 7d0bfe00a -> 2213864e7


TAJO-1875: Resource leak after a query failure.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2213864e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2213864e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2213864e

Branch: refs/heads/branch-0.11.0
Commit: 2213864e7d49d9cebc8f033a0c41c45efc2e05d2
Parents: 7d0bfe0
Author: Jihoon Son <[email protected]>
Authored: Thu Oct 8 18:48:01 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Thu Oct 8 18:48:01 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/worker/MockTaskManager.java | 13 +++++++
 .../tajo/worker/TestNodeResourceManager.java    | 39 +++++++++++++++++++-
 .../org/apache/tajo/worker/TaskExecutor.java    | 11 +++++-
 .../org/apache/tajo/worker/TaskManager.java     |  8 ++--
 5 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6537676..258da54 100644
--- a/CHANGES
+++ b/CHANGES
@@ -285,6 +285,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1875: Resource leak after a query failure. (jihoon)
+
     TAJO-1903: Insert clause occassionally fails on S3. (jinho)
 
     TAJO-1912: Selection from aliased schemaless tables throws an error.

http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
index 76ce9f7..108b0d3 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -30,18 +30,31 @@ import org.apache.tajo.worker.event.TaskManagerEvent;
 
 import java.io.IOException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 
 public class MockTaskManager extends TaskManager {
 
   private final Semaphore barrier;
+  private boolean testEbCreateFailure = false;
 
   public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, 
TajoWorker.WorkerContext workerContext) {
     super(dispatcher, workerContext);
     this.barrier = barrier;
   }
 
+  public void enableEbCreateFailure() {
+    testEbCreateFailure = true;
+  }
+
+  public void disableEbCreateFailure() {
+    testEbCreateFailure = false; // switch the injected createExecutionBlock failure back off
+  }
+
   @Override
   protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId 
executionBlockId, String queryMaster) {
+    if (testEbCreateFailure) {
+      throw new RuntimeException("Failure for test");
+    }
     try {
       ExecutionBlockContextResponse.Builder builder = 
ExecutionBlockContextResponse.newBuilder();
       builder.setExecutionBlockId(executionBlockId.getProto())

http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
index b1546b9..98d2caa 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -33,7 +33,9 @@ import 
org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
 import org.apache.tajo.worker.event.NodeResourceEvent;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 import java.util.List;
 import java.util.Queue;
@@ -44,6 +46,9 @@ import static org.apache.tajo.ResourceProtos.*;
 import static org.junit.Assert.*;
 public class TestNodeResourceManager {
 
+  @Rule
+  public TestName name = new TestName();
+
   private MockNodeResourceManager resourceManager;
   private NodeStatusUpdater statusUpdater;
   private TaskManager taskManager;
@@ -56,6 +61,14 @@ public class TestNodeResourceManager {
   private int taskMemory;
   private TajoConf conf;
 
+  private static boolean enableEbCreateFailure(String testName) {
+    if (testName.equals("testResourceDeallocateWithEbCreateFailure")) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   @Before
   public void setup() {
     conf = new TajoConf();
@@ -108,6 +121,10 @@ public class TestNodeResourceManager {
     resourceManager = new MockNodeResourceManager(new Semaphore(0), 
dispatcher, workerContext);
     statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), 
workerContext);
 
+    if (enableEbCreateFailure(name.getMethodName())) {
+      ((MockTaskManager)taskManager).enableEbCreateFailure();
+    }
+
     service = new CompositeService("MockService") {
       @Override
       protected void serviceInit(Configuration conf) throws Exception {
@@ -236,7 +253,6 @@ public class TestNodeResourceManager {
 
     List<Future> futureList = Lists.newArrayList();
 
-    long startTime = System.currentTimeMillis();
     for (int i = 0; i < parallelCount; i++) {
       futureList.add(executor.submit(new Runnable() {
             @Override
@@ -278,4 +294,25 @@ public class TestNodeResourceManager {
     executor.shutdown();
     assertEquals(taskSize, totalComplete.get());
   }
+
+  @Test
+  public void testResourceDeallocateWithEbCreateFailure() throws Exception {
+    final int taskSize = 10;
+    resourceManager.setTaskHandlerEvent(true);
+
+    final ExecutionBlockId ebId = new 
ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    final Queue<TaskAllocationProto>
+        totalTasks = MockNodeResourceManager.createTaskRequests(ebId, 
taskMemory, taskSize);
+
+    TaskAllocationProto task = totalTasks.poll();
+    BatchAllocationRequest.Builder requestProto = 
BatchAllocationRequest.newBuilder();
+    requestProto.addTaskRequest(task);
+    requestProto.setExecutionBlockId(ebId.getProto());
+    CallFuture<BatchAllocationResponse> callFuture = new CallFuture<>();
+    dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
+    assertTrue(callFuture.get().getCancellationTaskCount() == 0);
+
+    Thread.sleep(2000); // wait for resource deallocation
+    assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
index 1d6e2b8..fdd7da9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
@@ -138,11 +138,18 @@ public class TaskExecutor extends AbstractService 
implements EventHandler<TaskSt
     NodeResource resource =  allocatedResourceMap.remove(taskId);
 
     if(resource != null) {
-      
workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle(
-          new NodeResourceDeallocateEvent(resource, 
NodeResourceEvent.ResourceType.TASK));
+      releaseResource(resource);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Task resource " + taskId + " is released. (" + resource + 
")");
+      }
     }
   }
 
+  protected void releaseResource(NodeResource resource) {
+    
workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle(
+        new NodeResourceDeallocateEvent(resource, 
NodeResourceEvent.ResourceType.TASK));
+  }
+
   protected Task createTask(ExecutionBlockContext executionBlockContext,
                             TaskRequestProto taskRequest) throws IOException {
     Task task = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index a0b3f97..468a44a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -185,12 +185,14 @@ public class TaskManager extends AbstractService 
implements EventHandler<TaskMan
                 + ", running tasks:" + getRunningTasks() + ", 
availableResource: "
                 + 
workerContext.getNodeResourceManager().getAvailableResource());
           }
-          getTaskExecutor().handle(taskStartEvent);
-        } catch (Exception e) {
-          getTaskExecutor().releaseResource(taskStartEvent.getTaskAttemptId());
+        } catch (Throwable e) {
+          LOG.fatal(e.getMessage(), e);
+          
getTaskExecutor().releaseResource(taskStartEvent.getAllocatedResource());
           getWorkerContext().getTaskManager().getDispatcher().getEventHandler()
               .handle(new 
ExecutionBlockErrorEvent(taskStartEvent.getExecutionBlockId(), e));
+          break;
         }
+        getTaskExecutor().handle(taskStartEvent);
         break;
       }
       case EB_STOP: {

Reply via email to