Repository: tajo Updated Branches: refs/heads/branch-0.11.0 7d0bfe00a -> 2213864e7
TAJO-1875: Resource leak after a query failure. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2213864e Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2213864e Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2213864e Branch: refs/heads/branch-0.11.0 Commit: 2213864e7d49d9cebc8f033a0c41c45efc2e05d2 Parents: 7d0bfe0 Author: Jihoon Son <[email protected]> Authored: Thu Oct 8 18:48:01 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Thu Oct 8 18:48:01 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/worker/MockTaskManager.java | 13 +++++++ .../tajo/worker/TestNodeResourceManager.java | 39 +++++++++++++++++++- .../org/apache/tajo/worker/TaskExecutor.java | 11 +++++- .../org/apache/tajo/worker/TaskManager.java | 8 ++-- 5 files changed, 67 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6537676..258da54 100644 --- a/CHANGES +++ b/CHANGES @@ -285,6 +285,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1875: Resource leak after a query failure. (jihoon) + TAJO-1903: Insert clause occassionally fails on S3. (jinho) TAJO-1912: Selection from aliased schemaless tables throws an error. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java index 76ce9f7..108b0d3 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java @@ -30,18 +30,31 @@ import org.apache.tajo.worker.event.TaskManagerEvent; import java.io.IOException; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; public class MockTaskManager extends TaskManager { private final Semaphore barrier; + private boolean testEbCreateFailure = false; public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) { super(dispatcher, workerContext); this.barrier = barrier; } + public void enableEbCreateFailure() { + testEbCreateFailure = true; + } + + public void disableEbCreateFailure() { + testEbCreateFailure = false; + } + @Override protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId executionBlockId, String queryMaster) { + if (testEbCreateFailure) { + throw new RuntimeException("Failure for test"); + } try { ExecutionBlockContextResponse.Builder builder = ExecutionBlockContextResponse.newBuilder(); builder.setExecutionBlockId(executionBlockId.getProto()) http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java index b1546b9..98d2caa 100644 --- 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java @@ -33,7 +33,9 @@ import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; import org.apache.tajo.worker.event.NodeResourceEvent; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import java.util.List; import java.util.Queue; @@ -44,6 +46,9 @@ import static org.apache.tajo.ResourceProtos.*; import static org.junit.Assert.*; public class TestNodeResourceManager { + @Rule + public TestName name = new TestName(); + private MockNodeResourceManager resourceManager; private NodeStatusUpdater statusUpdater; private TaskManager taskManager; @@ -56,6 +61,14 @@ public class TestNodeResourceManager { private int taskMemory; private TajoConf conf; + /** + * Returns whether the test about to run needs MockTaskManager to throw + * while creating an execution block. + */ + private static boolean shouldEnableEbCreateFailure(String testName) { + return "testResourceDeallocateWithEbCreateFailure".equals(testName); + } + @Before public void setup() { conf = new TajoConf(); @@ -108,6 +121,10 @@ public class TestNodeResourceManager { resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, workerContext); statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext); + if (shouldEnableEbCreateFailure(name.getMethodName())) { + ((MockTaskManager)taskManager).enableEbCreateFailure(); + } + service = new CompositeService("MockService") { @Override protected void serviceInit(Configuration conf) throws Exception { @@ -236,7 +253,6 @@ public class TestNodeResourceManager { List<Future> futureList = Lists.newArrayList(); - long startTime = System.currentTimeMillis(); for (int i = 0; i < parallelCount; i++) { futureList.add(executor.submit(new Runnable() { @Override @@ -278,4 +294,25 @@ public class TestNodeResourceManager { executor.shutdown(); assertEquals(taskSize, totalComplete.get()); 
} + + @Test + public void testResourceDeallocateWithEbCreateFailure() throws Exception { + final int taskSize = 10; + resourceManager.setTaskHandlerEvent(true); + + final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + final Queue<TaskAllocationProto> + totalTasks = MockNodeResourceManager.createTaskRequests(ebId, taskMemory, taskSize); + + TaskAllocationProto task = totalTasks.poll(); + BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder(); + requestProto.addTaskRequest(task); + requestProto.setExecutionBlockId(ebId.getProto()); + CallFuture<BatchAllocationResponse> callFuture = new CallFuture<>(); + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + assertEquals(0, callFuture.get().getCancellationTaskCount()); + + Thread.sleep(2000); // wait for resource deallocation + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java index 1d6e2b8..fdd7da9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java @@ -138,11 +138,18 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskSt NodeResource resource = allocatedResourceMap.remove(taskId); if(resource != null) { - workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle( - new NodeResourceDeallocateEvent(resource, NodeResourceEvent.ResourceType.TASK)); + releaseResource(resource); + if (LOG.isDebugEnabled()) { + LOG.debug("Task resource " + taskId + " is released. 
(" + resource + ")"); + } } } + protected void releaseResource(NodeResource resource) { + workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle( + new NodeResourceDeallocateEvent(resource, NodeResourceEvent.ResourceType.TASK)); + } + protected Task createTask(ExecutionBlockContext executionBlockContext, TaskRequestProto taskRequest) throws IOException { Task task = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/2213864e/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java index a0b3f97..468a44a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java @@ -185,12 +185,14 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan + ", running tasks:" + getRunningTasks() + ", availableResource: " + workerContext.getNodeResourceManager().getAvailableResource()); } - getTaskExecutor().handle(taskStartEvent); - } catch (Exception e) { - getTaskExecutor().releaseResource(taskStartEvent.getTaskAttemptId()); + } catch (Throwable e) { + LOG.error("Failed to start task " + taskStartEvent.getTaskAttemptId() + " of " + taskStartEvent.getExecutionBlockId() + "; releasing its allocated resource", e); + getTaskExecutor().releaseResource(taskStartEvent.getAllocatedResource()); getWorkerContext().getTaskManager().getDispatcher().getEventHandler() .handle(new ExecutionBlockErrorEvent(taskStartEvent.getExecutionBlockId(), e)); + break; } + getTaskExecutor().handle(taskStartEvent); break; } case EB_STOP: {
