Repository: tajo
Updated Branches:
  refs/heads/master b05ade6a6 -> ccca8c687


TAJO-1875: Resource leak after a query failure.

Closes #815


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ccca8c68
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ccca8c68
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ccca8c68

Branch: refs/heads/master
Commit: ccca8c6876e6b352522fba50fa834eae731a2adb
Parents: b05ade6
Author: Jihoon Son <[email protected]>
Authored: Thu Oct 8 18:44:44 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Thu Oct 8 18:44:44 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/worker/MockTaskManager.java | 13 +++++++
 .../tajo/worker/TestNodeResourceManager.java    | 39 +++++++++++++++++++-
 .../org/apache/tajo/worker/TaskExecutor.java    | 11 +++++-
 .../org/apache/tajo/worker/TaskManager.java     |  8 ++--
 5 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ccca8c68/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 295b5ca..b55ed83 100644
--- a/CHANGES
+++ b/CHANGES
@@ -333,6 +333,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1875: Resource leak after a query failure. (jihoon)
+
     TAJO-1903: Insert clause occassionally fails on S3. (jinho)
 
     TAJO-1912: Selection from aliased schemaless tables throws an error. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ccca8c68/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
index 136fe04..5979bbb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -29,18 +29,31 @@ import org.apache.tajo.worker.event.TaskManagerEvent;
 
 import java.io.IOException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 
 public class MockTaskManager extends TaskManager {
 
   private final Semaphore barrier;
+  private boolean testEbCreateFailure = false;
 
   public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, 
TajoWorker.WorkerContext workerContext) {
     super(dispatcher, workerContext);
     this.barrier = barrier;
   }
 
+  public void enableEbCreateFailure() {
+    testEbCreateFailure = true;
+  }
+
+  public void disableEbCreateFailure() {
+    testEbCreateFailure = false; // clear the flag that createExecutionBlock() checks before throwing
+  }
+
   @Override
   protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId 
executionBlockId, String queryMaster) {
+    if (testEbCreateFailure) {
+      throw new RuntimeException("Failure for test");
+    }
     try {
       ExecutionBlockContextResponse.Builder builder = 
ExecutionBlockContextResponse.newBuilder();
       builder.setExecutionBlockId(executionBlockId.getProto())

http://git-wip-us.apache.org/repos/asf/tajo/blob/ccca8c68/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
index 212f24a..3ddae7e 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -32,7 +32,9 @@ import 
org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
 import org.apache.tajo.worker.event.NodeResourceEvent;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 import java.util.List;
 import java.util.Queue;
@@ -43,6 +45,9 @@ import static org.apache.tajo.ResourceProtos.*;
 import static org.junit.Assert.*;
 public class TestNodeResourceManager {
 
+  @Rule
+  public TestName name = new TestName();
+
   private MockNodeResourceManager resourceManager;
   private NodeStatusUpdater statusUpdater;
   private TaskManager taskManager;
@@ -55,6 +60,14 @@ public class TestNodeResourceManager {
   private int taskMemory;
   private TajoConf conf;
 
+  private static boolean enableEbCreateFailure(String testName) {
+    if (testName.equals("testResourceDeallocateWithEbCreateFailure")) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   @Before
   public void setup() {
     conf = new TajoConf();
@@ -107,6 +120,10 @@ public class TestNodeResourceManager {
     resourceManager = new MockNodeResourceManager(new Semaphore(0), 
dispatcher, workerContext);
     statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), 
workerContext);
 
+    if (enableEbCreateFailure(name.getMethodName())) {
+      ((MockTaskManager)taskManager).enableEbCreateFailure();
+    }
+
     service = new CompositeService("MockService") {
       @Override
       protected void serviceInit(Configuration conf) throws Exception {
@@ -235,7 +252,6 @@ public class TestNodeResourceManager {
 
     List<Future> futureList = Lists.newArrayList();
 
-    long startTime = System.currentTimeMillis();
     for (int i = 0; i < parallelCount; i++) {
       futureList.add(executor.submit(new Runnable() {
             @Override
@@ -277,4 +293,25 @@ public class TestNodeResourceManager {
     executor.shutdown();
     assertEquals(taskSize, totalComplete.get());
   }
+
+  @Test
+  public void testResourceDeallocateWithEbCreateFailure() throws Exception {
+    final int taskSize = 10;
+    resourceManager.setTaskHandlerEvent(true);
+
+    final ExecutionBlockId ebId = new 
ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    final Queue<TaskAllocationProto>
+        totalTasks = MockNodeResourceManager.createTaskRequests(ebId, 
taskMemory, taskSize);
+
+    TaskAllocationProto task = totalTasks.poll();
+    BatchAllocationRequest.Builder requestProto = 
BatchAllocationRequest.newBuilder();
+    requestProto.addTaskRequest(task);
+    requestProto.setExecutionBlockId(ebId.getProto());
+    CallFuture<BatchAllocationResponse> callFuture = new CallFuture<>();
+    dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
+    assertTrue(callFuture.get().getCancellationTaskCount() == 0);
+
+    Thread.sleep(2000); // wait for resource deallocation
+    assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ccca8c68/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
index 8b9d43f..57f3cd9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
@@ -138,11 +138,18 @@ public class TaskExecutor extends AbstractService 
implements EventHandler<TaskSt
     NodeResource resource =  allocatedResourceMap.remove(taskId);
 
     if(resource != null) {
-      
workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle(
-          new NodeResourceDeallocateEvent(resource, 
NodeResourceEvent.ResourceType.TASK));
+      releaseResource(resource);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Task resource " + taskId + " is released. (" + resource + 
")");
+      }
     }
   }
 
+  protected void releaseResource(NodeResource resource) {
+    
workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle(
+        new NodeResourceDeallocateEvent(resource, 
NodeResourceEvent.ResourceType.TASK));
+  }
+
   protected Task createTask(ExecutionBlockContext executionBlockContext,
                             TaskRequestProto taskRequest) throws IOException {
     Task task = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/ccca8c68/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index e5cf7c9..18e9762 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -185,12 +185,14 @@ public class TaskManager extends AbstractService 
implements EventHandler<TaskMan
                 + ", running tasks:" + getRunningTasks() + ", 
availableResource: "
                 + 
workerContext.getNodeResourceManager().getAvailableResource());
           }
-          getTaskExecutor().handle(taskStartEvent);
-        } catch (Exception e) {
-          getTaskExecutor().releaseResource(taskStartEvent.getTaskAttemptId());
+        } catch (Throwable e) {
+          LOG.fatal(e.getMessage(), e);
+          
getTaskExecutor().releaseResource(taskStartEvent.getAllocatedResource());
           getWorkerContext().getTaskManager().getDispatcher().getEventHandler()
               .handle(new 
ExecutionBlockErrorEvent(taskStartEvent.getExecutionBlockId(), e));
+          break;
         }
+        getTaskExecutor().handle(taskStartEvent);
         break;
       }
       case EB_STOP: {

Reply via email to