Repository: tez
Updated Branches:
refs/heads/branch-0.6 a1d7cda8e -> f8cabdc4c
TEZ-2752. logUnsuccessful completion in Attempt should write original finish
time to ATS (bikas)
(cherry picked from commit 996906d27cdb0a1c4301dc449aa5dc638b5b4363)
Conflicts:
CHANGES.txt
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
(cherry picked from commit ccfc16ba88c592e0a581fa8f988c214f84b68312)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f8cabdc4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f8cabdc4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f8cabdc4
Branch: refs/heads/branch-0.6
Commit: f8cabdc4cf5a9e8300f99fb7b9d7c513d3ec1c36
Parents: a1d7cda
Author: Bikas Saha <[email protected]>
Authored: Wed Sep 2 16:34:05 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Wed Sep 2 18:43:57 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 15 ++++--
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 ++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 56 ++++++++++++++------
3 files changed, 56 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f8cabdc4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2920998..1a0e221 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,17 +6,19 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+ time to ATS
TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
same name
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort
buffers
- TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if
configured higher
- TEZ-2734. Add a test to verify the filename generated by OnDiskMerge
+ TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if
configured higher.
+ TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
TEZ-2687. ATS History shutdown happens before the min-held containers are
released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the
default max counters
TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch
option
TEZ-2630. TezChild receives IP address instead of FQDN.
-Release 0.6.2: Unreleased
+Release 0.6.2: 2015-08-07
INCOMPATIBLE CHANGES
@@ -230,13 +232,16 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+ time to ATS
TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
same name
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort
buffers
- TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if
configured higher
- TEZ-2734. Add a test to verify the filename generated by OnDiskMerge
+ TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if
configured higher.
+ TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
TEZ-2687. ATS History shutdown happens before the min-held containers are
released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the
default max counters
+ TEZ-2630. TezChild receives IP address instead of FQDN.
TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0
physical inputs.
TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error
http://git-wip-us.apache.org/repos/asf/tez/blob/f8cabdc4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index fe74586..6d9d4ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -982,9 +982,13 @@ public class TaskAttemptImpl implements TaskAttempt,
protected void logJobHistoryAttemptUnsuccesfulCompletion(
TaskAttemptState state) {
+ long finishTime = getFinishTime();
+ if (finishTime <= 0) {
+ finishTime = clock.getTime(); // comes here in case it was terminated before launch
+ }
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getTask().getVertex().getName(), getLaunchTime(),
- clock.getTime(), state,
+ finishTime, state,
terminationCause,
StringUtils.join(
getDiagnostics(), LINE_SEPARATOR), getCounters());
http://git-wip-us.apache.org/repos/asf/tez/blob/f8cabdc4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 126e002..d12412f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -66,6 +66,7 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
@@ -88,6 +89,9 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -99,6 +103,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -112,11 +117,20 @@ public class TestTaskAttempt {
return new FileStatus(1, false, 1, 1, 1, f);
}
}
+
+ AppContext appCtx;
@BeforeClass
public static void setup() {
MockDNSToSwitchMapping.initializeMockRackResolver();
}
+
+ @Before
+ public void setupTest() {
+ appCtx = mock(AppContext.class);
+ HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
+ doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
+ }
@Test(timeout = 5000)
public void testLocalityRequest() {
@@ -134,7 +148,7 @@ public class TestTaskAttempt {
TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(), new
SystemClock(),
- mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+ mock(TaskHeartbeatHandler.class), appCtx,
locationHint, false, Resource.newInstance(1024, 1),
createFakeContainerContext(), false);
TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
@@ -167,12 +181,12 @@ public class TestTaskAttempt {
TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(), new
SystemClock(),
- mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+ mock(TaskHeartbeatHandler.class), appCtx,
null, false, Resource.newInstance(1024, 1),
createFakeContainerContext(), false);
TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1,
eventHandler,
mock(TaskAttemptListener.class), new Configuration(), new
SystemClock(),
- mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+ mock(TaskHeartbeatHandler.class), appCtx,
null, true, Resource.newInstance(1024, 1),
createFakeContainerContext(), false);
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -231,7 +245,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
- mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
+ appCtx, locationHint, false, Resource.newInstance(1024,
1), createFakeContainerContext(), false);
TaskAttemptImpl spyTa = spy(taImpl);
@@ -283,7 +297,7 @@ public class TestTaskAttempt {
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
- AppContext mockAppContext = mock(AppContext.class);
+ AppContext mockAppContext = appCtx;
doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
@@ -340,7 +354,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -441,7 +454,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -506,7 +518,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -599,7 +610,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -701,7 +711,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -792,7 +801,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -887,7 +895,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -990,7 +997,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -1090,7 +1096,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -1098,6 +1103,8 @@ public class TestTaskAttempt {
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
+ HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
+ doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
TaskHeartbeatHandler mockHeartbeatHandler =
mock(TaskHeartbeatHandler.class);
MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1,
eventHandler,
@@ -1119,7 +1126,12 @@ public class TestTaskAttempt {
int expectedEventsTillSucceeded = 6;
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ ArgumentCaptor<DAGHistoryEvent> histArg =
ArgumentCaptor.forClass(DAGHistoryEvent.class);
verify(eventHandler,
times(expectedEventsTillSucceeded)).handle(arg.capture());
+ verify(mockHistHandler, times(2)).handle(histArg.capture()); // start and finish
+ DAGHistoryEvent histEvent = histArg.getValue();
+ TaskAttemptFinishedEvent finishEvent =
(TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+ long finishTime = finishEvent.getFinishTime();
verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1);
@@ -1150,6 +1162,11 @@ public class TestTaskAttempt {
assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST,
taImpl.getTerminationCause());
// verify unregister is not invoked again
verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
+ verify(mockHistHandler, times(3)).handle(histArg.capture());
+ histEvent = histArg.getValue();
+ finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+ long newFinishTime = finishEvent.getFinishTime();
+ Assert.assertEquals(finishTime, newFinishTime);
assertEquals(true, taImpl.inputFailedReported);
int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
@@ -1235,7 +1252,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
@@ -1305,9 +1321,11 @@ public class TestTaskAttempt {
clock, taskHeartbeatHandler, appContext,
isRescheduled, resource, containerContext, leafVertex);
this.locationHint = locationHint;
+ doReturn(mockVertex).when(mockTask).getVertex();
}
Vertex mockVertex = mock(Vertex.class);
+ Task mockTask = mock(Task.class);
boolean inputFailedReported = false;
@Override
@@ -1316,6 +1334,11 @@ public class TestTaskAttempt {
}
@Override
+ public Task getTask() {
+ return mockTask;
+ }
+
+ @Override
protected Vertex getVertex() {
return mockVertex;
}
@@ -1329,18 +1352,21 @@ public class TestTaskAttempt {
@Override
protected void logJobHistoryAttemptStarted() {
taskAttemptStartedEventLogged++;
+ super.logJobHistoryAttemptStarted();
}
@Override
protected void logJobHistoryAttemptFinishedEvent(
TaskAttemptStateInternal state) {
taskAttemptFinishedEventLogged++;
+ super.logJobHistoryAttemptFinishedEvent(state);
}
@Override
protected void logJobHistoryAttemptUnsuccesfulCompletion(
TaskAttemptState state) {
taskAttemptFinishedEventLogged++;
+ super.logJobHistoryAttemptUnsuccesfulCompletion(state);
}
@Override