Repository: hadoop
Updated Branches:
refs/heads/branch-2.7 fc10c9983 -> 7c9a368b4
YARN-4180. AMLauncher does not retry on failures when talking to NM. (adhoot)
(cherry picked from commit 9735afe967a660f356e953348cb6c34417f41055)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
(cherry picked from commit 22f2501476d987afb7bc19080a7a0db94ea72be6)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7c9a368b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7c9a368b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7c9a368b
Branch: refs/heads/branch-2.7
Commit: 7c9a368b45b0e38173521a94ab32dee8a2984bf8
Parents: fc10c99
Author: Anubhav Dhoot <[email protected]>
Authored: Mon Sep 28 15:30:17 2015 -0700
Committer: Anubhav Dhoot <[email protected]>
Committed: Mon Sep 28 20:57:08 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../resourcemanager/amlauncher/AMLauncher.java | 23 +++----
.../yarn/server/resourcemanager/MockRM.java | 13 ++--
.../TestApplicationMasterLauncher.java | 71 +++++++++++++++++++-
4 files changed, 91 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9a368b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a39face..a785bdd 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -85,6 +85,9 @@ Release 2.7.2 - UNRELEASED
YARN-3624. ApplicationHistoryServer should not reverse the order of the
filters it gets. (Mit Desai via xgong)
+ YARN-4180. AMLauncher does not retry on failures when talking to NM.
+ (adhoot)
+
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9a368b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 0dd9ba1..f5ecbaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.NMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -150,10 +150,10 @@ public class AMLauncher implements Runnable {
final ContainerId containerId) {
final NodeId node = masterContainer.getNodeId();
- final InetSocketAddress containerManagerBindAddress =
+ final InetSocketAddress containerManagerConnectAddress =
NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
- final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
+ final YarnRPC rpc = getYarnRPC();
UserGroupInformation currentUser =
UserGroupInformation.createRemoteUser(containerId
@@ -167,18 +167,15 @@ public class AMLauncher implements Runnable {
rmContext.getNMTokenSecretManager().createNMToken(
containerId.getApplicationAttemptId(), node, user);
currentUser.addToken(ConverterUtils.convertFromYarn(token,
- containerManagerBindAddress));
+ containerManagerConnectAddress));
- return currentUser
- .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+ return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
+ currentUser, rpc, containerManagerConnectAddress);
+ }
- @Override
- public ContainerManagementProtocol run() {
- return (ContainerManagementProtocol) rpc.getProxy(
- ContainerManagementProtocol.class,
- containerManagerBindAddress, conf);
- }
- });
+ @VisibleForTesting
+ protected YarnRPC getYarnRPC() {
+ return YarnRPC.create(conf); // TODO: Don't create again and again.
}
private ContainerLaunchContext createAMContainerLaunchContext(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9a368b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 9f7bd88..e27e6c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -147,15 +147,20 @@ public class MockRM extends ResourceManager {
Assert.assertEquals("App state is not correct (timedout)", finalState,
app.getState());
}
-
- public void waitForState(ApplicationAttemptId attemptId,
- RMAppAttemptState finalState)
+
+ public void waitForState(ApplicationAttemptId attemptId,
+ RMAppAttemptState finalState)
throws Exception {
+ waitForState(attemptId, finalState, 40000);
+ }
+
+ public void waitForState(ApplicationAttemptId attemptId,
+ RMAppAttemptState finalState, int timeoutMsecs) throws Exception {
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
Assert.assertNotNull("app shouldn't be null", app);
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
int timeoutSecs = 0;
- while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 40) {
+ while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < timeoutMsecs) {
System.out.println("AppAttempt : " + attemptId
+ " State is : " + attempt.getAppAttemptState()
+ " Waiting for state : " + finalState);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9a368b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 11cd1fd..e54caa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -43,11 +45,18 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -58,6 +67,10 @@ import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestApplicationMasterLauncher {
private static final Log LOG = LogFactory
@@ -177,8 +190,62 @@ public class TestApplicationMasterLauncher {
am.waitForState(RMAppAttemptState.FINISHED);
rm.stop();
}
-
-
+
+ @Test
+ public void testRetriesOnFailures() throws Exception {
+ final ContainerManagementProtocol mockProxy =
+ mock(ContainerManagementProtocol.class);
+ final StartContainersResponse mockResponse =
+ mock(StartContainersResponse.class);
+ when(mockProxy.startContainers(any(StartContainersRequest.class)))
+ .thenThrow(new NMNotYetReadyException("foo")).thenReturn(mockResponse);
+ Configuration conf = new Configuration();
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1);
+ final DrainDispatcher dispatcher = new DrainDispatcher();
+ MockRM rm = new MockRMWithCustomAMLauncher(conf, null) {
+ @Override
+ protected ApplicationMasterLauncher createAMLauncher() {
+ return new ApplicationMasterLauncher(getRMContext()) {
+ @Override
+ protected Runnable createRunnableLauncher(RMAppAttempt application,
+ AMLauncherEventType event) {
+ return new AMLauncher(context, application, event, getConfig()) {
+ @Override
+ protected YarnRPC getYarnRPC() {
+ YarnRPC mockRpc = mock(YarnRPC.class);
+
+ when(mockRpc.getProxy(
+ any(Class.class),
+ any(InetSocketAddress.class),
+ any(Configuration.class)))
+ .thenReturn(mockProxy);
+ return mockRpc;
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+ rm.start();
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
+
+ RMApp app = rm.submitApp(2000);
+ final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+
+ // kick the scheduling
+ nm1.nodeHeartbeat(true);
+ dispatcher.await();
+
+ rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500);
+ }
+
@SuppressWarnings("unused")
@Test(timeout = 100000)
public void testallocateBeforeAMRegistration() throws Exception {