Repository: tez
Updated Branches:
refs/heads/branch-0.5 b88feb9d8 -> 0f6c1086f
TEZ-2834. Make Tez preemption resilient to incorrect free resource reported by
YARN (bikas)
(cherry picked from commit 62d9853b35f41d73cca16d2004e321195da681f4)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
tez-tests/src/test/java/org/apache/tez/test/TestInput.java
(cherry picked from commit 01ad2a7ce12e88bb536af54fd9b9d483943b8338)
(cherry picked from commit 970bda6566954bbfb519d62d19364b337982724b)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0f6c1086
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0f6c1086
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0f6c1086
Branch: refs/heads/branch-0.5
Commit: 0f6c1086f55e87fe13a05f6460b3dcd083d70d63
Parents: b88feb9
Author: Bikas Saha <[email protected]>
Authored: Mon Sep 21 11:19:12 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Mon Sep 21 12:20:46 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/TezConfiguration.java | 10 +
.../dag/app/rm/YarnTaskSchedulerService.java | 85 ++++++-
.../tez/dag/app/rm/TestTaskScheduler.java | 231 ++++++++++++++++++-
.../org/apache/tez/test/TestFaultTolerance.java | 20 ++
.../java/org/apache/tez/test/TestInput.java | 12 +
6 files changed, 351 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0f6c1086/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 766f019..f9e4734 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
+ by YARN
TEZ-2812. Preemption sometimes does not respect heartbeats between
preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2768. Log a useful error message when the summary stream cannot be
closed when shutting
http://git-wip-us.apache.org/repos/asf/tez/blob/0f6c1086/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 014e5ff..df39822 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -672,6 +672,16 @@ public class TezConfiguration extends Configuration {
public static final int
TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT = 3;
/**
+ * Int value. Time (in millisecs) that an unsatisfied request will wait
before preempting other
+ * resources. In rare cases, the cluster reports enough free
resources but the job never gets
+ * enough on any one node for the request to actually be allocated. This
configuration tries to put
+ * a deadline on such waits to prevent indefinite job hangs.
+ */
+ public static final String TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS =
+ TEZ_AM_PREFIX + "preemption.max.wait-time-ms";
+ public static final int TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS_DEFAULT =
60*1000; // 60s
+
+ /**
* String value to a file path.
* The location of the Tez libraries which will be localized for DAGs.
* This follows the following semantics
http://git-wip-us.apache.org/repos/asf/tez/blob/0f6c1086/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 1ed3d02..6a024c3 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -98,8 +98,9 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
private boolean reuseRackLocal;
private boolean reuseNonLocal;
+ // type is linked hash map to maintain order of incoming requests
Map<Object, CookieContainerRequest> taskRequests =
- new HashMap<Object, CookieContainerRequest>();
+ new LinkedHashMap<Object, CookieContainerRequest>();
// LinkedHashMap is need in getProgress()
LinkedHashMap<Object, Container> taskAllocations =
new LinkedHashMap<Object, Container>();
@@ -146,7 +147,11 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
long idleContainerTimeoutMin;
long idleContainerTimeoutMax = 0;
int sessionNumMinHeldContainers = 0;
- int preemptionPercentage = 0;
+ int preemptionPercentage = 0;
+ long preemptionMaxWaitTime = 0;
+
+ long highestWaitingRequestWaitStartTime = 0;
+ Priority highestWaitingRequestPriority = null;
Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet();
@@ -346,6 +351,10 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT);
Preconditions.checkArgument(numHeartbeatsBetweenPreemptions >= 1,
"Heartbeats between preemptions should be >=1");
+
+ preemptionMaxWaitTime =
conf.getInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS,
+ TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS_DEFAULT);
+ Preconditions.checkArgument(preemptionMaxWaitTime >=0, "Preemption max
wait time must be >=0");
delayedContainerManager = new DelayedContainerManager();
LOG.info("TaskScheduler initialized with configuration: " +
@@ -355,6 +364,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
", reuseNonLocal: " + reuseNonLocal +
", localitySchedulingDelay: " + localitySchedulingDelay +
", preemptionPercentage: " + preemptionPercentage +
+ ", preemptionMaxWaitTime: " + preemptionMaxWaitTime +
", numHeartbeatsBetweenPreemptions: " +
numHeartbeatsBetweenPreemptions +
", idleContainerMinTimeout: " + idleContainerTimeoutMin +
", idleContainerMaxTimeout: " + idleContainerTimeoutMax +
@@ -1113,6 +1123,11 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
return (int) Math.ceil((original * percent)/100.f);
}
+ private void resetHighestWaitingPriority(Priority newPri) {
+ highestWaitingRequestPriority = newPri;
+ highestWaitingRequestWaitStartTime = 0;
+ }
+
boolean preemptIfNeeded() {
if (preemptionPercentage == 0) {
// turned off
@@ -1147,16 +1162,71 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
if (highestPriRequest == null) {
// nothing pending
+ resetHighestWaitingPriority(null);
return true;
}
- if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
+ // reset the wait time when waiting priority changes to prevent carry
over of the value
+ if (highestWaitingRequestPriority == null ||
+
!highestPriRequest.getPriority().equals(highestWaitingRequestPriority)) {
+ resetHighestWaitingPriority(highestPriRequest.getPriority());
+ }
+
+ long currTime = System.currentTimeMillis();
+ if (highestWaitingRequestWaitStartTime == 0) {
+ highestWaitingRequestWaitStartTime = currTime;
+ }
+
+ boolean preemptionWaitDeadlineCrossed =
+ (currTime - highestWaitingRequestWaitStartTime) >
preemptionMaxWaitTime ? true : false;
+
+ if(!preemptionWaitDeadlineCrossed &&
+ fitsIn(highestPriRequest.getCapability(), freeResources)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Highest pri request: " + highestPriRequest + " fits in
available resources "
+ freeResources);
}
return true;
}
+
+ if (preemptionWaitDeadlineCrossed) {
+ // check if anything lower priority is running - priority inversion
+ // this check could have been done earlier but in the common case
+ // this would be unnecessary since there are usually requests pending
+ // in the normal case without priority inversion. So do this expensive
+ // iteration now
+ boolean lowerPriRunning = false;
+ for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
+ HeldContainer heldContainer =
heldContainers.get(entry.getValue().getId());
+ CookieContainerRequest lastTaskInfo =
heldContainer.getLastTaskInfo();
+ Priority taskPriority = lastTaskInfo.getPriority();
+ Object signature = lastTaskInfo.getCookie().getContainerSignature();
+ if(isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
+ // lower priority task is running
+ if (containerSignatureMatcher.isExactMatch(
+ highestPriRequest.getCookie().getContainerSignature(),
+ signature)) {
+ // exact match with different priorities
+ continue;
+ }
+ lowerPriRunning = true;
+ break;
+ }
+ }
+ if (!lowerPriRunning) {
+ // nothing lower priority running
+ // normal case of many pending requests without priority inversion
+ resetHighestWaitingPriority(null);
+ return true;
+ }
+ LOG.info("Preemption deadline crossed at pri: " +
highestPriRequest.getPriority()
+ + " numRequests: " + numHighestPriRequests +
+ " Allocated resource memory: " + allocatedResources.getMemory() +
+ " cpu:" + allocatedResources.getVirtualCores() +
+ " delayedContainers: " +
delayedContainerManager.delayedContainers.size() +
+ " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " +
heartbeatAtLastPreemption);
+ }
+
// highest priority request will not fit in existing free resources
// free up some more
// TODO this is subject to error wrt RM resource normalization
@@ -1247,7 +1317,14 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
// this assert will be a no-op in production but can help identify
// invalid assumptions during testing
assert delayedContainerManager.delayedContainers.isEmpty();
-
+ if (!delayedContainerManager.delayedContainers.isEmpty()) {
+ LOG.info("Expected delayed containers to be empty." +
+ " Allocated resource memory: " + allocatedResources.getMemory() +
+ " cpu:" + allocatedResources.getVirtualCores() +
+ " delayedContainers: " +
delayedContainerManager.delayedContainers.size() +
+ " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " +
heartbeatAtLastPreemption);
+ }
+
Priority preemptedTaskPriority = null;
int numEntriesAtPreemptedPriority = 0;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/0f6c1086/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 4b1fb18..d6ae2eb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1524,11 +1524,13 @@ public class TestTaskScheduler {
Object mockTask3Retry = mock(Object.class);
Object mockTask3KillA = mock(Object.class);
Object mockTask3KillB = mock(Object.class);
+ Object mockTaskPri8 = mock(Object.class);
Object obj3 = new Object();
Priority pri2 = Priority.newInstance(2);
Priority pri4 = Priority.newInstance(4);
Priority pri5 = Priority.newInstance(5);
Priority pri6 = Priority.newInstance(6);
+ Priority pri8 = Priority.newInstance(8);
ArgumentCaptor<CookieContainerRequest> requestCaptor =
ArgumentCaptor.forClass(CookieContainerRequest.class);
@@ -1681,12 +1683,16 @@ public class TestTaskScheduler {
Object mockTask3WaitCookie = new Object();
scheduler.allocateTask(mockTask3Wait, taskAsk, null,
null, pri6, obj3, mockTask3WaitCookie);
+ // add a pri 8 request for the pri 8 container that will not be matched
+ Object mockTaskPri8Cookie = new Object();
+ scheduler.allocateTask(mockTaskPri8, taskAsk, null,
+ null, pri8, obj3, mockTaskPri8Cookie);
// no preemption - same pri
scheduler.getProgress();
drainableAppCallback.drain();
+ verify(mockRMClient,
times(6)).addContainerRequest(requestCaptor.capture());
verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
- Priority pri8 = Priority.newInstance(8);
Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer4.getNodeId().getHost()).thenReturn("host1");
when(mockContainer4.getResource()).thenReturn(taskAsk);
@@ -1720,12 +1726,12 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
verify(mockRMClient,
times(1)).releaseAssignedContainer((ContainerId)any());
verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId4);
- verify(mockRMClient, times(5)).
- addContainerRequest(requestCaptor.capture());
+ // the pri8 task request is internally re-requested because the new pri8
container is released
+ verify(mockRMClient,
times(7)).addContainerRequest(requestCaptor.capture());
CookieContainerRequest reAdded = requestCaptor.getValue();
- Assert.assertEquals(pri6, reAdded.getPriority());
+ Assert.assertEquals(pri8, reAdded.getPriority());
Assert.assertEquals(taskAsk, reAdded.getCapability());
- Assert.assertEquals(mockTask3WaitCookie,
reAdded.getCookie().getAppCookie());
+ Assert.assertEquals(mockTaskPri8Cookie,
reAdded.getCookie().getAppCookie());
// remove fudging.
scheduler.delayedContainerManager.delayedContainers.clear();
@@ -1781,6 +1787,221 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
scheduler.close();
}
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test (timeout=5000)
+ public void testTaskSchedulerPreemption2() throws Exception {
+ RackResolver.init(new YarnConfiguration());
+ TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+ AppContext mockAppContext = mock(AppContext.class);
+ when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+
mock(TezAMRMClientAsync.class);
+
+ String appHost = "host";
+ int appPort = 0;
+ String appUrl = "url";
+ final TaskSchedulerWithDrainableAppCallback scheduler =
+ new TaskSchedulerWithDrainableAppCallback(
+ mockApp, new PreemptionMatcher(), appHost, appPort,
+ appUrl, mockRMClient, mockAppContext);
+ TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+ .getDrainableAppCallback();
+
+ int waitTime = 1000;
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS,
2);
+ conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS, waitTime);
+ scheduler.init(conf);
+
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS,
3);
+
+ RegisterApplicationMasterResponse mockRegResponse =
+ mock(RegisterApplicationMasterResponse.class);
+ when(
+ mockRMClient.registerApplicationMaster(anyString(), anyInt(),
+ anyString())).thenReturn(mockRegResponse);
+
+ scheduler.start();
+ Resource totalResource = Resource.newInstance(4000, 4);
+ when(mockRMClient.getAvailableResources()).thenReturn(totalResource);
+
+ // no preemption
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ Assert.assertEquals(totalResource, scheduler.getTotalResources());
+ verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+
+ // allocate task
+ Object mockTask1 = mock(Object.class);
+ Object mockTask2 = mock(Object.class);
+ Object mockTask3 = mock(Object.class);
+ Object obj3 = new Object();
+ Priority pri2 = Priority.newInstance(2);
+ Priority pri4 = Priority.newInstance(4);
+ Priority pri6 = Priority.newInstance(6);
+
+ ArgumentCaptor<CookieContainerRequest> requestCaptor =
+ ArgumentCaptor.forClass(CookieContainerRequest.class);
+ final ArrayList<CookieContainerRequest> anyContainers =
+ new ArrayList<CookieContainerRequest>();
+
+
+ Resource taskAsk = Resource.newInstance(1024, 1);
+ scheduler.allocateTask(mockTask1, taskAsk, null,
+ null, pri4, null, null);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(1)).
+ addContainerRequest(requestCaptor.capture());
+ anyContainers.add(requestCaptor.getValue());
+
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ Assert.assertEquals(totalResource, scheduler.getTotalResources());
+ verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+
+ final List<ArrayList<CookieContainerRequest>> anyList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ final List<ArrayList<CookieContainerRequest>> emptyList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+
+ anyList.add(anyContainers);
+ List<Container> containers = new ArrayList<Container>();
+ Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
+ when(mockContainer1.getResource()).thenReturn(taskAsk);
+ when(mockContainer1.getPriority()).thenReturn(pri4);
+ ContainerId mockCId1 = mock(ContainerId.class);
+ when(mockContainer1.getId()).thenReturn(mockCId1);
+ containers.add(mockContainer1);
+ when(
+ mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
+ (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+ });
+ // RackResolver by default puts hosts in default-rack
+ when(
+ mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
+ (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+ });
+ when(
+ mockRMClient.getMatchingRequests((Priority) any(),
+ eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ int calls = 0;
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ if(calls > 0) {
+ anyContainers.remove(0);
+ }
+ calls++;
+ return anyList;
+ }
+ });
+
+ Mockito.doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ ContainerId cId = (ContainerId) args[0];
+ scheduler.deallocateContainer(cId);
+ return null;
+ }})
+ .when(mockApp).preemptContainer((ContainerId)any());
+
+ scheduler.onContainersAllocated(containers);
+ drainableAppCallback.drain();
+ Assert.assertEquals(1, scheduler.taskAllocations.size());
+ Assert.assertEquals(mockCId1,
+ scheduler.taskAllocations.get(mockTask1).getId());
+
+ // no preemption
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+
+ // add a pending request that cannot be allocated until resources free up
+ Object mockTask2Cookie = new Object();
+ scheduler.allocateTask(mockTask2, taskAsk, null,
+ null, pri2, obj3, mockTask2Cookie);
+ Object mockTask3Cookie = new Object();
+ scheduler.allocateTask(mockTask3, taskAsk, null,
+ null, pri6, obj3, mockTask3Cookie);
+ // nothing waiting till now
+ Assert.assertNull(scheduler.highestWaitingRequestPriority);
+ Assert.assertEquals(0, scheduler.highestWaitingRequestWaitStartTime);
+
+ long currTime = System.currentTimeMillis();
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+ // enough free resources. preemption not triggered
+ Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority);
+ Assert.assertTrue(scheduler.highestWaitingRequestWaitStartTime >=
currTime);
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+
+ Thread.sleep(waitTime + 10);
+ long oldStartWaitTime = scheduler.highestWaitingRequestWaitStartTime;
+
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+ // enough free resources. deadline crossed. preemption triggered
+ Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority);
+ Assert.assertEquals(oldStartWaitTime,
scheduler.highestWaitingRequestWaitStartTime);
+ Assert.assertTrue(scheduler.numHeartbeats >
scheduler.heartbeatAtLastPreemption);
+
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ verify(mockRMClient,
times(1)).releaseAssignedContainer((ContainerId)any());
+ verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId1);
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+ // maintains existing waiting values
+ Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority);
+ Assert.assertEquals(oldStartWaitTime,
scheduler.highestWaitingRequestWaitStartTime);
+
+ // remove high pri request to test waiting pri change
+ scheduler.deallocateTask(mockTask2, false);
+
+ scheduler.getProgress();
+ // waiting value changes
+ Assert.assertEquals(pri6, scheduler.highestWaitingRequestPriority);
+ Assert.assertTrue(oldStartWaitTime <
scheduler.highestWaitingRequestWaitStartTime);
+
+ Thread.sleep(waitTime + 10);
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ // deadline crossed but nothing lower pri running, so reset
+ Assert.assertNull(scheduler.highestWaitingRequestPriority);
+ Assert.assertEquals(0, scheduler.highestWaitingRequestWaitStartTime);
+
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ // waiting value changes
+ Assert.assertEquals(pri6, scheduler.highestWaitingRequestPriority);
+ Assert.assertTrue(oldStartWaitTime <
scheduler.highestWaitingRequestWaitStartTime);
+
+ AppFinalStatus finalStatus =
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
+ when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+ scheduler.close();
+ drainableAppCallback.drain();
+ }
@SuppressWarnings("unchecked")
@Test
http://git-wip-us.apache.org/repos/asf/tez/blob/0f6c1086/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git
a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 1c1e846..11ce4bc 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -88,6 +88,8 @@ public class TestFaultTolerance {
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED,
false);
+
tezConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION,
0.4);
+
tezConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
3);
tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
tezSession.start();
@@ -271,6 +273,24 @@ public class TestFaultTolerance {
}
@Test (timeout=60000)
+ public void testBasicInputFailureWithoutExitDeadline() throws Exception {
+ Configuration testConf = new Configuration(false);
+ testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 3); // 1 error <
0.4 fail fraction
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "2");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+
+ DAG dag =
SimpleTestDAG.createDAG("testBasicInputFailureWithoutExitDeadline", testConf);
+ runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ }
+
+
+ @Test (timeout=60000)
public void testMultipleInputFailureWithoutExit() throws Exception {
Configuration testConf = new Configuration(false);
testConf.setBoolean(TestInput.getVertexConfName(
http://git-wip-us.apache.org/repos/asf/tez/blob/0f6c1086/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 465dd9c..8498acb 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -71,6 +71,7 @@ public class TestInput extends AbstractLogicalInput {
Set<Integer> failingInputIndices = Sets.newHashSet();
Integer failAll = new Integer(-1);
int[] inputValues;
+ AtomicInteger numEventsReceived = new AtomicInteger(0);
/**
* Enable failure for this logical input
@@ -176,12 +177,22 @@ public class TestInput extends AbstractLogicalInput {
LOG.info("Failing input: " + msg);
}
}
+ int numEvents = numEventsReceived.get();
getContext().sendEvents(events);
if (doFailAndExit) {
String msg = "FailingInput exiting: " +
getContext().getUniqueIdentifier();
LOG.info(msg);
throwException(msg);
} else {
+ try {
+ while (numEvents == numEventsReceived.get()) {
+ // keep re-sending events until a new event is received
+ Thread.sleep(500);
+ getContext().sendEvents(events);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while sending events", e);
+ }
done = false;
}
} else if ((failingTaskIndices.contains(failAll) ||
@@ -281,6 +292,7 @@ public class TestInput extends AbstractLogicalInput {
@Override
public void handleEvents(List<Event> inputEvents) throws Exception {
for (Event event : inputEvents) {
+ numEventsReceived.incrementAndGet();
if (event instanceof DataMovementEvent) {
DataMovementEvent dmEvent = (DataMovementEvent) event;
numCompletedInputs++;