Repository: tez
Updated Branches:
refs/heads/branch-0.7 acbd25002 -> 8d49fd528
TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat
fails (zjffdu)
(cherry picked from commit f9d15c8695de7975817631b051450336bc5eadee)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8d49fd52
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8d49fd52
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8d49fd52
Branch: refs/heads/branch-0.7
Commit: 8d49fd5285016fb64ebccdc9cf31c408c79ebaaf
Parents: acbd250
Author: Jeff Zhang <[email protected]>
Authored: Fri Oct 9 15:07:17 2015 +0800
Committer: Jeff Zhang <[email protected]>
Committed: Fri Oct 9 15:15:55 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../tez/dag/app/TaskAttemptListenerImpTezDag.java | 1 +
.../org/apache/tez/runtime/task/TaskReporter.java | 14 ++++++++++----
.../org/apache/tez/test/TestExceptionPropagation.java | 12 +++++++++++-
4 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8d49fd52/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c732230..d67f70e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES
+ TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed
heartbeat fails
TEZ-2868. Fix setting Caller Context in Tez Examples.
TEZ-2860. NPE in DAGClientImpl.
TEZ-2855. Fix a potential NPE while routing VertexManager events.
@@ -286,6 +287,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed
heartbeat fails
TEZ-2855. Fix a potential NPE while routing VertexManager events.
TEZ-2716. DefaultSorter.isRleNeeded not thread safe
TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
@@ -518,6 +520,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed
heartbeat fails
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
by YARN
http://git-wip-us.apache.org/repos/asf/tez/blob/8d49fd52/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 5ef89f6..3ad16cd 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -441,6 +441,7 @@ public class TaskAttemptListenerImpTezDag extends
AbstractService implements
// this avoids any time disparity between machines.
tezEvent.setEventReceivedTime(currTime);
final EventType eventType = tezEvent.getEventType();
+ // send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED
otherwise Status may be missed
if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
(TaskStatusUpdateEvent) tezEvent.getEvent());
http://git-wip-us.apache.org/repos/asf/tez/blob/8d49fd52/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index bf93ce3..c1f4fe9 100644
---
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -370,15 +370,21 @@ public class TaskReporter {
EventMetaData srcMeta) throws IOException, TezException {
// Ensure only one final event is ever sent.
if (!finalEventQueued.getAndSet(true)) {
- TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true),
updateEventMetadata);
+ List<TezEvent> tezEvents = new ArrayList<TezEvent>();
if (diagnostics == null) {
diagnostics = ExceptionUtils.getStackTrace(t);
} else {
diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
}
- TezEvent taskAttemptFailedEvent = new TezEvent(new
TaskAttemptFailedEvent(diagnostics),
- srcMeta == null ? updateEventMetadata : srcMeta);
- return !heartbeat(Lists.newArrayList(statusUpdateEvent,
taskAttemptFailedEvent)).shouldDie;
+ tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+ srcMeta == null ? updateEventMetadata : srcMeta));
+ try {
+ tezEvents.add(new TezEvent(getStatusUpdateEvent(true),
updateEventMetadata));
+ } catch (Exception e) {
+ // Counter may exceed limitation
+ LOG.warn("Error when get constructing TaskStatusUpdateEvent");
+ }
+ return !heartbeat(tezEvents).shouldDie;
} else {
LOG.warn("A final task state event has already been sent. Not sending
again");
return askedToDie.get();
http://git-wip-us.apache.org/repos/asf/tez/blob/8d49fd52/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git
a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 49bb9f5..caf0822 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -224,7 +224,11 @@ public class TestExceptionPropagation {
DAGStatus dagStatus = dagClient.waitForCompletion();
String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
LOG.info("Diagnostics:" + diagnostics);
- assertTrue(diagnostics.contains(exLocation.name()));
+ if (exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
+ assertTrue(diagnostics.contains("Too many counters"));
+ } else {
+ assertTrue(diagnostics.contains(exLocation.name()));
+ }
}
} finally {
stopSessionClient();
@@ -301,6 +305,7 @@ public class TestExceptionPropagation {
// PROCESSOR_HANDLE_EVENTS
PROCESSOR_RUN_ERROR, PROCESSOR_CLOSE_ERROR, PROCESSOR_INITIALIZE_ERROR,
PROCESSOR_RUN_EXCEPTION, PROCESSOR_CLOSE_EXCEPTION,
PROCESSOR_INITIALIZE_EXCEPTION,
+ PROCESSOR_COUNTER_EXCEEDED,
// VM
VM_INITIALIZE, VM_ON_ROOTVERTEX_INITIALIZE,VM_ON_SOURCETASK_COMPLETED,
VM_ON_VERTEX_STARTED,
@@ -625,6 +630,11 @@ public class TestExceptionPropagation {
throw new Error(this.exLocation.name());
} else if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_EXCEPTION)
{
throw new Exception(this.exLocation.name());
+ } else if (this.exLocation ==
ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
+ // simulate the counter limitation exceeded
+ for (int i=0;i< TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT+1; ++i) {
+ getContext().getCounters().findCounter("mycounter",
"counter_"+i).increment(1);
+ }
}
}