Repository: tez Updated Branches: refs/heads/master a1f2da8eb -> 022df7218
TEZ-3893. Tez Local Mode can hang for cases. (Jonathan Eagles via jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/022df721 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/022df721 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/022df721 Branch: refs/heads/master Commit: 022df7218afbb2c940ddc4447246dea5a546c759 Parents: a1f2da8 Author: Jason Lowe <[email protected]> Authored: Wed Feb 14 10:14:34 2018 -0600 Committer: Jason Lowe <[email protected]> Committed: Wed Feb 14 10:14:34 2018 -0600 ---------------------------------------------------------------------- .../dag/app/rm/LocalTaskSchedulerService.java | 87 ++++++++++---------- .../tez/dag/app/rm/TestLocalTaskScheduler.java | 13 +-- .../app/rm/TestLocalTaskSchedulerService.java | 35 ++++---- 3 files changed, 69 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index 3b034cd..04e79a8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -19,8 +19,8 @@ package org.apache.tez.dag.app.rm; import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.HashMap; import java.util.Iterator; @@ -51,7 +51,7 @@ public class LocalTaskSchedulerService extends TaskScheduler { private static final Logger LOG = LoggerFactory.getLogger(LocalTaskSchedulerService.class); final ContainerSignatureMatcher containerSignatureMatcher; - final PriorityBlockingQueue<TaskRequest> taskRequestQueue; + final LinkedBlockingQueue<TaskRequest> taskRequestQueue; final Configuration conf; AsyncDelegateRequestHandler taskRequestHandler; Thread asyncDelegateRequestThread; @@ -62,7 +62,7 @@ public class LocalTaskSchedulerService extends TaskScheduler { public LocalTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { super(taskSchedulerContext); - taskRequestQueue = new PriorityBlockingQueue<TaskRequest>(); + taskRequestQueue = new LinkedBlockingQueue<>(); taskAllocations = new LinkedHashMap<Object, Container>(); this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl(); this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher(); @@ -313,29 +313,31 @@ public class LocalTaskSchedulerService extends TaskScheduler { } static class AsyncDelegateRequestHandler implements Runnable { - final BlockingQueue<TaskRequest> taskRequestQueue; + final LinkedBlockingQueue<TaskRequest> clientRequestQueue; + final PriorityBlockingQueue<AllocateTaskRequest> taskRequestQueue; final LocalContainerFactory localContainerFactory; final HashMap<Object, Container> taskAllocations; final TaskSchedulerContext taskSchedulerContext; final int MAX_TASKS; - AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue, + AsyncDelegateRequestHandler(LinkedBlockingQueue<TaskRequest> clientRequestQueue, LocalContainerFactory localContainerFactory, HashMap<Object, Container> taskAllocations, TaskSchedulerContext taskSchedulerContext, Configuration conf) { - this.taskRequestQueue = taskRequestQueue; + this.clientRequestQueue = clientRequestQueue; this.localContainerFactory = localContainerFactory; this.taskAllocations = taskAllocations; this.taskSchedulerContext = taskSchedulerContext; this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); + this.taskRequestQueue = new PriorityBlockingQueue<>(); } public void addAllocateTaskRequest(Object task, Resource capability, Priority priority, Object clientCookie) { try { - taskRequestQueue.put(new AllocateTaskRequest(task, capability, priority, clientCookie)); + clientRequestQueue.put(new AllocateTaskRequest(task, capability, priority, clientCookie)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -343,57 +345,54 @@ public class LocalTaskSchedulerService extends TaskScheduler { public boolean addDeallocateTaskRequest(Object task) { try { - taskRequestQueue.put(new DeallocateTaskRequest(task)); + clientRequestQueue.put(new DeallocateTaskRequest(task)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - synchronized(taskRequestQueue) { - taskRequestQueue.notify(); - } return true; } - boolean shouldWait() { - return taskAllocations.size() >= MAX_TASKS; + boolean shouldProcess() { + return !taskRequestQueue.isEmpty() && taskAllocations.size() < MAX_TASKS; } @Override public void run() { - while(!Thread.currentThread().isInterrupted()) { - synchronized(taskRequestQueue) { - try { - if (shouldWait()) { - taskRequestQueue.wait(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + while (!Thread.currentThread().isInterrupted()) { + dispatchRequest(); + while (shouldProcess()) { + allocateTask(); } - processRequest(); } } - void processRequest() { - try { - TaskRequest request = taskRequestQueue.take(); - if (request instanceof AllocateTaskRequest) { - allocateTask((AllocateTaskRequest)request); - } - else if (request instanceof DeallocateTaskRequest) { - deallocateTask((DeallocateTaskRequest)request); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (NullPointerException e) { - LOG.warn("Task request was badly constructed"); + void dispatchRequest() { + try { + TaskRequest request = clientRequestQueue.take(); + if (request instanceof AllocateTaskRequest) { + taskRequestQueue.put((AllocateTaskRequest)request); + } + else if (request instanceof DeallocateTaskRequest) { + deallocateTask((DeallocateTaskRequest)request); + } + else { + LOG.error("Unknown task request message: " + request); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } - void allocateTask(AllocateTaskRequest request) { - Container container = localContainerFactory.createContainer(request.capability, - request.priority); - taskAllocations.put(request.task, container); - taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container); + void allocateTask() { + try { + AllocateTaskRequest request = taskRequestQueue.take(); + Container container = localContainerFactory.createContainer(request.capability, + request.priority); + taskAllocations.put(request.task, container); + taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } void deallocateTask(DeallocateTaskRequest request) { @@ -403,13 +402,13 @@ public class LocalTaskSchedulerService extends TaskScheduler { } else { boolean deallocationBeforeAllocation = false; - Iterator<TaskRequest> iter = taskRequestQueue.iterator(); + Iterator<AllocateTaskRequest> iter = taskRequestQueue.iterator(); while (iter.hasNext()) { TaskRequest taskRequest = iter.next(); - if (taskRequest instanceof AllocateTaskRequest && taskRequest.task.equals(request.task)) { + if (taskRequest.task.equals(request.task)) { iter.remove(); deallocationBeforeAllocation = true; - LOG.info("deallcation happen before allocation for task:" + request.task); + LOG.info("Deallocation request before allocation for task:" + request.task); break; } } http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java index 2ada2f1..36505c2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java @@ -20,7 +20,7 @@ package org.apache.tez.dag.app.rm; import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -57,11 +57,11 @@ public class TestLocalTaskScheduler { LocalContainerFactory containerFactory = new LocalContainerFactory(appAttemptId, 1000); HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>(); - PriorityBlockingQueue<TaskRequest> taskRequestQueue = new PriorityBlockingQueue<TaskRequest>(); + LinkedBlockingQueue<TaskRequest> clientRequestQueue = new LinkedBlockingQueue<>(); // Object under test AsyncDelegateRequestHandler requestHandler = - new AsyncDelegateRequestHandler(taskRequestQueue, + new AsyncDelegateRequestHandler(clientRequestQueue, containerFactory, taskAllocations, mockContext, @@ -71,17 +71,18 @@ public class TestLocalTaskScheduler { for (int i = 0; i < MAX_TASKS; i++) { Priority priority = Priority.newInstance(20); requestHandler.addAllocateTaskRequest(new Long(i), null, priority, null); - requestHandler.processRequest(); + requestHandler.dispatchRequest(); + requestHandler.allocateTask(); } // Only MAX_TASKS number of tasks should have been allocated Assert.assertEquals("Wrong number of allocate tasks", MAX_TASKS, taskAllocations.size()); - Assert.assertTrue("Another allocation should not fit", requestHandler.shouldWait()); + Assert.assertTrue("Another allocation should not fit", !requestHandler.shouldProcess()); // Deallocate down to zero for (int i = 0; i < MAX_TASKS; i++) { requestHandler.addDeallocateTaskRequest(new Long(i)); - requestHandler.processRequest(); + requestHandler.dispatchRequest(); } // All allocated tasks should have been removed http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java index 3b2de34..c2daf84 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java @@ -19,7 +19,7 @@ package org.apache.tez.dag.app.rm; import java.util.HashMap; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -91,6 +91,9 @@ public class TestLocalTaskSchedulerService { taskSchedulerService.initialize(); taskSchedulerService.start(); + // create a task that fills the task allocation queue + Task dummy_task = mock(Task.class); + taskSchedulerService.allocateTask(dummy_task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null); Task task = mock(Task.class); taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null); taskSchedulerService.deallocateTask(task, false, null, null); @@ -98,10 +101,10 @@ public class TestLocalTaskSchedulerService { taskSchedulerService.startRequestHandlerThread(); MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler(); - requestHandler.drainRequest(1); + requestHandler.drainRequest(3); assertEquals(1, requestHandler.deallocateCount); // The corresponding AllocateTaskRequest will be removed, so won't been processed. - assertEquals(0, requestHandler.allocateCount); + assertEquals(1, requestHandler.allocateCount); taskSchedulerService.shutdown(); } @@ -170,10 +173,10 @@ public class TestLocalTaskSchedulerService { public int allocateCount = 0; public int deallocateCount = 0; - public int processedCount =0; + public int dispatchCount = 0; MockAsyncDelegateRequestHandler( - BlockingQueue<TaskRequest> taskRequestQueue, + LinkedBlockingQueue<TaskRequest> taskRequestQueue, LocalContainerFactory localContainerFactory, HashMap<Object, Container> taskAllocations, TaskSchedulerContext appClientDelegate, Configuration conf) { @@ -182,13 +185,19 @@ public class TestLocalTaskSchedulerService { } @Override - void processRequest() { - super.processRequest(); - processedCount ++; + void dispatchRequest() { + super.dispatchRequest(); + dispatchCount++; + } + + @Override + void allocateTask() { + super.allocateTask(); + allocateCount++; } public void drainRequest(int count) { - while(processedCount != count || !taskRequestQueue.isEmpty()) { + while(dispatchCount != count || !clientRequestQueue.isEmpty()) { try { Thread.sleep(100); } catch (InterruptedException e) { @@ -198,15 +207,9 @@ public class TestLocalTaskSchedulerService { } @Override - void allocateTask(AllocateTaskRequest request) { - super.allocateTask(request); - allocateCount ++; - } - - @Override void deallocateTask(DeallocateTaskRequest request) { super.deallocateTask(request); - deallocateCount ++; + deallocateCount++; } } }
