rbalamohan commented on a change in pull request #152:
URL: https://github.com/apache/tez/pull/152#discussion_r732419364
##########
File path:
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
##########
@@ -1793,80 +1793,107 @@ public TaskAttemptStateInternal
transition(TaskAttemptImpl attempt, TaskAttemptE
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
TaskAttemptStateInternal> {
@Override
- public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
+ public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
TaskAttemptEvent event) {
TaskAttemptEventOutputFailed outputFailedEvent =
(TaskAttemptEventOutputFailed) event;
- TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
- TezTaskAttemptID failedDestTaId =
tezEvent.getSourceInfo().getTaskAttemptID();
- InputReadErrorEvent readErrorEvent =
(InputReadErrorEvent)tezEvent.getEvent();
+ TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
+ TezTaskAttemptID failedDestTaId =
inputFailedEvent.getSourceInfo().getTaskAttemptID();
+
+ InputReadErrorEvent readErrorEvent =
(InputReadErrorEvent)inputFailedEvent.getEvent();
int failedInputIndexOnDestTa = readErrorEvent.getIndex();
- if (readErrorEvent.getVersion() != attempt.getID().getId()) {
- throw new TezUncheckedException(attempt.getID()
+
+ if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) {
+ throw new TezUncheckedException(sourceAttempt.getID()
+ " incorrectly blamed for read error from " + failedDestTaId
+ " at inputIndex " + failedInputIndexOnDestTa + " version"
+ readErrorEvent.getVersion());
}
- LOG.info(attempt.getID()
- + " blamed for read error from " + failedDestTaId
- + " at inputIndex " + failedInputIndexOnDestTa);
- long time = attempt.clock.getTime();
- Long firstErrReportTime =
attempt.uniquefailedOutputReports.get(failedDestTaId);
+ // source host: where the data input is supposed to come from
+ String sHost = sourceAttempt.getNodeId().getHost();
+ // destination: where the data is tried to be fetched to
+ String dHost = readErrorEvent.getDestinationLocalhostName();
+
+ LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex
{}", sourceAttempt.getID(),
+ sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);
+
+ boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
+ Map<String, Set<String>> downstreamBlamingHosts =
sourceAttempt.getVertex().getDownstreamBlamingHosts();
+ if (!downstreamBlamingHosts.containsKey(sHost)) {
+ LOG.info("Host {} is blamed for fetch failure from {} for the first
time", sHost, dHost);
+ downstreamBlamingHosts.put(sHost, new HashSet<String>());
+ }
+ downstreamBlamingHosts.get(sHost).add(dHost);
+ int currentNumberOfFailingDownstreamHosts =
downstreamBlamingHosts.get(sHost).size();
+ if (currentNumberOfFailingDownstreamHosts >
sourceAttempt.getVertex().getVertexConfig()
+ .getMaxAllowedDownstreamHostsReportingFetchFailure()) {
+ LOG.info("Host will be marked fail: {} because of {} distinct upstream
hosts having fetch failures", sHost,
+ currentNumberOfFailingDownstreamHosts);
+ tooManyDownstreamHostsBlamedTheSameUpstreamHost = true;
+ }
+
+ long time = sourceAttempt.clock.getTime();
+
+ Long firstErrReportTime =
sourceAttempt.uniquefailedOutputReports.get(failedDestTaId);
if (firstErrReportTime == null) {
- attempt.uniquefailedOutputReports.put(failedDestTaId, time);
+ sourceAttempt.uniquefailedOutputReports.put(failedDestTaId, time);
firstErrReportTime = time;
}
- int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig()
+ int maxAllowedOutputFailures =
sourceAttempt.getVertex().getVertexConfig()
Review comment:
The rest of the refactorings, and the additional code to pass the hostname in
the read-error event etc., look fine.
##########
File path:
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
##########
@@ -1793,80 +1793,107 @@ public TaskAttemptStateInternal
transition(TaskAttemptImpl attempt, TaskAttemptE
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
TaskAttemptStateInternal> {
@Override
- public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
+ public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
TaskAttemptEvent event) {
TaskAttemptEventOutputFailed outputFailedEvent =
(TaskAttemptEventOutputFailed) event;
- TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
- TezTaskAttemptID failedDestTaId =
tezEvent.getSourceInfo().getTaskAttemptID();
- InputReadErrorEvent readErrorEvent =
(InputReadErrorEvent)tezEvent.getEvent();
+ TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
+ TezTaskAttemptID failedDestTaId =
inputFailedEvent.getSourceInfo().getTaskAttemptID();
+
+ InputReadErrorEvent readErrorEvent =
(InputReadErrorEvent)inputFailedEvent.getEvent();
int failedInputIndexOnDestTa = readErrorEvent.getIndex();
- if (readErrorEvent.getVersion() != attempt.getID().getId()) {
- throw new TezUncheckedException(attempt.getID()
+
+ if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) {
+ throw new TezUncheckedException(sourceAttempt.getID()
+ " incorrectly blamed for read error from " + failedDestTaId
+ " at inputIndex " + failedInputIndexOnDestTa + " version"
+ readErrorEvent.getVersion());
}
- LOG.info(attempt.getID()
- + " blamed for read error from " + failedDestTaId
- + " at inputIndex " + failedInputIndexOnDestTa);
- long time = attempt.clock.getTime();
- Long firstErrReportTime =
attempt.uniquefailedOutputReports.get(failedDestTaId);
+ // source host: where the data input is supposed to come from
+ String sHost = sourceAttempt.getNodeId().getHost();
+ // destination: where the data is tried to be fetched to
+ String dHost = readErrorEvent.getDestinationLocalhostName();
+
+ LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex
{}", sourceAttempt.getID(),
+ sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);
+
+ boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
+ Map<String, Set<String>> downstreamBlamingHosts =
sourceAttempt.getVertex().getDownstreamBlamingHosts();
Review comment:
Wouldn't the downstreamBlamingHosts structure occupy a lot of memory in the AM
in large clusters? E.g., think of a job running at 1000-node scale and running
with 10 vertices. This will easily have 1000 tasks x 1000 nodes entries, which
can lead to memory pressure on the AM. Factor in the number of vertices and the
memory pressure will be even greater.
How about tracking the downstream hostnames in a set and using that as an
optional dimension in the computation? This way, even if multiple task attempts
were running on the same host, it will be counted as a single failure (note that
currently, in the master branch, it is counted multiple times).
To be more specific, is it possible to track the downstream hostnames in a set
and use that set.size() for computing the fraction that determines whether the
source has to be re-executed or not?
##########
File path: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
##########
@@ -300,6 +300,21 @@ public TezConfiguration(boolean loadDefaults) {
TEZ_AM_PREFIX + "max.allowed.time-sec.for-read-error";
public static final int
TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300;
+ /**
+ * int value. The maximum number of distinct downstream hosts that can
report a fetch failure
+ * for a single upstream host before the upstream task attempt is marked as
failed (so blamed for
+ * the fetch failure). E.g. if this set to 1, in case of 2 different hosts
reporting fetch failure
+ * for the same upstream host the upstream task is immediately blamed for
the fetch failure.
+ * TODO: could this be proportional to the number of hosts running
consumer/downstream tasks ?
+ *
+ * Expert level setting.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="integer")
+ public static final String
TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE =
Review comment:
How do we determine this value for a large cluster or a dynamically changing
cluster?
##########
File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
##########
@@ -264,6 +264,13 @@
final ServicePluginInfo servicePluginInfo;
+ /*
+ * For every upstream host (as map keys) contains every unique downstream
hostnames that reported INPUT_READ_ERROR.
+ * This map helps to decide if there is a problem with the host that
produced the map outputs. There is an assumption
+ * that if multiple downstream hosts report input errors for the same
upstream host, then it's likely that the output
+ * has to be blamed and needs to rerun.
+ */
+ private final Map<String, Set<String>> downstreamBlamingHosts =
Maps.newHashMap();
Review comment:
Please refer to the previous comment on memory pressure in the AM.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]