rbalamohan commented on a change in pull request #152:
URL: https://github.com/apache/tez/pull/152#discussion_r732419364



##########
File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
##########
@@ -1793,80 +1793,107 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptE
  MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
     @Override
-    public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
+    public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
        TaskAttemptEvent event) {
      TaskAttemptEventOutputFailed outputFailedEvent =
          (TaskAttemptEventOutputFailed) event;
-      TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
-      TezTaskAttemptID failedDestTaId = tezEvent.getSourceInfo().getTaskAttemptID();
-      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)tezEvent.getEvent();
+      TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
+      TezTaskAttemptID failedDestTaId = inputFailedEvent.getSourceInfo().getTaskAttemptID();
+
+      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)inputFailedEvent.getEvent();
       int failedInputIndexOnDestTa = readErrorEvent.getIndex();
-      if (readErrorEvent.getVersion() != attempt.getID().getId()) {
-        throw new TezUncheckedException(attempt.getID()
+
+      if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) {
+        throw new TezUncheckedException(sourceAttempt.getID()
            + " incorrectly blamed for read error from " + failedDestTaId
            + " at inputIndex " + failedInputIndexOnDestTa + " version"
            + readErrorEvent.getVersion());
       }
-      LOG.info(attempt.getID()
-            + " blamed for read error from " + failedDestTaId
-            + " at inputIndex " + failedInputIndexOnDestTa);
-      long time = attempt.clock.getTime();
-      Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId);
+      // source host: where the data input is supposed to come from
+      String sHost = sourceAttempt.getNodeId().getHost();
+      // destination: where the data is tried to be fetched to
+      String dHost = readErrorEvent.getDestinationLocalhostName();
+
+      LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex {}", sourceAttempt.getID(),
+          sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);
+
+      boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
+      Map<String, Set<String>> downstreamBlamingHosts = sourceAttempt.getVertex().getDownstreamBlamingHosts();
+      if (!downstreamBlamingHosts.containsKey(sHost)) {
+        LOG.info("Host {} is blamed for fetch failure from {} for the first time", sHost, dHost);
+        downstreamBlamingHosts.put(sHost, new HashSet<String>());
+      }
 
+      downstreamBlamingHosts.get(sHost).add(dHost);
+      int currentNumberOfFailingDownstreamHosts = downstreamBlamingHosts.get(sHost).size();
+      if (currentNumberOfFailingDownstreamHosts > sourceAttempt.getVertex().getVertexConfig()
+          .getMaxAllowedDownstreamHostsReportingFetchFailure()) {
+        LOG.info("Host will be marked fail: {} because of {} distinct upstream hosts having fetch failures", sHost,
+            currentNumberOfFailingDownstreamHosts);
+        tooManyDownstreamHostsBlamedTheSameUpstreamHost = true;
+      }
+
+      long time = sourceAttempt.clock.getTime();
+
+      Long firstErrReportTime = sourceAttempt.uniquefailedOutputReports.get(failedDestTaId);
       if (firstErrReportTime == null) {
-        attempt.uniquefailedOutputReports.put(failedDestTaId, time);
+        sourceAttempt.uniquefailedOutputReports.put(failedDestTaId, time);
         firstErrReportTime = time;
       }
 
-      int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig()
+      int maxAllowedOutputFailures = sourceAttempt.getVertex().getVertexConfig()

Review comment:
       The rest of the refactoring and the additional code to pass the hostname in the read error event etc. look fine.
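
   In isolation, the per-host bookkeeping added in this hunk boils down to something like the sketch below (plain Java types standing in for the Tez classes, and the limit passed in explicitly; this is an illustration of the mechanism, not the PR's actual code):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Standalone sketch (not the Tez classes): for each upstream host, remember which distinct
// downstream hosts have blamed it for fetch failures, and flag the upstream host once that
// count exceeds the configured limit.
class UpstreamHostBlameBook {
  private final Map<String, Set<String>> downstreamBlamingHosts = new HashMap<>();
  private final int maxAllowedDownstreamHostsReportingFetchFailure;

  UpstreamHostBlameBook(int maxAllowedDownstreamHostsReportingFetchFailure) {
    this.maxAllowedDownstreamHostsReportingFetchFailure = maxAllowedDownstreamHostsReportingFetchFailure;
  }

  /** Records one blame report; returns true if too many distinct downstream hosts blamed this upstream host. */
  boolean recordBlame(String upstreamHost, String downstreamHost) {
    downstreamBlamingHosts.computeIfAbsent(upstreamHost, h -> new HashSet<>()).add(downstreamHost);
    return downstreamBlamingHosts.get(upstreamHost).size() > maxAllowedDownstreamHostsReportingFetchFailure;
  }
}
```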

##########
File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
##########
@@ -1793,80 +1793,107 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptE
  MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
     @Override
-    public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
+    public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
        TaskAttemptEvent event) {
      TaskAttemptEventOutputFailed outputFailedEvent =
          (TaskAttemptEventOutputFailed) event;
-      TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
-      TezTaskAttemptID failedDestTaId = tezEvent.getSourceInfo().getTaskAttemptID();
-      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)tezEvent.getEvent();
+      TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
+      TezTaskAttemptID failedDestTaId = inputFailedEvent.getSourceInfo().getTaskAttemptID();
+
+      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)inputFailedEvent.getEvent();
       int failedInputIndexOnDestTa = readErrorEvent.getIndex();
-      if (readErrorEvent.getVersion() != attempt.getID().getId()) {
-        throw new TezUncheckedException(attempt.getID()
+
+      if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) {
+        throw new TezUncheckedException(sourceAttempt.getID()
            + " incorrectly blamed for read error from " + failedDestTaId
            + " at inputIndex " + failedInputIndexOnDestTa + " version"
            + readErrorEvent.getVersion());
       }
-      LOG.info(attempt.getID()
-            + " blamed for read error from " + failedDestTaId
-            + " at inputIndex " + failedInputIndexOnDestTa);
-      long time = attempt.clock.getTime();
-      Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId);
+      // source host: where the data input is supposed to come from
+      String sHost = sourceAttempt.getNodeId().getHost();
+      // destination: where the data is tried to be fetched to
+      String dHost = readErrorEvent.getDestinationLocalhostName();
+
+      LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex {}", sourceAttempt.getID(),
+          sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);
+
+      boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
+      Map<String, Set<String>> downstreamBlamingHosts = sourceAttempt.getVertex().getDownstreamBlamingHosts();

Review comment:
       Wouldn't the downstreamBlamingHosts structure occupy a lot of memory in the AM on large clusters? E.g. think of a job running at 1000-node scale with 10 vertices. This can easily end up with 1000 tasks x 1000 nodes entries, which can lead to memory pressure on the AM. Add the number of vertices on top of that and the memory pressure is even higher.
   
   How about tracking the downstream hostnames in a set and using that as an optional dimension in the computation? This way, even if multiple task attempts were running on the same host, they would be accounted as a single failure (note that in the current master branch they are accounted multiple times).
   
   To be more specific, is it possible to track the downstream hostnames in a set and use that set's size() for computing the fraction that determines whether the source has to be re-executed or not?
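
   To make it concrete, a rough sketch of the idea (all names are hypothetical placeholders, not a patch against the existing classes): keep one set of failing downstream hostnames per source attempt and derive the decision from the set size.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the suggestion above: count each downstream *host* only once, no matter how many
// task attempts on that host reported fetch failures, and compare the distinct-host fraction
// against a configured threshold.
class DownstreamHostBlameTracker {
  private final Set<String> failingDownstreamHosts = new HashSet<>();

  /**
   * @param downstreamHost host that reported a fetch failure for this source attempt
   * @param numDownstreamHosts total number of hosts running downstream/consumer tasks
   * @param maxAllowedFailureFraction fraction of downstream hosts above which the source
   *        attempt's output should be considered bad
   * @return true if the source attempt should be blamed and re-executed
   */
  boolean shouldRerunSource(String downstreamHost, int numDownstreamHosts,
      double maxAllowedFailureFraction) {
    failingDownstreamHosts.add(downstreamHost);
    double failingFraction = (double) failingDownstreamHosts.size() / numDownstreamHosts;
    return failingFraction > maxAllowedFailureFraction;
  }
}
```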

##########
File path: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
##########
@@ -300,6 +300,21 @@ public TezConfiguration(boolean loadDefaults) {
       TEZ_AM_PREFIX + "max.allowed.time-sec.for-read-error";
  public static final int TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300;
 
+  /**
+   * int value. The maximum number of distinct downstream hosts that can report a fetch failure
+   * for a single upstream host before the upstream task attempt is marked as failed (so blamed for
+   * the fetch failure). E.g. if this set to 1, in case of 2 different hosts reporting fetch failure
+   * for the same upstream host the upstream task is immediately blamed for the fetch failure.
+   * TODO: could this be proportional to the number of hosts running consumer/downstream tasks ?
+   *
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE =
Review comment:
       How do we determine this value for a large cluster or for a dynamically changing cluster?
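
   For reference, tuning this would presumably look like any other AM-scoped integer property; the value 3 below is purely illustrative, and picking a sensible value for a large or elastic cluster is exactly the open question:

```java
import org.apache.tez.dag.api.TezConfiguration;

public class FetchFailureLimitExample {
  public static void main(String[] args) {
    // Illustration only: the constant introduced in this PR is an ordinary String config key,
    // so it can be set programmatically (or in tez-site.xml). The value 3 is arbitrary.
    TezConfiguration conf = new TezConfiguration();
    conf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE, 3);
    System.out.println("max allowed downstream hosts reporting fetch failure: "
        + conf.getInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE, -1));
  }
}
```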

##########
File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
##########
@@ -264,6 +264,13 @@
 
   final ServicePluginInfo servicePluginInfo;
 
+  /*
+   * For every upstream host (as map keys) contains every unique downstream hostnames that reported INPUT_READ_ERROR.
+   * This map helps to decide if there is a problem with the host that produced the map outputs. There is an assumption
+   * that if multiple downstream hosts report input errors for the same upstream host, then it's likely that the output
+   * has to be blamed and needs to rerun.
+   */
+  private final Map<String, Set<String>> downstreamBlamingHosts = Maps.newHashMap();

Review comment:
       Please refer to the previous comment about memory pressure in the AM.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
