Repository: hadoop Updated Branches: refs/heads/branch-2.7 a66d2a2a8 -> f0c278469
YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0c27846 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0c27846 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0c27846 Branch: refs/heads/branch-2.7 Commit: f0c278469bf4b1af10acb8378053367bd42ddd4f Parents: a66d2a2 Author: Eric Payne <[email protected]> Authored: Fri Mar 18 21:01:53 2016 +0000 Committer: Eric Payne <[email protected]> Committed: Fri Mar 18 21:01:53 2016 +0000 ---------------------------------------------------------------------- .../hadoop/yarn/client/ProtocolHATestBase.java | 3 +- .../hadoop/yarn/client/TestRMFailover.java | 2 - .../yarn/client/api/impl/TestYarnClient.java | 19 +++++ .../nodemanager/NodeStatusUpdaterImpl.java | 42 ++++++----- .../hadoop/yarn/server/MiniYARNCluster.java | 73 +++++++------------- .../yarn/server/TestMiniYARNClusterForHA.java | 4 -- 6 files changed, 71 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c27846/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index f468bc1..a31699c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -237,7 +237,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes { protected void verifyConnections() throws InterruptedException, YarnException { assertTrue("NMs failed to connect to the RM", - cluster.waitForNodeManagersToConnect(20000)); + cluster.waitForNodeManagersToConnect(5000)); verifyClientConnection(); } @@ -299,7 +299,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes { cluster.resetStartFailoverFlag(false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c27846/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index d42d14a..a66edef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -162,7 +162,6 @@ public class TestRMFailover extends ClientBaseWithFixes { conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); @@ -251,7 +250,6 @@ public class TestRMFailover extends ClientBaseWithFixes { conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c27846/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 738b0a8..2c57b36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -1132,6 +1132,25 @@ public class TestYarnClient { try { cluster.init(conf); cluster.start(); + + int attempts; + for(attempts = 10; attempts > 0; attempts--) { + if (cluster.getResourceManager().getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity() + .getMemory() > 0) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (attempts <= 0) { + Assert.fail("Exhausted attempts in checking if node capacity was " + + "added to the plan"); + } + final Configuration yarnConf = cluster.getConfig(); client = YarnClient.createYarnClient(); client.init(yarnConf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c27846/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 795dcfa..d3b9c69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -83,6 +83,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); private final Object heartbeatMonitor = new Object(); + private final Object shutdownMonitor = new Object(); private final Context context; private final Dispatcher dispatcher; @@ -205,27 +206,34 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Override protected void serviceStop() throws Exception { - // Interrupt the updater. - this.isStopped = true; - stopRMProxy(); - super.serviceStop(); + synchronized(shutdownMonitor) { + // Interrupt the updater. + this.isStopped = true; + stopRMProxy(); + super.serviceStop(); + } } protected void rebootNodeStatusUpdaterAndRegisterWithRM() { // Interrupt the updater. - this.isStopped = true; - - try { - statusUpdater.join(); - registerWithRM(); - statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); - this.isStopped = false; - statusUpdater.start(); - LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); - } catch (Exception e) { - String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; - LOG.error(errorMessage, e); - throw new YarnRuntimeException(e); + synchronized(shutdownMonitor) { + if(this.isStopped) { + LOG.info("Currently being shutdown. Aborting reboot"); + return; + } + this.isStopped = true; + try { + statusUpdater.join(); + registerWithRM(); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater.start(); + this.isStopped = false; + LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); + } catch (Exception e) { + String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; + LOG.error(errorMessage, e); + throw new YarnRuntimeException(e); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c27846/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 048c46f..d445932 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -270,6 +273,12 @@ public class MiniYARNCluster extends CompositeService { conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } + @Override + protected synchronized void serviceStart() throws Exception { + super.serviceStart(); + this.waitForNodeManagersToConnect(5000); + } + private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) { String hostname = MiniYARNCluster.getHostname(); conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); @@ -307,19 +316,7 @@ public class MiniYARNCluster extends CompositeService { private synchronized void startResourceManager(final int index) { try { - Thread rmThread = new Thread() { - public void run() { - resourceManagers[index].start(); - } - }; - rmThread.setName("RM-" + index); - rmThread.start(); - int waitCount = 0; - while (resourceManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for RM to start..."); - Thread.sleep(1500); - } + resourceManagers[index].start(); if (resourceManagers[index].getServiceState() != STATE.STARTED) { // RM could have failed. throw new IOException( @@ -448,6 +445,11 @@ public class MiniYARNCluster extends CompositeService { @Override protected synchronized void serviceStart() throws Exception { startResourceManager(index); + if(index == 0) { + resourceManagers[index].getRMContext().getRMAdminService() + .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED)); + } LOG.info("MiniYARN ResourceManager address: " + getConfig().get(YarnConfiguration.RM_ADDRESS)); LOG.info("MiniYARN ResourceManager web address: " + @@ -555,26 +557,12 @@ public class MiniYARNCluster extends CompositeService { } protected synchronized void serviceStart() throws Exception { - try { - new Thread() { - public void run() { - nodeManagers[index].start(); - } - }.start(); - int waitCount = 0; - while (nodeManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for NM " + index + " to start..."); - Thread.sleep(1000); - } - if (nodeManagers[index].getServiceState() != STATE.STARTED) { - // RM could have failed. - throw new IOException("NodeManager " + index + " failed to start"); - } - super.serviceStart(); - } catch (Throwable t) { - throw new YarnRuntimeException(t); + nodeManagers[index].start(); + if (nodeManagers[index].getServiceState() != STATE.STARTED) { + // NM could have failed. + throw new IOException("NodeManager " + index + " failed to start"); } + super.serviceStart(); } @Override @@ -650,7 +638,7 @@ public class MiniYARNCluster extends CompositeService { /** * Wait for all the NodeManagers to connect to the ResourceManager. * - * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds. + * @param timeout Time to wait (sleeps in 10 ms intervals) in milliseconds. * @return true if all NodeManagers connect to the (Active) * ResourceManager, false otherwise. * @throws YarnException @@ -659,16 +647,17 @@ public class MiniYARNCluster extends CompositeService { public boolean waitForNodeManagersToConnect(long timeout) throws YarnException, InterruptedException { GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); - for (int i = 0; i < timeout / 100; i++) { + for (int i = 0; i < timeout / 10; i++) { ResourceManager rm = getResourceManager(); if (rm == null) { throw new YarnException("Can not find the active RM."); } else if (nodeManagers.length == rm.getClientRMService() - .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { + .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { + LOG.info("All Node Managers connected in MiniYARNCluster"); return true; } - Thread.sleep(100); + Thread.sleep(10); } return false; } @@ -701,17 +690,7 @@ public class MiniYARNCluster extends CompositeService { @Override protected synchronized void serviceStart() throws Exception { try { - new Thread() { - public void run() { - appHistoryServer.start(); - }; - }.start(); - int waitCount = 0; - while (appHistoryServer.getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for Timeline Server to start..."); - Thread.sleep(1500); - } + appHistoryServer.start(); if (appHistoryServer.getServiceState() != STATE.STARTED) { // AHS could have failed. throw new IOException( http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c27846/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java index e84d62e..384d1cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java @@ -44,10 +44,6 @@ public class TestMiniYARNClusterForHA { cluster.init(conf); cluster.start(); - cluster.getResourceManager(0).getRMContext().getRMAdminService() - .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( - HAServiceProtocol.RequestSource.REQUEST_BY_USER)); - assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); }
