Repository: tez
Updated Branches:
  refs/heads/master c6c9f6ecd -> f9d15c869
TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f9d15c86 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f9d15c86 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f9d15c86 Branch: refs/heads/master Commit: f9d15c8695de7975817631b051450336bc5eadee Parents: c6c9f6e Author: Jeff Zhang <[email protected]> Authored: Fri Oct 9 15:07:17 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Fri Oct 9 15:07:17 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 4 ++++ .../apache/tez/dag/app/TaskCommunicatorManager.java | 1 + .../org/apache/tez/runtime/task/TaskReporter.java | 14 ++++++++++---- .../org/apache/tez/test/TestExceptionPropagation.java | 12 +++++++++++- 4 files changed, 26 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f9d15c86/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8e5da31..25bb1d2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails TEZ-1788. Allow vertex level disabling of speculation TEZ-2868. Fix setting Caller Context in Tez Examples. TEZ-2860. NPE in DAGClientImpl. @@ -200,6 +201,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails TEZ-2868. Fix setting Caller Context in Tez Examples. TEZ-2860. NPE in DAGClientImpl. TEZ-2855. Fix a potential NPE while routing VertexManager events. @@ -475,6 +477,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2781. 
Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails TEZ-2855. Fix a potential NPE while routing VertexManager events. TEZ-2716. DefaultSorter.isRleNeeded not thread safe TEZ-2758. Remove append API in RecoveryService after TEZ-1909. @@ -706,6 +709,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails TEZ-2398. Flaky test: TestFaultTolerance TEZ-2808. Race condition between preemption and container assignment TEZ-2834. Make Tez preemption resilient to incorrect free resource reported http://git-wip-us.apache.org/repos/asf/tez/blob/f9d15c86/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 0bc02dc..8c17c2c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -241,6 +241,7 @@ public class TaskCommunicatorManager extends AbstractService implements tezEvent.setEventReceivedTime(currTime); final EventType eventType = tezEvent.getEventType(); if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { + // send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED otherwise Status may be missed taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, (TaskStatusUpdateEvent) tezEvent.getEvent()); } else { http://git-wip-us.apache.org/repos/asf/tez/blob/f9d15c86/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 6705020..263300e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -374,15 +374,21 @@ public class TaskReporter implements TaskReporterInterface { EventMetaData srcMeta) throws IOException, TezException { // Ensure only one final event is ever sent. if (!finalEventQueued.getAndSet(true)) { - TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); + List<TezEvent> tezEvents = new ArrayList<TezEvent>(); if (diagnostics == null) { diagnostics = ExceptionUtils.getStackTrace(t); } else { diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t); } - TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics), - srcMeta == null ? updateEventMetadata : srcMeta); - return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie; + tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics), + srcMeta == null ? updateEventMetadata : srcMeta)); + try { + tezEvents.add(new TezEvent(getStatusUpdateEvent(true), updateEventMetadata)); + } catch (Exception e) { + // Counter may exceed limitation + LOG.warn("Error when get constructing TaskStatusUpdateEvent"); + } + return !heartbeat(tezEvents).shouldDie; } else { LOG.warn("A final task state event has already been sent. 
Not sending again"); return askedToDie.get(); http://git-wip-us.apache.org/repos/asf/tez/blob/f9d15c86/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java index b8b46cb..7d88fdf 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java @@ -225,7 +225,11 @@ public class TestExceptionPropagation { DAGStatus dagStatus = dagClient.waitForCompletion(); String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ","); LOG.info("Diagnostics:" + diagnostics); - assertTrue(diagnostics.contains(exLocation.name())); + if (exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) { + assertTrue(diagnostics.contains("Too many counters")); + } else { + assertTrue(diagnostics.contains(exLocation.name())); + } } } finally { stopSessionClient(); @@ -302,6 +306,7 @@ public class TestExceptionPropagation { // PROCESSOR_HANDLE_EVENTS PROCESSOR_RUN_ERROR, PROCESSOR_CLOSE_ERROR, PROCESSOR_INITIALIZE_ERROR, PROCESSOR_RUN_EXCEPTION, PROCESSOR_CLOSE_EXCEPTION, PROCESSOR_INITIALIZE_EXCEPTION, + PROCESSOR_COUNTER_EXCEEDED, // VM VM_INITIALIZE, VM_ON_ROOTVERTEX_INITIALIZE,VM_ON_SOURCETASK_COMPLETED, VM_ON_VERTEX_STARTED, @@ -626,6 +631,11 @@ public class TestExceptionPropagation { throw new Error(this.exLocation.name()); } else if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_EXCEPTION) { throw new Exception(this.exLocation.name()); + } else if (this.exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) { + // simulate the counter limitation exceeded + for (int i=0;i< TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT+1; ++i) { + getContext().getCounters().findCounter("mycounter", "counter_"+i).increment(1); + } } }
