YARN-4180. AMLauncher does not retry on failures when talking to NM. (adhoot)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9735afe9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9735afe9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9735afe9

Branch: refs/heads/HDFS-7240
Commit: 9735afe967a660f356e953348cb6c34417f41055
Parents: 9f53a95
Author: Anubhav Dhoot <adh...@apache.org>
Authored: Mon Sep 28 15:30:17 2015 -0700
Committer: Anubhav Dhoot <adh...@apache.org>
Committed: Mon Sep 28 16:13:41 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../resourcemanager/amlauncher/AMLauncher.java  | 27 ++++----
 .../yarn/server/resourcemanager/MockRM.java     | 12 ++--
 .../TestApplicationMasterLauncher.java          | 66 ++++++++++++++++++++
 4 files changed, 89 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9735afe9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3745d55..e9d04d3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -999,6 +999,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3624. ApplicationHistoryServer should not reverse the order of the
     filters it gets. (Mit Desai via xgong)
 
+    YARN-4180. AMLauncher does not retry on failures when talking to NM.
+    (adhoot)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9735afe9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 713e75f..b1d8506 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.NMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -151,10 +151,10 @@ public class AMLauncher implements Runnable {
       final ContainerId containerId) {
 
     final NodeId node = masterContainer.getNodeId();
-    final InetSocketAddress containerManagerBindAddress =
+    final InetSocketAddress containerManagerConnectAddress =
         NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
 
-    final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
+    final YarnRPC rpc = getYarnRPC();
 
     UserGroupInformation currentUser =
         UserGroupInformation.createRemoteUser(containerId
@@ -168,18 +168,15 @@ public class AMLauncher implements Runnable {
         rmContext.getNMTokenSecretManager().createNMToken(
             containerId.getApplicationAttemptId(), node, user);
     currentUser.addToken(ConverterUtils.convertFromYarn(token,
-        containerManagerBindAddress));
-
-    return currentUser
-        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-
-          @Override
-          public ContainerManagementProtocol run() {
-            return (ContainerManagementProtocol) rpc.getProxy(
-                ContainerManagementProtocol.class,
-                containerManagerBindAddress, conf);
-          }
-        });
+        containerManagerConnectAddress));
+
+    return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
+        currentUser, rpc, containerManagerConnectAddress);
+  }
+
+  @VisibleForTesting
+  protected YarnRPC getYarnRPC() {
+    return YarnRPC.create(conf);  // TODO: Don't create again and again.
   }
 
   private ContainerLaunchContext createAMContainerLaunchContext(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9735afe9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 7ce42f5..a066ba4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -160,14 +160,18 @@ public class MockRM extends ResourceManager {
           " for the application " + appId);
     }
   }
-  
-  public void waitForState(ApplicationAttemptId attemptId, 
-                           RMAppAttemptState finalState)
+
+  public void waitForState(ApplicationAttemptId attemptId,
+      RMAppAttemptState finalState)
       throws Exception {
+    waitForState(attemptId, finalState, 40000);
+  }
+
+  public void waitForState(ApplicationAttemptId attemptId,
+      RMAppAttemptState finalState, int timeoutMsecs) throws Exception {
     RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
     Assert.assertNotNull("app shouldn't be null", app);
     RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
-    final int timeoutMsecs = 40000;
     final int minWaitMsecs = 1000;
     final int waitMsPerLoop = 10;
     int loop = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9735afe9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 2760705..8fa88d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -49,14 +50,19 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -67,6 +73,10 @@ import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestApplicationMasterLauncher {
 
   private static final Log LOG = LogFactory
@@ -193,6 +203,62 @@ public class TestApplicationMasterLauncher {
     rm.stop();
   }
 
+  @Test
+  public void testRetriesOnFailures() throws Exception {
+    final ContainerManagementProtocol mockProxy =
+        mock(ContainerManagementProtocol.class);
+    final StartContainersResponse mockResponse =
+        mock(StartContainersResponse.class);
+    when(mockProxy.startContainers(any(StartContainersRequest.class)))
+        .thenThrow(new NMNotYetReadyException("foo")).thenReturn(mockResponse);
+    Configuration conf = new Configuration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRMWithCustomAMLauncher(conf, null) {
+      @Override
+      protected ApplicationMasterLauncher createAMLauncher() {
+        return new ApplicationMasterLauncher(getRMContext()) {
+          @Override
+          protected Runnable createRunnableLauncher(RMAppAttempt application,
+              AMLauncherEventType event) {
+            return new AMLauncher(context, application, event, getConfig()) {
+              @Override
+              protected YarnRPC getYarnRPC() {
+                YarnRPC mockRpc = mock(YarnRPC.class);
+
+                when(mockRpc.getProxy(
+                    any(Class.class),
+                    any(InetSocketAddress.class),
+                    any(Configuration.class)))
+                    .thenReturn(mockProxy);
+                return mockRpc;
+              }
+            };
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
+
+    RMApp app = rm.submitApp(2000);
+    final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500);
+  }
+
+
 
   @SuppressWarnings("unused")
   @Test(timeout = 100000)

Reply via email to