Repository: tez Updated Branches: refs/heads/master 983ceeee1 -> 62d9853b3
TEZ-2834. Make Tez preemption resilient to incorrect free resource reported by YARN (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/62d9853b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/62d9853b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/62d9853b Branch: refs/heads/master Commit: 62d9853b35f41d73cca16d2004e321195da681f4 Parents: 983ceee Author: Bikas Saha <[email protected]> Authored: Mon Sep 21 11:19:12 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Mon Sep 21 11:19:12 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 10 +- .../apache/tez/dag/api/TezConfiguration.java | 12 + .../dag/app/rm/YarnTaskSchedulerService.java | 85 ++++++- .../tez/dag/app/rm/TestTaskScheduler.java | 229 ++++++++++++++++++- .../org/apache/tez/test/TestFaultTolerance.java | 20 ++ .../java/org/apache/tez/test/TestInput.java | 12 + 6 files changed, 357 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/62d9853b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ee1e255..ad26eee 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,8 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2834. Make Tez preemption resilient to incorrect free resource reported + by YARN TEZ-2775. Improve and consolidate logging in Runtime components. TEZ-2097. TEZ-UI Add dag logs backend support TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions @@ -185,7 +187,9 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES -ALL CHANGES: +ALL CHANGES + TEZ-2834. Make Tez preemption resilient to incorrect free resource reported + by YARN TEZ-2775. Improve and consolidate logging in Runtime components. TEZ-2097. TEZ-UI Add dag logs backend support TEZ-2812. 
Preemption sometimes does not respect heartbeats between preemptions @@ -445,6 +449,8 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2834. Make Tez preemption resilient to incorrect free resource reported + by YARN TEZ-2097. TEZ-UI Add dag logs backend support TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions TEZ-814. Improve heuristic for determining a task has failed outputs @@ -668,6 +674,8 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2834. Make Tez preemption resilient to incorrect free resource reported + by YARN TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions TEZ-814. Improve heuristic for determining a task has failed outputs TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting http://git-wip-us.apache.org/repos/asf/tez/blob/62d9853b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index f7fd8da..12435ca 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -890,6 +890,18 @@ public class TezConfiguration extends Configuration { public static final int TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT = 3; /** + * Int value. Time (in millisecs) that an unsatisfied request will wait before preempting other + * resources. In rare cases, the cluster reports enough free resources but the job does not end + * up getting enough of them on a node to actually be assigned a container. This configuration + * puts a deadline on such waits to prevent indefinite job hangs. 
+ */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="integer") + public static final String TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS = + TEZ_AM_PREFIX + "preemption.max.wait-time-ms"; + public static final int TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS_DEFAULT = 60*1000; // 60s + + /** * String value to a file path. * The location of the Tez libraries which will be localized for DAGs. * This follows the following semantics http://git-wip-us.apache.org/repos/asf/tez/blob/62d9853b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 5a5464f..1f05064 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -95,8 +95,9 @@ public class YarnTaskSchedulerService extends TaskScheduler private boolean reuseRackLocal; private boolean reuseNonLocal; + // type is linked hash map to maintain order of incoming requests Map<Object, CookieContainerRequest> taskRequests = - new HashMap<Object, CookieContainerRequest>(); + new LinkedHashMap<Object, CookieContainerRequest>(); // LinkedHashMap is need in getProgress() LinkedHashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>(); @@ -142,7 +143,11 @@ public class YarnTaskSchedulerService extends TaskScheduler long idleContainerTimeoutMin; long idleContainerTimeoutMax = 0; int sessionNumMinHeldContainers = 0; - int preemptionPercentage = 0; + int preemptionPercentage = 0; + long preemptionMaxWaitTime = 0; + + long highestWaitingRequestWaitStartTime = 0; + Priority highestWaitingRequestPriority = null; Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet(); @@ -328,6 +333,10 @@ public class YarnTaskSchedulerService 
extends TaskScheduler TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT); Preconditions.checkArgument(numHeartbeatsBetweenPreemptions >= 1, "Heartbeats between preemptions should be >=1"); + + preemptionMaxWaitTime = conf.getInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS, + TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS_DEFAULT); + Preconditions.checkArgument(preemptionMaxWaitTime >=0, "Preemption max wait time must be >=0"); delayedContainerManager = new DelayedContainerManager(); LOG.info("YarnTaskScheduler initialized with configuration: " + @@ -337,6 +346,7 @@ public class YarnTaskSchedulerService extends TaskScheduler ", reuseNonLocal: " + reuseNonLocal + ", localitySchedulingDelay: " + localitySchedulingDelay + ", preemptionPercentage: " + preemptionPercentage + + ", preemptionMaxWaitTime: " + preemptionMaxWaitTime + ", numHeartbeatsBetweenPreemptions: " + numHeartbeatsBetweenPreemptions + ", idleContainerMinTimeout: " + idleContainerTimeoutMin + ", idleContainerMaxTimeout: " + idleContainerTimeoutMax + @@ -1126,7 +1136,16 @@ public class YarnTaskSchedulerService extends TaskScheduler " Free: " + freeResource + " pendingRequests: " + taskRequests.size() + " delayedContainers: " + delayedContainerManager.delayedContainers.size() + - " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption; + " heartbeats: " + numHeartbeats + + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption + + ((highestWaitingRequestPriority != null) ? 
+ (" highestWaitingRequestWaitStartTime: " + highestWaitingRequestWaitStartTime + + " highestWaitingRequestPriority: " + highestWaitingRequestPriority.toString()) : ""); + } + + private void resetHighestWaitingPriority(Priority newPri) { + highestWaitingRequestPriority = newPri; + highestWaitingRequestWaitStartTime = 0; } boolean preemptIfNeeded() { @@ -1164,10 +1183,26 @@ public class YarnTaskSchedulerService extends TaskScheduler if (highestPriRequest == null) { // nothing pending + resetHighestWaitingPriority(null); return true; } - if(fitsIn(highestPriRequest.getCapability(), freeResources)) { + // reset the wait time when waiting priority changes to prevent carry over of the value + if (highestWaitingRequestPriority == null || + !highestPriRequest.getPriority().equals(highestWaitingRequestPriority)) { + resetHighestWaitingPriority(highestPriRequest.getPriority()); + } + + long currTime = System.currentTimeMillis(); + if (highestWaitingRequestWaitStartTime == 0) { + highestWaitingRequestWaitStartTime = currTime; + } + + boolean preemptionWaitDeadlineCrossed = + (currTime - highestWaitingRequestWaitStartTime) > preemptionMaxWaitTime ? true : false; + + if(!preemptionWaitDeadlineCrossed && + fitsIn(highestPriRequest.getCapability(), freeResources)) { if (LOG.isDebugEnabled()) { LOG.debug(highestPriRequest + " fits in free resources"); } else { @@ -1177,6 +1212,42 @@ public class YarnTaskSchedulerService extends TaskScheduler } return true; } + + if (preemptionWaitDeadlineCrossed) { + // check if anything lower priority is running - priority inversion + // this check could have been done earlier but in the common case + // this would be unnecessary since there are usually requests pending + // in the normal case without priority inversion. 
So do this expensive + // iteration now + boolean lowerPriRunning = false; + for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) { + HeldContainer heldContainer = heldContainers.get(entry.getValue().getId()); + CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo(); + Priority taskPriority = lastTaskInfo.getPriority(); + Object signature = lastTaskInfo.getCookie().getContainerSignature(); + if(isHigherPriority(highestPriRequest.getPriority(), taskPriority)) { + // lower priority task is running + if (containerSignatureMatcher.isExactMatch( + highestPriRequest.getCookie().getContainerSignature(), + signature)) { + // exact match with different priorities + continue; + } + lowerPriRunning = true; + break; + } + } + if (!lowerPriRunning) { + // nothing lower priority running + // normal case of many pending requests without priority inversion + resetHighestWaitingPriority(null); + return true; + } + LOG.info("Preemption deadline crossed at pri: " + highestPriRequest.getPriority() + + " numRequests: " + numHighestPriRequests + ". " + + constructPreemptionPeriodicLog(freeResources)); + } + // highest priority request will not fit in existing free resources // free up some more // TODO this is subject to error wrt RM resource normalization @@ -1267,7 +1338,11 @@ public class YarnTaskSchedulerService extends TaskScheduler // this assert will be a no-op in production but can help identify // invalid assumptions during testing assert delayedContainerManager.delayedContainers.isEmpty(); - + if (!delayedContainerManager.delayedContainers.isEmpty()) { + LOG.warn("Expected delayed containers to be empty. 
" + + constructPreemptionPeriodicLog(freeResources)); + } + Priority preemptedTaskPriority = null; int numEntriesAtPreemptedPriority = 0; for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) { http://git-wip-us.apache.org/repos/asf/tez/blob/62d9853b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 1fc5092..c012c1e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -1543,11 +1543,13 @@ public class TestTaskScheduler { Object mockTask3Retry = mock(Object.class); Object mockTask3KillA = mock(Object.class); Object mockTask3KillB = mock(Object.class); + Object mockTaskPri8 = mock(Object.class); Object obj3 = new Object(); Priority pri2 = Priority.newInstance(2); Priority pri4 = Priority.newInstance(4); Priority pri5 = Priority.newInstance(5); Priority pri6 = Priority.newInstance(6); + Priority pri8 = Priority.newInstance(8); ArgumentCaptor<CookieContainerRequest> requestCaptor = ArgumentCaptor.forClass(CookieContainerRequest.class); @@ -1700,12 +1702,16 @@ public class TestTaskScheduler { Object mockTask3WaitCookie = new Object(); scheduler.allocateTask(mockTask3Wait, taskAsk, null, null, pri6, obj3, mockTask3WaitCookie); + // add a pri 8 request for the pri 8 container that will not be matched + Object mockTaskPri8Cookie = new Object(); + scheduler.allocateTask(mockTaskPri8, taskAsk, null, + null, pri8, obj3, mockTaskPri8Cookie); // no preemption - same pri scheduler.getProgress(); drainableAppCallback.drain(); + verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture()); verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); - Priority pri8 = 
Priority.newInstance(8); Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer4.getNodeId().getHost()).thenReturn("host1"); when(mockContainer4.getResource()).thenReturn(taskAsk); @@ -1739,12 +1745,12 @@ public class TestTaskScheduler { drainableAppCallback.drain(); verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any()); verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId4); - verify(mockRMClient, times(5)). - addContainerRequest(requestCaptor.capture()); + // internally re-request pri8 task request because we release pri8 new container + verify(mockRMClient, times(7)).addContainerRequest(requestCaptor.capture()); CookieContainerRequest reAdded = requestCaptor.getValue(); - Assert.assertEquals(pri6, reAdded.getPriority()); + Assert.assertEquals(pri8, reAdded.getPriority()); Assert.assertEquals(taskAsk, reAdded.getCapability()); - Assert.assertEquals(mockTask3WaitCookie, reAdded.getCookie().getAppCookie()); + Assert.assertEquals(mockTaskPri8Cookie, reAdded.getCookie().getAppCookie()); // remove fudging. 
scheduler.delayedContainerManager.delayedContainers.clear(); @@ -1799,6 +1805,219 @@ public class TestTaskScheduler { scheduler.shutdown(); drainableAppCallback.drain(); } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test (timeout=5000) + public void testTaskSchedulerPreemption2() throws Exception { + RackResolver.init(new YarnConfiguration()); + + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = + mock(TezAMRMClientAsync.class); + + String appHost = "host"; + int appPort = 0; + String appUrl = "url"; + + int waitTime = 1000; + + Configuration conf = new Configuration(); + conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); + conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 2); + conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS, waitTime); + + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false, + null, null, new PreemptionMatcher(), conf); + final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + + final TaskSchedulerWithDrainableContext scheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); + + scheduler.initialize(); + + RegisterApplicationMasterResponse mockRegResponse = + mock(RegisterApplicationMasterResponse.class); + when( + mockRMClient.registerApplicationMaster(anyString(), anyInt(), + anyString())).thenReturn(mockRegResponse); + + scheduler.start(); + Resource totalResource = Resource.newInstance(4000, 4); // high value always reported + when(mockRMClient.getAvailableResources()).thenReturn(totalResource); + + // no preemption + scheduler.getProgress(); + drainableAppCallback.drain(); + Assert.assertEquals(totalResource, scheduler.getTotalResources()); + verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); + + // allocate task + Object mockTask1 = mock(Object.class); + Object mockTask2 = mock(Object.class); + Object mockTask3 = 
mock(Object.class); + Object obj3 = new Object(); + Priority pri2 = Priority.newInstance(2); + Priority pri4 = Priority.newInstance(4); + Priority pri6 = Priority.newInstance(6); + + ArgumentCaptor<CookieContainerRequest> requestCaptor = + ArgumentCaptor.forClass(CookieContainerRequest.class); + final ArrayList<CookieContainerRequest> anyContainers = + new ArrayList<CookieContainerRequest>(); + + + Resource taskAsk = Resource.newInstance(1024, 1); + scheduler.allocateTask(mockTask1, taskAsk, null, + null, pri4, null, null); + drainableAppCallback.drain(); + verify(mockRMClient, times(1)). + addContainerRequest(requestCaptor.capture()); + anyContainers.add(requestCaptor.getValue()); + + scheduler.getProgress(); + drainableAppCallback.drain(); + Assert.assertEquals(totalResource, scheduler.getTotalResources()); + verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); + + final List<ArrayList<CookieContainerRequest>> anyList = + new LinkedList<ArrayList<CookieContainerRequest>>(); + final List<ArrayList<CookieContainerRequest>> emptyList = + new LinkedList<ArrayList<CookieContainerRequest>>(); + + anyList.add(anyContainers); + List<Container> containers = new ArrayList<Container>(); + Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); + when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); + when(mockContainer1.getResource()).thenReturn(taskAsk); + when(mockContainer1.getPriority()).thenReturn(pri4); + ContainerId mockCId1 = mock(ContainerId.class); + when(mockContainer1.getId()).thenReturn(mockCId1); + containers.add(mockContainer1); + when( + mockRMClient.getMatchingRequests((Priority) any(), eq("host1"), + (Resource) any())).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + @Override + public List<? 
extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + return emptyList; + } + }); + // RackResolver by default puts hosts in default-rack + when( + mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"), + (Resource) any())).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + @Override + public List<? extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + return emptyList; + } + }); + when( + mockRMClient.getMatchingRequests((Priority) any(), + eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + int calls = 0; + @Override + public List<? extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + if(calls > 0) { + anyContainers.remove(0); + } + calls++; + return anyList; + } + }); + + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + ContainerId cId = (ContainerId) args[0]; + scheduler.deallocateContainer(cId); + return null; + }}) + .when(mockApp).preemptContainer((ContainerId)any()); + + scheduler.onContainersAllocated(containers); + drainableAppCallback.drain(); + Assert.assertEquals(1, scheduler.taskAllocations.size()); + Assert.assertEquals(mockCId1, + scheduler.taskAllocations.get(mockTask1).getId()); + + // no preemption + scheduler.getProgress(); + drainableAppCallback.drain(); + verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); + // no need for task preemption until now - so they should match + Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); + + // add a pending request that cannot be allocated until resources free up + Object mockTask2Cookie = new Object(); + scheduler.allocateTask(mockTask2, taskAsk, null, + null, pri2, obj3, mockTask2Cookie); + Object 
mockTask3Cookie = new Object(); + scheduler.allocateTask(mockTask3, taskAsk, null, + null, pri6, obj3, mockTask3Cookie); + // nothing waiting till now + Assert.assertNull(scheduler.highestWaitingRequestPriority); + Assert.assertEquals(0, scheduler.highestWaitingRequestWaitStartTime); + + long currTime = System.currentTimeMillis(); + scheduler.getProgress(); + drainableAppCallback.drain(); + verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); + // enough free resources. preemption not triggered + Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority); + Assert.assertTrue(scheduler.highestWaitingRequestWaitStartTime >= currTime); + Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); + + Thread.sleep(waitTime + 10); + long oldStartWaitTime = scheduler.highestWaitingRequestWaitStartTime; + + scheduler.getProgress(); + drainableAppCallback.drain(); + verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); + // enough free resources. deadline crossed. 
preemption triggered + Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority); + Assert.assertEquals(oldStartWaitTime, scheduler.highestWaitingRequestWaitStartTime); + Assert.assertTrue(scheduler.numHeartbeats > scheduler.heartbeatAtLastPreemption); + + scheduler.getProgress(); + drainableAppCallback.drain(); + verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any()); + verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId1); + Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); + // maintains existing waiting values + Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority); + Assert.assertEquals(oldStartWaitTime, scheduler.highestWaitingRequestWaitStartTime); + + // remove high pri request to test waiting pri change + scheduler.deallocateTask(mockTask2, false, null, null); + + scheduler.getProgress(); + // waiting value changes + Assert.assertEquals(pri6, scheduler.highestWaitingRequestPriority); + Assert.assertTrue(oldStartWaitTime < scheduler.highestWaitingRequestWaitStartTime); + + Thread.sleep(waitTime + 10); + scheduler.getProgress(); + drainableAppCallback.drain(); + // deadlines crossed but nothing lower pri running. 
so reset + Assert.assertNull(scheduler.highestWaitingRequestPriority); + Assert.assertEquals(0, scheduler.highestWaitingRequestWaitStartTime); + + scheduler.getProgress(); + drainableAppCallback.drain(); + // waiting value changes + Assert.assertEquals(pri6, scheduler.highestWaitingRequestPriority); + Assert.assertTrue(oldStartWaitTime < scheduler.highestWaitingRequestWaitStartTime); + + AppFinalStatus finalStatus = + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl); + when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); + scheduler.shutdown(); + drainableAppCallback.drain(); + } @SuppressWarnings("unchecked") @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/62d9853b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index d03dd18..0d27032 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -88,6 +88,8 @@ public class TestFaultTolerance { tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + tezConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.4); + tezConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, 3); tezSession = TezClient.create("TestFaultTolerance", tezConf, true); tezSession.start(); @@ -271,6 +273,24 @@ public class TestFaultTolerance { } @Test (timeout=60000) + public void testBasicInputFailureWithoutExitDeadline() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 3); // 1 error < 0.4 fail fraction + 
testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "2"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + + DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExitDeadline", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); + } + + + @Test (timeout=60000) public void testMultipleInputFailureWithoutExit() throws Exception { Configuration testConf = new Configuration(false); testConf.setBoolean(TestInput.getVertexConfName( http://git-wip-us.apache.org/repos/asf/tez/blob/62d9853b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java index 87ca93d..c2b60c7 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java @@ -75,6 +75,7 @@ public class TestInput extends AbstractLogicalInput { Set<Integer> failingInputIndices = Sets.newHashSet(); Integer failAll = new Integer(-1); int[] inputValues; + AtomicInteger numEventsReceived = new AtomicInteger(0); /** * Enable failure for this logical input @@ -192,12 +193,22 @@ public class TestInput extends AbstractLogicalInput { LOG.info("Failing input: " + msg); } } + int numEvents = numEventsReceived.get(); getContext().sendEvents(events); if (doFailAndExit) { String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier(); LOG.info(msg); throwException(msg); } else { + try { + while (numEvents == numEventsReceived.get()) { + // keep sending events + Thread.sleep(500); + 
getContext().sendEvents(events); + } + } catch (InterruptedException e) { + LOG.info("Interrupted while sending events", e); + } done = false; } } else if ((failingTaskIndices.contains(failAll) || @@ -330,6 +341,7 @@ public class TestInput extends AbstractLogicalInput { @Override public void handleEvents(List<Event> inputEvents) throws Exception { for (Event event : inputEvents) { + numEventsReceived.incrementAndGet(); if (event instanceof DataMovementEvent) { DataMovementEvent dmEvent = (DataMovementEvent) event; numCompletedInputs++;
