abstractdog commented on a change in pull request #152:
URL: https://github.com/apache/tez/pull/152#discussion_r732485055



##########
File path: 
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
##########
@@ -1793,80 +1793,107 @@ public TaskAttemptStateInternal 
transition(TaskAttemptImpl attempt, TaskAttemptE
   MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, 
TaskAttemptStateInternal> {
 
     @Override
-    public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
+    public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
         TaskAttemptEvent event) {
       TaskAttemptEventOutputFailed outputFailedEvent = 
           (TaskAttemptEventOutputFailed) event;
-      TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
-      TezTaskAttemptID failedDestTaId = 
tezEvent.getSourceInfo().getTaskAttemptID();
-      InputReadErrorEvent readErrorEvent = 
(InputReadErrorEvent)tezEvent.getEvent();
+      TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
+      TezTaskAttemptID failedDestTaId = 
inputFailedEvent.getSourceInfo().getTaskAttemptID();
+
+      InputReadErrorEvent readErrorEvent = 
(InputReadErrorEvent)inputFailedEvent.getEvent();
       int failedInputIndexOnDestTa = readErrorEvent.getIndex();
-      if (readErrorEvent.getVersion() != attempt.getID().getId()) {
-        throw new TezUncheckedException(attempt.getID()
+
+      if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) {
+        throw new TezUncheckedException(sourceAttempt.getID()
             + " incorrectly blamed for read error from " + failedDestTaId
             + " at inputIndex " + failedInputIndexOnDestTa + " version"
             + readErrorEvent.getVersion());
       }
-      LOG.info(attempt.getID()
-            + " blamed for read error from " + failedDestTaId
-            + " at inputIndex " + failedInputIndexOnDestTa);
-      long time = attempt.clock.getTime();
-      Long firstErrReportTime = 
attempt.uniquefailedOutputReports.get(failedDestTaId);
+      // source host: where the data input is supposed to come from
+      String sHost = sourceAttempt.getNodeId().getHost();
+      // destination: where the data is tried to be fetched to
+      String dHost = readErrorEvent.getDestinationLocalhostName();
+
+      LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex 
{}", sourceAttempt.getID(),
+          sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);
+
+      boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
+      Map<String, Set<String>> downstreamBlamingHosts = 
sourceAttempt.getVertex().getDownstreamBlamingHosts();
+      if (!downstreamBlamingHosts.containsKey(sHost)) {
+        LOG.info("Host {} is blamed for fetch failure from {} for the first 
time", sHost, dHost);
+        downstreamBlamingHosts.put(sHost, new HashSet<String>());
+      }
 
+      downstreamBlamingHosts.get(sHost).add(dHost);
+      int currentNumberOfFailingDownstreamHosts = 
downstreamBlamingHosts.get(sHost).size();
+      if (currentNumberOfFailingDownstreamHosts > 
sourceAttempt.getVertex().getVertexConfig()
+          .getMaxAllowedDownstreamHostsReportingFetchFailure()) {
+        LOG.info("Host will be marked fail: {} because of {} distinct upstream 
hosts having fetch failures", sHost,
+            currentNumberOfFailingDownstreamHosts);
+        tooManyDownstreamHostsBlamedTheSameUpstreamHost = true;
+      }
+
+      long time = sourceAttempt.clock.getTime();
+
+      Long firstErrReportTime = 
sourceAttempt.uniquefailedOutputReports.get(failedDestTaId);
       if (firstErrReportTime == null) {
-        attempt.uniquefailedOutputReports.put(failedDestTaId, time);
+        sourceAttempt.uniquefailedOutputReports.put(failedDestTaId, time);
         firstErrReportTime = time;
       }
 
-      int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig()
+      int maxAllowedOutputFailures = 
sourceAttempt.getVertex().getVertexConfig()

Review comment:
       Thanks a lot, I'll address the other issues then.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to