This is an automated email from the ASF dual-hosted git repository.

adamantal pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 1c0fe2e  YARN-10393. MR job live lock caused by completed state 
container leak in heartbeat between node manager and RM. Contributed by 
zhenzhao wang and Jim Brennan
1c0fe2e is described below

commit 1c0fe2eb20d1755d40a4b5cef2d656c82405aebb
Author: Adam Antal <adam.an...@cloudera.com>
AuthorDate: Wed Oct 7 16:46:22 2020 +0200

    YARN-10393. MR job live lock caused by completed state container leak in 
heartbeat between node manager and RM. Contributed by zhenzhao wang and Jim 
Brennan
---
 .../server/nodemanager/NodeStatusUpdaterImpl.java  | 20 ++++++++++++++++--
 .../server/nodemanager/TestNodeStatusUpdater.java  | 24 +++++++---------------
 2 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 0b40dd5..09ad339 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -659,7 +659,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   @VisibleForTesting
   @Private
   public void removeOrTrackCompletedContainersFromContext(
-      List<ContainerId> containerIds) throws IOException {
+      List<ContainerId> containerIds) {
     Set<ContainerId> removedContainers = new HashSet<ContainerId>();
 
     pendingContainersToRemove.addAll(containerIds);
@@ -676,13 +676,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         removedContainers.add(containerId);
         iter.remove();
       }
+      pendingCompletedContainers.remove(containerId);
     }
 
     if (!removedContainers.isEmpty()) {
       LOG.info("Removed completed containers from NM context: "
           + removedContainers);
     }
-    pendingCompletedContainers.clear();
   }
 
   private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -790,6 +790,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       @SuppressWarnings("unchecked")
       public void run() {
         int lastHeartbeatID = 0;
+        boolean missedHearbeat = false;
         while (!isStopped) {
           // Send heartbeat
           try {
@@ -836,6 +837,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               removeOrTrackCompletedContainersFromContext(response
                   .getContainersToBeRemovedFromNM());
 
+              // If the last heartbeat was missed, it is possible that the
+              // RM saw this one as a duplicate and did not process it.
+              // If so, we can fail to notify the RM of these completed 
containers
+              // on the next heartbeat if we clear pendingCompletedContainers.
+              // If it wasn't a duplicate, the only impact is we might notify
+              // the RM twice, which it can handle.
+              if (!missedHearbeat) {
+                pendingCompletedContainers.clear();
+              } else {
+                LOG.info("skipped clearing pending completed containers due to 
" +
+                    "missed heartbeat");
+                missedHearbeat = false;
+              }
+
               logAggregationReportForAppsTempList.clear();
               lastHeartbeatID = response.getResponseId();
               List<ContainerId> containersToCleanup = response
@@ -911,6 +926,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             LOG.error("Caught exception in status-updater", e);
+            missedHearbeat = true;
           } finally {
             synchronized (heartbeatMonitor) {
               nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 0d73a72..fb45144 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -692,15 +692,11 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
         } else if (heartBeatID == 2 || heartBeatID == 3) {
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
-          if (heartBeatID == 2) {
-            // NM should send completed containers again, since the last
-            // heartbeat is lost.
-            Assert.assertEquals(4, statuses.size());
-          } else {
-            // NM should not send completed containers again, since the last
-            // heartbeat is successful.
-            Assert.assertEquals(2, statuses.size());
-          }
+          // NM should send completed containers on heartbeat 2,
+          // since heartbeat 1 was lost.  It will send them again on
+          // heartbeat 3, because it does not clear them if the previous
+          // heartbeat was lost in case the RM treated it as a duplicate.
+          Assert.assertEquals(4, statuses.size());
           Assert.assertEquals(4, context.getContainers().size());
 
           boolean container2Exist = false, container3Exist = false,
@@ -731,14 +727,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
               container5Exist = true;
             }
           }
-          if (heartBeatID == 2) {
-            Assert.assertTrue(container2Exist && container3Exist
-                && container4Exist && container5Exist);
-          } else {
-            // NM do not send completed containers again
-            Assert.assertTrue(container2Exist && !container3Exist
-                && container4Exist && !container5Exist);
-          }
+          Assert.assertTrue(container2Exist && container3Exist
+              && container4Exist && container5Exist);
 
           if (heartBeatID == 3) {
             
finishedContainersPulledByAM.add(containerStatus3.getContainerId());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to