Repository: tez
Updated Branches:
refs/heads/branch-0.7 f14d5127f -> 94e256de6
TEZ-814. Improve heuristic for determining a task has failed outputs (bikas)
(cherry picked from commit 94488e79d8d3d73e93337a57a48d82f82182591b)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/94e256de
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/94e256de
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/94e256de
Branch: refs/heads/branch-0.7
Commit: 94e256de6dc760c2ae5f17ec4b5a951ca1651118
Parents: f14d512
Author: Bikas Saha <[email protected]>
Authored: Thu Sep 17 23:09:11 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Thu Sep 17 23:14:12 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../apache/tez/dag/api/TezConfiguration.java | 15 ++++++
tez-dag/findbugs-exclude.xml | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 33 +++++++++----
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 49 ++++++++++++++++++--
5 files changed, 88 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 88b40c1..ec01f75 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
TEZ-2830. Backport TEZ-2774 to branch-0.7. Improvements to logging in the AM
and part of the runtime.
TEZ-2829. Tez UI: minor fixes to in-progress update of UI from AM
@@ -267,6 +268,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7
TEZ-2768. Log a useful error message when the summary stream cannot be
closed when shutting
down an AM.
@@ -487,6 +489,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2768. Log a useful error message when the summary stream cannot be
closed when shutting
down an AM.
TEZ-2745. ClassNotFoundException of user code should fail dag
http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index cd9e59c..d0a76d0 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -232,6 +232,21 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT = 10;
/**
+ * int value. Represents the maximum time in seconds for which a consumer attempt can report
+ * a read error against its producer attempt, after which the producer attempt will be re-run
+ * to regenerate the output. Other heuristics also decide when to re-run the producer; they
+ * mainly guard against a flurry of re-runs caused by intermittent read errors
+ * (e.g. network issues). This configuration puts a time limit on those heuristics to ensure
+ * jobs don't hang indefinitely when those heuristics never reach a decision.
+ *
+ * Expert level setting.
+ */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC =
+ TEZ_AM_PREFIX + "max.allowed.time-sec.for-read-error";
+ public static final int
TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300;
+
+ /**
* Boolean value. Determines when the final outputs to data sinks are
committed. Commit is an
* output specific operation and typically involves making the output
visible for consumption.
* If the config is true, then the outputs are committed at the end of DAG
completion after all
http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index b2990ed..4c01edc 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -228,6 +228,7 @@
<Or>
<Field name="MAX_ALLOWED_OUTPUT_FAILURES_FRACTION"/>
<Field name="MAX_ALLOWED_OUTPUT_FAILURES"/>
+ <Field name="MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC"/>
</Or>
<Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
</Match>
http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index f63f461..5357063 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -110,6 +110,7 @@ import
org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
public class TaskAttemptImpl implements TaskAttempt,
EventHandler<TaskAttemptEvent> {
@@ -195,10 +196,10 @@ public class TaskAttemptImpl implements TaskAttempt,
Set<String> taskHosts = new HashSet<String>();
Set<String> taskRacks = new HashSet<String>();
- private Set<TezTaskAttemptID> uniquefailedOutputReports =
- new HashSet<TezTaskAttemptID>();
+ private Map<TezTaskAttemptID, Long> uniquefailedOutputReports =
Maps.newHashMap();
private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION;
private static int MAX_ALLOWED_OUTPUT_FAILURES;
+ private static int MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
protected final boolean isRescheduled;
private final Resource taskResource;
@@ -470,6 +471,10 @@ public class TaskAttemptImpl implements TaskAttempt,
MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
+
+ MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC = conf.getInt(
+ TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
+
TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT);
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
@@ -1569,7 +1574,17 @@ public class TaskAttemptImpl implements TaskAttempt,
LOG.info(attempt.getID()
+ " blamed for read error from " + failedDestTaId
+ " at inputIndex " + failedInputIndexOnDestTa);
- attempt.uniquefailedOutputReports.add(failedDestTaId);
+ long time = attempt.clock.getTime();
+ Long firstErrReportTime =
attempt.uniquefailedOutputReports.get(failedDestTaId);
+ if (firstErrReportTime == null) {
+ attempt.uniquefailedOutputReports.put(failedDestTaId, time);
+ firstErrReportTime = time;
+ }
+
+ int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
+ boolean crossTimeDeadline = readErrorTimespanSec >=
+ MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC ? true : false;
+
float failureFraction = ((float)
attempt.uniquefailedOutputReports.size())
/ outputFailedEvent.getConsumerTaskNumber();
@@ -1581,14 +1596,16 @@ public class TaskAttemptImpl implements TaskAttempt,
// If needed we can launch a background task without failing this task
// to generate a copy of the output just in case.
// If needed we can consider only running consumer tasks
- if (withinFailureFractionLimits && withinOutputFailureLimits) {
+ if (!crossTimeDeadline && withinFailureFractionLimits &&
withinOutputFailureLimits) {
return attempt.getInternalState();
}
String message = attempt.getID() + " being failed for too many output
errors. "
- + "failureFraction=" + failureFraction + ", "
- + "MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" +
MAX_ALLOWED_OUTPUT_FAILURES_FRACTION + ", "
- + "uniquefailedOutputReports=" +
attempt.uniquefailedOutputReports.size() + ", "
- + "MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES;
+ + "failureFraction=" + failureFraction
+ + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" +
MAX_ALLOWED_OUTPUT_FAILURES_FRACTION
+ + ", uniquefailedOutputReports=" +
attempt.uniquefailedOutputReports.size()
+ + ", MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES
+ + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" +
MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC
+ + ", readErrorTimespan=" + readErrorTimespanSec;
LOG.info(message);
attempt.addDiagnosticInfo(message);
// send input failed event
http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 1a1cb11..d526a1d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -1188,7 +1188,7 @@ public class TestTaskAttempt {
assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR,
taImpl.getTerminationCause());
}
- @Test(timeout = 5000)
+ @Test//(timeout = 5000)
// Verifies that multiple TooManyFetchFailures are handled correctly by the
// TaskAttempt.
public void testMultipleOutputFailed() throws Exception {
@@ -1335,15 +1335,54 @@ public class TestTaskAttempt {
//This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is
within limits, as
// MAX_ALLOWED_OUTPUT_FAILURES has crossed the limit.
taImpl2.handle(new TaskAttemptEventOutputFailed(taskAttemptID2, tzEvent,
8));
- assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
- TaskAttemptState.FAILED);
-
- assertEquals("Task attempt is not in FAILED state", taImpl2.getState(),
+ assertEquals("Task attempt is not in failed state", taImpl2.getState(),
TaskAttemptState.FAILED);
assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST,
taImpl2.getTerminationCause());
// verify unregister is not invoked again
verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID2);
+ Clock mockClock = mock(Clock.class);
+ int readErrorTimespanSec = 1;
+ taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 10);
+
taskConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
readErrorTimespanSec);
+ TezTaskID taskID3 = TezTaskID.getInstance(vertexID, 3);
+ MockTaskAttemptImpl taImpl3 = new MockTaskAttemptImpl(taskID3, 1,
eventHandler,
+ taListener, taskConf, mockClock,
+ mockHeartbeatHandler, appCtx, false,
+ resource, createFakeContainerContext(), false);
+ TezTaskAttemptID taskAttemptID3 = taImpl3.getID();
+
+ taImpl3.handle(new TaskAttemptEventSchedule(taskAttemptID3, 0, 0));
+ // At state STARTING.
+ taImpl3.handle(new TaskAttemptEventStartedRemotely(taskAttemptID3, contId,
null));
+ verify(mockHeartbeatHandler).register(taskAttemptID3);
+ taImpl3.handle(new TaskAttemptEvent(taskAttemptID3,
TaskAttemptEventType.TA_DONE));
+ assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),
+ TaskAttemptState.SUCCEEDED);
+ verify(mockHeartbeatHandler).unregister(taskAttemptID3);
+
+ mockReEvent = InputReadErrorEvent.create("", 1, 1);
+ mockMeta = mock(EventMetaData.class);
+ mockDestId1 = mock(TezTaskAttemptID.class);
+ when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+ tzEvent = new TezEvent(mockReEvent, mockMeta);
+ when(mockClock.getTime()).thenReturn(1000L);
+ // time deadline not exceeded for a couple of read error events
+ taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent,
1000));
+ assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),
+ TaskAttemptState.SUCCEEDED);
+ when(mockClock.getTime()).thenReturn(1500L);
+ taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent,
1000));
+ assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),
+ TaskAttemptState.SUCCEEDED);
+ // exceed the time threshold
+ when(mockClock.getTime()).thenReturn(2001L);
+ taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent,
1000));
+ assertEquals("Task attempt is not in FAILED state", taImpl3.getState(),
+ TaskAttemptState.FAILED);
+ assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST,
taImpl3.getTerminationCause());
+ // verify unregister is not invoked again
+ verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID3);
}
@SuppressWarnings("deprecation")