YARN-4180. AMLauncher does not retry on failures when talking to NM. (adhoot)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9735afe9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9735afe9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9735afe9 Branch: refs/heads/HDFS-7240 Commit: 9735afe967a660f356e953348cb6c34417f41055 Parents: 9f53a95 Author: Anubhav Dhoot <adh...@apache.org> Authored: Mon Sep 28 15:30:17 2015 -0700 Committer: Anubhav Dhoot <adh...@apache.org> Committed: Mon Sep 28 16:13:41 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/amlauncher/AMLauncher.java | 27 ++++---- .../yarn/server/resourcemanager/MockRM.java | 12 ++-- .../TestApplicationMasterLauncher.java | 66 ++++++++++++++++++++ 4 files changed, 89 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9735afe9/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3745d55..e9d04d3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -999,6 +999,9 @@ Release 2.7.2 - UNRELEASED YARN-3624. ApplicationHistoryServer should not reverse the order of the filters it gets. (Mit Desai via xgong) + YARN-4180. AMLauncher does not retry on failures when talking to NM. + (adhoot) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/9735afe9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 713e75f..b1d8506 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.amlauncher; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.NMProxy; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -151,10 +151,10 @@ public class AMLauncher implements Runnable { final ContainerId containerId) { final NodeId node = masterContainer.getNodeId(); - final InetSocketAddress containerManagerBindAddress = + final InetSocketAddress containerManagerConnectAddress = NetUtils.createSocketAddrForHost(node.getHost(), node.getPort()); - final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. + final YarnRPC rpc = getYarnRPC(); UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(containerId @@ -168,18 +168,15 @@ public class AMLauncher implements Runnable { rmContext.getNMTokenSecretManager().createNMToken( containerId.getApplicationAttemptId(), node, user); currentUser.addToken(ConverterUtils.convertFromYarn(token, - containerManagerBindAddress)); - - return currentUser - .doAs(new PrivilegedAction<ContainerManagementProtocol>() { - - @Override - public ContainerManagementProtocol run() { - return (ContainerManagementProtocol) rpc.getProxy( - ContainerManagementProtocol.class, - containerManagerBindAddress, conf); - } - }); + containerManagerConnectAddress)); + + return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, + currentUser, rpc, containerManagerConnectAddress); + } + + @VisibleForTesting + protected YarnRPC getYarnRPC() { + return YarnRPC.create(conf); // TODO: Don't create again and again. } private ContainerLaunchContext createAMContainerLaunchContext( http://git-wip-us.apache.org/repos/asf/hadoop/blob/9735afe9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 7ce42f5..a066ba4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -160,14 +160,18 @@ public class MockRM extends ResourceManager { " for the application " + appId); } } - - public void waitForState(ApplicationAttemptId attemptId, - RMAppAttemptState finalState) + + public void waitForState(ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws Exception { + waitForState(attemptId, finalState, 40000); + } + + public void waitForState(ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) throws Exception { RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); Assert.assertNotNull("app shouldn't be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - final int timeoutMsecs = 40000; final int minWaitMsecs = 1000; final int waitMsPerLoop = 10; int loop = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9735afe9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 2760705..8fa88d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -49,14 +50,19 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -67,6 +73,10 @@ import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestApplicationMasterLauncher { private static final Log LOG = LogFactory @@ -193,6 +203,62 @@ public class TestApplicationMasterLauncher { rm.stop(); } + @Test + public void testRetriesOnFailures() throws Exception { + final ContainerManagementProtocol mockProxy = + mock(ContainerManagementProtocol.class); + final StartContainersResponse mockResponse = + mock(StartContainersResponse.class); + when(mockProxy.startContainers(any(StartContainersRequest.class))) + .thenThrow(new NMNotYetReadyException("foo")).thenReturn(mockResponse); + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1); + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = new MockRMWithCustomAMLauncher(conf, null) { + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new ApplicationMasterLauncher(getRMContext()) { + @Override + protected Runnable createRunnableLauncher(RMAppAttempt application, + AMLauncherEventType event) { + return new AMLauncher(context, application, event, getConfig()) { + @Override + protected YarnRPC getYarnRPC() { + YarnRPC mockRpc = mock(YarnRPC.class); + + when(mockRpc.getProxy( + any(Class.class), + any(InetSocketAddress.class), + any(Configuration.class))) + .thenReturn(mockProxy); + return mockRpc; + } + }; + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120); + + RMApp app = rm.submitApp(2000); + final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + + // kick the scheduling + nm1.nodeHeartbeat(true); + dispatcher.await(); + + rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500); + } + + @SuppressWarnings("unused") @Test(timeout = 100000)