tomicooler commented on code in PR #5317:
URL: https://github.com/apache/hadoop/pull/5317#discussion_r1366752837


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java:
##########
@@ -60,53 +55,50 @@
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
+
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.function.ThrowingRunnable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestNMClient {
-  Configuration conf = null;
-  MiniYARNCluster yarnCluster = null;
-  YarnClientImpl yarnClient = null;
-  AMRMClientImpl<ContainerRequest> rmClient = null;
-  NMClientImpl nmClient = null;
-  List<NodeReport> nodeReports = null;
-  ApplicationAttemptId attemptId = null;
-  int nodeCount = 3;
-  NMTokenCache nmTokenCache = null;
+  private static final String IS_NOT_HANDLED_BY_THIS_NODEMANAGER =
+      "is not handled by this NodeManager";
+  private static final String UNKNOWN_NODEMANAGER =
+      "Unknown container";
+
+  private static final int MAX_EARLY_FINISH = 3;
+  private static final int NUMBER_OF_CONTAINERS = 5;
+  private Configuration conf;
+  private MiniYARNCluster yarnCluster;
+  private YarnClientImpl yarnClient;
+  private AMRMClientImpl<ContainerRequest> rmClient;
+  private NMClientImpl nmClient;
+  private List<NodeReport> nodeReports;
+  private NMTokenCache nmTokenCache;
+  private RMAppAttempt appAttempt;
 
   /**
    * Container State transition listener to track the number of times
    * a container has transitioned into a state.
    */
-  public static class DebugSumContainerStateListener
-      implements ContainerStateTransitionListener {
-
-    private static final Logger LOG =
-        LoggerFactory.getLogger(DebugSumContainerStateListener.class);
-    private static final Map<ContainerId,
-        Map<org.apache.hadoop.yarn.server.nodemanager.containermanager
-            .container.ContainerState, Long>>
-        TRANSITION_COUNTER = new HashMap<>();
+  public static class DebugSumContainerStateListener implements 
ContainerStateTransitionListener {
+    public static final Map<ContainerId, Integer> RUNNING_TRANSITIONS = new 
HashMap<>();

Review Comment:
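   Previously this map was guarded by `synchronized`, but only on the write
   path; the reads (in `getTransitionCounter`) were not synchronized. This map
   is written and read from different threads, and `HashMap.compute` is not
   thread-safe, so I think we should use a `ConcurrentHashMap` here.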
   
   It is accessed from these two threads:
   ```
   postTransaction Thread[NM ContainerManager dispatcher,5,main]
   waitForContainerRunningTransitionCount Thread[Time-limited test,5,main]
   ```



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java:
##########
@@ -125,576 +117,344 @@ public void postTransition(
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container
             .ContainerState afterState,
         ContainerEvent processedEvent) {
-      synchronized (TRANSITION_COUNTER) {
-        if (beforeState != afterState) {
-          ContainerId id = op.getContainerId();
-          TRANSITION_COUNTER
-              .putIfAbsent(id, new HashMap<>());
-          long sum = TRANSITION_COUNTER.get(id)
-              .compute(afterState,
-                  (state, count) -> count == null ? 1 : count + 1);
-          LOG.info("***** " + id +
-              " Transition from " + beforeState +
-              " to " + afterState +
-              "sum:" + sum);
-        }
+      if (beforeState != afterState &&
+        afterState == 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.RUNNING) {
+        RUNNING_TRANSITIONS.compute(op.getContainerId(),
+            (containerId, counter) -> counter == null ? 1 : ++counter);
       }
     }
-
-    /**
-     * Get the current number of state transitions.
-     * This is useful to check, if an event has occurred in unit tests.
-     * @param id Container id to check
-     * @param state Return the overall number of transitions to this state
-     * @return Number of transitions to the state specified
-     */
-    static long getTransitionCounter(ContainerId id,
-                                     org.apache.hadoop.yarn.server.nodemanager
-                                         .containermanager.container
-                                         .ContainerState state) {
-      Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>())
-          .get(state);
-      return ret != null ? ret : 0;
-    }
   }
 
-  @Before
-  public void setup() throws YarnException, IOException {
-    // start minicluster
+  public void setup() throws YarnException, IOException, InterruptedException, 
TimeoutException {
     conf = new YarnConfiguration();
-    // Turn on state tracking
     conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
         DebugSumContainerStateListener.class.getName());
-    yarnCluster =
-        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    startYarnCluster();
+    startYarnClient();
+    UserGroupInformation.setLoginUser(UserGroupInformation
+      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    nmTokenCache = new NMTokenCache();
+    startRMClient();
+    startNMClient();
+  }
+
+
+  private void startYarnCluster() {
+    yarnCluster = new MiniYARNCluster(TestNMClient.class.getName(), 3, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.start();
-    assertNotNull(yarnCluster);
     assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+  }
 
-    // start rm client
+  private void startYarnClient()
+      throws IOException, YarnException, InterruptedException, 
TimeoutException {
     yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
     yarnClient.init(conf);
     yarnClient.start();
-    assertNotNull(yarnClient);
     assertEquals(STATE.STARTED, yarnClient.getServiceState());
-
-    // get node info
     nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
 
-    // submit new app
-    ApplicationSubmissionContext appContext = 
+    ApplicationSubmissionContext appContext =
         yarnClient.createApplication().getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
-    // set the application name
     appContext.setApplicationName("Test");
-    // Set the priority for the application master
     Priority pri = Priority.newInstance(0);
     appContext.setPriority(pri);
-    // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue("default");
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
+    ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
     appContext.setAMContainerSpec(amContainer);
-    // unmanaged AM
     appContext.setUnmanagedAM(true);
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
+
+    SubmitApplicationRequest appRequest = 
Records.newRecord(SubmitApplicationRequest.class);
     appRequest.setApplicationSubmissionContext(appContext);
-    // Submit the application to the applications manager
     yarnClient.submitApplication(appContext);
+    GenericTestUtils.waitFor(() -> 
yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt().getAppAttemptState() == 
RMAppAttemptState.LAUNCHED,
+        100, 30_000, "Failed to start app");
+    appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt();
+  }
 
-    // wait for app to start
-    int iterationsLeft = 30;
-    RMAppAttempt appAttempt = null;
-    while (iterationsLeft > 0) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState() ==
-          YarnApplicationState.ACCEPTED) {
-        attemptId = appReport.getCurrentApplicationAttemptId();
-        appAttempt =
-            yarnCluster.getResourceManager().getRMContext().getRMApps()
-              .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-      sleep(1000);
-      --iterationsLeft;
-    }
-    if (iterationsLeft == 0) {
-      fail("Application hasn't bee started");
-    }
-
-    // Just dig into the ResourceManager and get the AMRMToken just for the 
sake
-    // of testing.
-    UserGroupInformation.setLoginUser(UserGroupInformation
-      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
-
-    //creating an instance NMTokenCase
-    nmTokenCache = new NMTokenCache();
-    
-    // start am rm client
-    rmClient =
-        (AMRMClientImpl<ContainerRequest>) AMRMClient
-          .<ContainerRequest> createAMRMClient();
-
-    //setting an instance NMTokenCase
+  private void startRMClient() {
+    rmClient = (AMRMClientImpl<ContainerRequest>) 
AMRMClient.createAMRMClient();
     rmClient.setNMTokenCache(nmTokenCache);
     rmClient.init(conf);
     rmClient.start();
-    assertNotNull(rmClient);
     assertEquals(STATE.STARTED, rmClient.getServiceState());
+  }
 
-    // start am nm client
+  private void startNMClient() {
     nmClient = (NMClientImpl) NMClient.createNMClient();
-    
-    //propagating the AMRMClient NMTokenCache instance
     nmClient.setNMTokenCache(rmClient.getNMTokenCache());
     nmClient.init(conf);
     nmClient.start();
-    assertNotNull(nmClient);
     assertEquals(STATE.STARTED, nmClient.getServiceState());
   }
 
-  @After
-  public void tearDown() {
+  public void tearDown() throws InterruptedException {
     rmClient.stop();
     yarnClient.stop();
-    yarnCluster.stop();
-  }
-
-  private void stopNmClient(boolean stopContainers) {
-    assertNotNull("Null nmClient", nmClient);
-    // leave one unclosed
-    assertEquals(1, nmClient.startedContainers.size());
-    // default true
-    assertTrue(nmClient.getCleanupRunningContainers().get());
-    nmClient.cleanupRunningContainersOnStop(stopContainers);
-    assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
-    nmClient.stop();
+    yarnCluster.asyncStop(this);
   }
 
-  @Test (timeout = 180000)
+  @Test (timeout = 180_000 * MAX_EARLY_FINISH)
   public void testNMClientNoCleanupOnStop()
-      throws YarnException, IOException {
-
-    rmClient.registerApplicationMaster("Host", 10000, "");
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      stopNmClient();
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainers();
+      assertEquals(0, nmClient.startedContainers.size());
+    });
+  }
 
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+  @Test (timeout = 200_000 * MAX_EARLY_FINISH)
+  public void testNMClient()
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      // stop the running containers on close
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainersOnStop(true);
+      assertTrue(nmClient.getCleanupRunningContainers().get());
+      nmClient.stop();
+    });
+  }
 
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // don't stop the running containers
-    stopNmClient(false);
-    assertFalse(nmClient.startedContainers.isEmpty());
-    //now cleanup
-    nmClient.cleanupRunningContainers();
-    assertEquals(0, nmClient.startedContainers.size());
+  public void runTest(
+      Runnable test
+  ) throws IOException, InterruptedException, YarnException, TimeoutException {
+    int earlyFinishCounter = MAX_EARLY_FINISH;
+    int earlyFinishCounterWhenTestWasStarted;
+    do {
+      earlyFinishCounterWhenTestWasStarted = earlyFinishCounter;
+      setup();
+      rmClient.registerApplicationMaster("Host", 10_000, "");
+      testContainerManagement(nmClient, allocateContainers(rmClient));
+      rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
null, null);
+      test.run();
+      tearDown();
+    } while (earlyFinishCounter != 0 && earlyFinishCounter != 
earlyFinishCounterWhenTestWasStarted);
+    if (earlyFinishCounter == 0) {

Review Comment:
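   Condition `earlyFinishCounter == 0` is always `false`: `earlyFinishCounter` is initialized to `MAX_EARLY_FINISH` and never modified, so the body of this `if` can never execute.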



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java:
##########
@@ -125,576 +117,344 @@ public void postTransition(
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container
             .ContainerState afterState,
         ContainerEvent processedEvent) {
-      synchronized (TRANSITION_COUNTER) {
-        if (beforeState != afterState) {
-          ContainerId id = op.getContainerId();
-          TRANSITION_COUNTER
-              .putIfAbsent(id, new HashMap<>());
-          long sum = TRANSITION_COUNTER.get(id)
-              .compute(afterState,
-                  (state, count) -> count == null ? 1 : count + 1);
-          LOG.info("***** " + id +
-              " Transition from " + beforeState +
-              " to " + afterState +
-              "sum:" + sum);
-        }
+      if (beforeState != afterState &&
+        afterState == 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.RUNNING) {
+        RUNNING_TRANSITIONS.compute(op.getContainerId(),
+            (containerId, counter) -> counter == null ? 1 : ++counter);
       }
     }
-
-    /**
-     * Get the current number of state transitions.
-     * This is useful to check, if an event has occurred in unit tests.
-     * @param id Container id to check
-     * @param state Return the overall number of transitions to this state
-     * @return Number of transitions to the state specified
-     */
-    static long getTransitionCounter(ContainerId id,
-                                     org.apache.hadoop.yarn.server.nodemanager
-                                         .containermanager.container
-                                         .ContainerState state) {
-      Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>())
-          .get(state);
-      return ret != null ? ret : 0;
-    }
   }
 
-  @Before
-  public void setup() throws YarnException, IOException {
-    // start minicluster
+  public void setup() throws YarnException, IOException, InterruptedException, 
TimeoutException {
     conf = new YarnConfiguration();
-    // Turn on state tracking
     conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
         DebugSumContainerStateListener.class.getName());
-    yarnCluster =
-        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    startYarnCluster();
+    startYarnClient();
+    UserGroupInformation.setLoginUser(UserGroupInformation
+      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    nmTokenCache = new NMTokenCache();
+    startRMClient();
+    startNMClient();
+  }
+
+
+  private void startYarnCluster() {
+    yarnCluster = new MiniYARNCluster(TestNMClient.class.getName(), 3, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.start();
-    assertNotNull(yarnCluster);
     assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+  }
 
-    // start rm client
+  private void startYarnClient()
+      throws IOException, YarnException, InterruptedException, 
TimeoutException {
     yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
     yarnClient.init(conf);
     yarnClient.start();
-    assertNotNull(yarnClient);
     assertEquals(STATE.STARTED, yarnClient.getServiceState());
-
-    // get node info
     nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
 
-    // submit new app
-    ApplicationSubmissionContext appContext = 
+    ApplicationSubmissionContext appContext =
         yarnClient.createApplication().getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
-    // set the application name
     appContext.setApplicationName("Test");
-    // Set the priority for the application master
     Priority pri = Priority.newInstance(0);
     appContext.setPriority(pri);
-    // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue("default");
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
+    ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
     appContext.setAMContainerSpec(amContainer);
-    // unmanaged AM
     appContext.setUnmanagedAM(true);
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
+
+    SubmitApplicationRequest appRequest = 
Records.newRecord(SubmitApplicationRequest.class);
     appRequest.setApplicationSubmissionContext(appContext);
-    // Submit the application to the applications manager
     yarnClient.submitApplication(appContext);
+    GenericTestUtils.waitFor(() -> 
yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt().getAppAttemptState() == 
RMAppAttemptState.LAUNCHED,
+        100, 30_000, "Failed to start app");
+    appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt();
+  }
 
-    // wait for app to start
-    int iterationsLeft = 30;
-    RMAppAttempt appAttempt = null;
-    while (iterationsLeft > 0) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState() ==
-          YarnApplicationState.ACCEPTED) {
-        attemptId = appReport.getCurrentApplicationAttemptId();
-        appAttempt =
-            yarnCluster.getResourceManager().getRMContext().getRMApps()
-              .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-      sleep(1000);
-      --iterationsLeft;
-    }
-    if (iterationsLeft == 0) {
-      fail("Application hasn't bee started");
-    }
-
-    // Just dig into the ResourceManager and get the AMRMToken just for the 
sake
-    // of testing.
-    UserGroupInformation.setLoginUser(UserGroupInformation
-      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
-
-    //creating an instance NMTokenCase
-    nmTokenCache = new NMTokenCache();
-    
-    // start am rm client
-    rmClient =
-        (AMRMClientImpl<ContainerRequest>) AMRMClient
-          .<ContainerRequest> createAMRMClient();
-
-    //setting an instance NMTokenCase
+  private void startRMClient() {
+    rmClient = (AMRMClientImpl<ContainerRequest>) 
AMRMClient.createAMRMClient();
     rmClient.setNMTokenCache(nmTokenCache);
     rmClient.init(conf);
     rmClient.start();
-    assertNotNull(rmClient);
     assertEquals(STATE.STARTED, rmClient.getServiceState());
+  }
 
-    // start am nm client
+  private void startNMClient() {
     nmClient = (NMClientImpl) NMClient.createNMClient();
-    
-    //propagating the AMRMClient NMTokenCache instance
     nmClient.setNMTokenCache(rmClient.getNMTokenCache());
     nmClient.init(conf);
     nmClient.start();
-    assertNotNull(nmClient);
     assertEquals(STATE.STARTED, nmClient.getServiceState());
   }
 
-  @After
-  public void tearDown() {
+  public void tearDown() throws InterruptedException {
     rmClient.stop();
     yarnClient.stop();
-    yarnCluster.stop();
-  }
-
-  private void stopNmClient(boolean stopContainers) {
-    assertNotNull("Null nmClient", nmClient);
-    // leave one unclosed
-    assertEquals(1, nmClient.startedContainers.size());
-    // default true
-    assertTrue(nmClient.getCleanupRunningContainers().get());
-    nmClient.cleanupRunningContainersOnStop(stopContainers);
-    assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
-    nmClient.stop();
+    yarnCluster.asyncStop(this);
   }
 
-  @Test (timeout = 180000)
+  @Test (timeout = 180_000 * MAX_EARLY_FINISH)
   public void testNMClientNoCleanupOnStop()
-      throws YarnException, IOException {
-
-    rmClient.registerApplicationMaster("Host", 10000, "");
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      stopNmClient();
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainers();
+      assertEquals(0, nmClient.startedContainers.size());
+    });
+  }
 
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+  @Test (timeout = 200_000 * MAX_EARLY_FINISH)
+  public void testNMClient()
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      // stop the running containers on close
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainersOnStop(true);
+      assertTrue(nmClient.getCleanupRunningContainers().get());
+      nmClient.stop();
+    });
+  }
 
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // don't stop the running containers
-    stopNmClient(false);
-    assertFalse(nmClient.startedContainers.isEmpty());
-    //now cleanup
-    nmClient.cleanupRunningContainers();
-    assertEquals(0, nmClient.startedContainers.size());
+  public void runTest(
+      Runnable test
+  ) throws IOException, InterruptedException, YarnException, TimeoutException {
+    int earlyFinishCounter = MAX_EARLY_FINISH;
+    int earlyFinishCounterWhenTestWasStarted;
+    do {
+      earlyFinishCounterWhenTestWasStarted = earlyFinishCounter;
+      setup();
+      rmClient.registerApplicationMaster("Host", 10_000, "");
+      testContainerManagement(nmClient, allocateContainers(rmClient));
+      rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
null, null);
+      test.run();
+      tearDown();
+    } while (earlyFinishCounter != 0 && earlyFinishCounter != 
earlyFinishCounterWhenTestWasStarted);

Review Comment:
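   Condition `earlyFinishCounter != 0` is always `true`, and condition
   `earlyFinishCounter != earlyFinishCounterWhenTestWasStarted` is always
   `false` when reached. Neither variable is modified after
   `earlyFinishCounterWhenTestWasStarted = earlyFinishCounter`, so the
   `do`/`while` loop always runs exactly once and the retry never happens.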



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java:
##########
@@ -60,53 +55,50 @@
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
+
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.function.ThrowingRunnable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestNMClient {
-  Configuration conf = null;
-  MiniYARNCluster yarnCluster = null;
-  YarnClientImpl yarnClient = null;
-  AMRMClientImpl<ContainerRequest> rmClient = null;
-  NMClientImpl nmClient = null;
-  List<NodeReport> nodeReports = null;
-  ApplicationAttemptId attemptId = null;
-  int nodeCount = 3;
-  NMTokenCache nmTokenCache = null;
+  private static final String IS_NOT_HANDLED_BY_THIS_NODEMANAGER =
+      "is not handled by this NodeManager";
+  private static final String UNKNOWN_NODEMANAGER =

Review Comment:
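   Should this be named `UNKNOWN_CONTAINER`? Its value is "Unknown container", which describes an unknown container rather than an unknown NodeManager.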



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java:
##########
@@ -125,576 +117,344 @@ public void postTransition(
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container
             .ContainerState afterState,
         ContainerEvent processedEvent) {
-      synchronized (TRANSITION_COUNTER) {
-        if (beforeState != afterState) {
-          ContainerId id = op.getContainerId();
-          TRANSITION_COUNTER
-              .putIfAbsent(id, new HashMap<>());
-          long sum = TRANSITION_COUNTER.get(id)
-              .compute(afterState,
-                  (state, count) -> count == null ? 1 : count + 1);
-          LOG.info("***** " + id +
-              " Transition from " + beforeState +
-              " to " + afterState +
-              "sum:" + sum);
-        }
+      if (beforeState != afterState &&
+        afterState == 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.RUNNING) {
+        RUNNING_TRANSITIONS.compute(op.getContainerId(),
+            (containerId, counter) -> counter == null ? 1 : ++counter);
       }
     }
-
-    /**
-     * Get the current number of state transitions.
-     * This is useful to check, if an event has occurred in unit tests.
-     * @param id Container id to check
-     * @param state Return the overall number of transitions to this state
-     * @return Number of transitions to the state specified
-     */
-    static long getTransitionCounter(ContainerId id,
-                                     org.apache.hadoop.yarn.server.nodemanager
-                                         .containermanager.container
-                                         .ContainerState state) {
-      Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>())
-          .get(state);
-      return ret != null ? ret : 0;
-    }
   }
 
-  @Before
-  public void setup() throws YarnException, IOException {
-    // start minicluster
+  public void setup() throws YarnException, IOException, InterruptedException, 
TimeoutException {
     conf = new YarnConfiguration();
-    // Turn on state tracking
     conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
         DebugSumContainerStateListener.class.getName());
-    yarnCluster =
-        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    startYarnCluster();
+    startYarnClient();
+    UserGroupInformation.setLoginUser(UserGroupInformation
+      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    nmTokenCache = new NMTokenCache();
+    startRMClient();
+    startNMClient();
+  }
+
+
+  private void startYarnCluster() {
+    yarnCluster = new MiniYARNCluster(TestNMClient.class.getName(), 3, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.start();
-    assertNotNull(yarnCluster);
     assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+  }
 
-    // start rm client
+  private void startYarnClient()
+      throws IOException, YarnException, InterruptedException, 
TimeoutException {
     yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
     yarnClient.init(conf);
     yarnClient.start();
-    assertNotNull(yarnClient);
     assertEquals(STATE.STARTED, yarnClient.getServiceState());
-
-    // get node info
     nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
 
-    // submit new app
-    ApplicationSubmissionContext appContext = 
+    ApplicationSubmissionContext appContext =
         yarnClient.createApplication().getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
-    // set the application name
     appContext.setApplicationName("Test");
-    // Set the priority for the application master
     Priority pri = Priority.newInstance(0);
     appContext.setPriority(pri);
-    // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue("default");
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
+    ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
     appContext.setAMContainerSpec(amContainer);
-    // unmanaged AM
     appContext.setUnmanagedAM(true);
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
+
+    SubmitApplicationRequest appRequest = 
Records.newRecord(SubmitApplicationRequest.class);
     appRequest.setApplicationSubmissionContext(appContext);
-    // Submit the application to the applications manager
     yarnClient.submitApplication(appContext);
+    GenericTestUtils.waitFor(() -> 
yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt().getAppAttemptState() == 
RMAppAttemptState.LAUNCHED,
+        100, 30_000, "Failed to start app");
+    appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt();
+  }
 
-    // wait for app to start
-    int iterationsLeft = 30;
-    RMAppAttempt appAttempt = null;
-    while (iterationsLeft > 0) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState() ==
-          YarnApplicationState.ACCEPTED) {
-        attemptId = appReport.getCurrentApplicationAttemptId();
-        appAttempt =
-            yarnCluster.getResourceManager().getRMContext().getRMApps()
-              .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-      sleep(1000);
-      --iterationsLeft;
-    }
-    if (iterationsLeft == 0) {
-      fail("Application hasn't bee started");
-    }
-
-    // Just dig into the ResourceManager and get the AMRMToken just for the 
sake
-    // of testing.
-    UserGroupInformation.setLoginUser(UserGroupInformation
-      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
-
-    //creating an instance NMTokenCase
-    nmTokenCache = new NMTokenCache();
-    
-    // start am rm client
-    rmClient =
-        (AMRMClientImpl<ContainerRequest>) AMRMClient
-          .<ContainerRequest> createAMRMClient();
-
-    //setting an instance NMTokenCase
+  private void startRMClient() {
+    rmClient = (AMRMClientImpl<ContainerRequest>) 
AMRMClient.createAMRMClient();
     rmClient.setNMTokenCache(nmTokenCache);
     rmClient.init(conf);
     rmClient.start();
-    assertNotNull(rmClient);
     assertEquals(STATE.STARTED, rmClient.getServiceState());
+  }
 
-    // start am nm client
+  private void startNMClient() {
     nmClient = (NMClientImpl) NMClient.createNMClient();
-    
-    //propagating the AMRMClient NMTokenCache instance
     nmClient.setNMTokenCache(rmClient.getNMTokenCache());
     nmClient.init(conf);
     nmClient.start();
-    assertNotNull(nmClient);
     assertEquals(STATE.STARTED, nmClient.getServiceState());
   }
 
-  @After
-  public void tearDown() {
+  public void tearDown() throws InterruptedException {
     rmClient.stop();
     yarnClient.stop();
-    yarnCluster.stop();
-  }
-
-  private void stopNmClient(boolean stopContainers) {
-    assertNotNull("Null nmClient", nmClient);
-    // leave one unclosed
-    assertEquals(1, nmClient.startedContainers.size());
-    // default true
-    assertTrue(nmClient.getCleanupRunningContainers().get());
-    nmClient.cleanupRunningContainersOnStop(stopContainers);
-    assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
-    nmClient.stop();
+    yarnCluster.asyncStop(this);
   }
 
-  @Test (timeout = 180000)
+  @Test (timeout = 180_000 * MAX_EARLY_FINISH)
   public void testNMClientNoCleanupOnStop()
-      throws YarnException, IOException {
-
-    rmClient.registerApplicationMaster("Host", 10000, "");
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      stopNmClient();
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainers();
+      assertEquals(0, nmClient.startedContainers.size());
+    });
+  }
 
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+  @Test (timeout = 200_000 * MAX_EARLY_FINISH)
+  public void testNMClient()
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      // stop the running containers on close
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainersOnStop(true);
+      assertTrue(nmClient.getCleanupRunningContainers().get());
+      nmClient.stop();
+    });
+  }
 
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // don't stop the running containers
-    stopNmClient(false);
-    assertFalse(nmClient.startedContainers.isEmpty());
-    //now cleanup
-    nmClient.cleanupRunningContainers();
-    assertEquals(0, nmClient.startedContainers.size());
+  public void runTest(
+      Runnable test
+  ) throws IOException, InterruptedException, YarnException, TimeoutException {
+    int earlyFinishCounter = MAX_EARLY_FINISH;
+    int earlyFinishCounterWhenTestWasStarted;
+    do {
+      earlyFinishCounterWhenTestWasStarted = earlyFinishCounter;
+      setup();
+      rmClient.registerApplicationMaster("Host", 10_000, "");
+      testContainerManagement(nmClient, allocateContainers(rmClient));
+      rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
null, null);
+      test.run();
+      tearDown();
+    } while (earlyFinishCounter != 0 && earlyFinishCounter != 
earlyFinishCounterWhenTestWasStarted);

Review Comment:
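   It looks like the earlyFinishCounter logic was added and then removed. The counter is never decremented, so the do/while loop always runs exactly once, and the early-finish retry (and the "Too many early finish" failure) can never trigger.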
   
   Here is a patch with some of my review comment fixes: 
https://github.com/tomicooler/hadoop/commit/cc49a495178dbc25e7a896dc2c2f5f9b3ad67e1c



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java:
##########
@@ -456,6 +456,19 @@ public int getActiveRMIndex() {
     return -1;
   }
 
+  /**
+   * Stopping MiniYarnCluster can take more than 30 seconds and it's a 
blocking operation so it blocks test execution,
+   * so to prevent that we can stop the cluster asynchronously,
+   * that can ultimately save test execution time.
+   * @param owner the Test class that owns the instance of MiniYarnCluster
+   */
+  public void asyncStop(Object owner) {

Review Comment:
   I'm not sure how future-proof this is. With 100 small test functions that each 
complete in under 100 ms, 100 threads would be created, one to stop each mini 
cluster.
   
   Also, the `owner` parameter is really just additional context (a string) 
for the thread name, which is somewhat misleading.
   
   I think we could keep this a synchronous operation for these two test methods.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java:
##########
@@ -125,576 +117,344 @@ public void postTransition(
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container
             .ContainerState afterState,
         ContainerEvent processedEvent) {
-      synchronized (TRANSITION_COUNTER) {
-        if (beforeState != afterState) {
-          ContainerId id = op.getContainerId();
-          TRANSITION_COUNTER
-              .putIfAbsent(id, new HashMap<>());
-          long sum = TRANSITION_COUNTER.get(id)
-              .compute(afterState,
-                  (state, count) -> count == null ? 1 : count + 1);
-          LOG.info("***** " + id +
-              " Transition from " + beforeState +
-              " to " + afterState +
-              "sum:" + sum);
-        }
+      if (beforeState != afterState &&
+        afterState == 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.RUNNING) {
+        RUNNING_TRANSITIONS.compute(op.getContainerId(),
+            (containerId, counter) -> counter == null ? 1 : ++counter);
       }
     }
-
-    /**
-     * Get the current number of state transitions.
-     * This is useful to check, if an event has occurred in unit tests.
-     * @param id Container id to check
-     * @param state Return the overall number of transitions to this state
-     * @return Number of transitions to the state specified
-     */
-    static long getTransitionCounter(ContainerId id,
-                                     org.apache.hadoop.yarn.server.nodemanager
-                                         .containermanager.container
-                                         .ContainerState state) {
-      Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>())
-          .get(state);
-      return ret != null ? ret : 0;
-    }
   }
 
-  @Before
-  public void setup() throws YarnException, IOException {
-    // start minicluster
+  public void setup() throws YarnException, IOException, InterruptedException, 
TimeoutException {
     conf = new YarnConfiguration();
-    // Turn on state tracking
     conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
         DebugSumContainerStateListener.class.getName());
-    yarnCluster =
-        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    startYarnCluster();
+    startYarnClient();
+    UserGroupInformation.setLoginUser(UserGroupInformation
+      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    nmTokenCache = new NMTokenCache();
+    startRMClient();
+    startNMClient();
+  }
+
+
+  private void startYarnCluster() {
+    yarnCluster = new MiniYARNCluster(TestNMClient.class.getName(), 3, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.start();
-    assertNotNull(yarnCluster);
     assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+  }
 
-    // start rm client
+  private void startYarnClient()
+      throws IOException, YarnException, InterruptedException, 
TimeoutException {
     yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
     yarnClient.init(conf);
     yarnClient.start();
-    assertNotNull(yarnClient);
     assertEquals(STATE.STARTED, yarnClient.getServiceState());
-
-    // get node info
     nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
 
-    // submit new app
-    ApplicationSubmissionContext appContext = 
+    ApplicationSubmissionContext appContext =
         yarnClient.createApplication().getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
-    // set the application name
     appContext.setApplicationName("Test");
-    // Set the priority for the application master
     Priority pri = Priority.newInstance(0);
     appContext.setPriority(pri);
-    // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue("default");
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
+    ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
     appContext.setAMContainerSpec(amContainer);
-    // unmanaged AM
     appContext.setUnmanagedAM(true);
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
+
+    SubmitApplicationRequest appRequest = 
Records.newRecord(SubmitApplicationRequest.class);
     appRequest.setApplicationSubmissionContext(appContext);
-    // Submit the application to the applications manager
     yarnClient.submitApplication(appContext);
+    GenericTestUtils.waitFor(() -> 
yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt().getAppAttemptState() == 
RMAppAttemptState.LAUNCHED,
+        100, 30_000, "Failed to start app");
+    appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt();
+  }
 
-    // wait for app to start
-    int iterationsLeft = 30;
-    RMAppAttempt appAttempt = null;
-    while (iterationsLeft > 0) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState() ==
-          YarnApplicationState.ACCEPTED) {
-        attemptId = appReport.getCurrentApplicationAttemptId();
-        appAttempt =
-            yarnCluster.getResourceManager().getRMContext().getRMApps()
-              .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-      sleep(1000);
-      --iterationsLeft;
-    }
-    if (iterationsLeft == 0) {
-      fail("Application hasn't bee started");
-    }
-
-    // Just dig into the ResourceManager and get the AMRMToken just for the 
sake
-    // of testing.
-    UserGroupInformation.setLoginUser(UserGroupInformation
-      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
-
-    //creating an instance NMTokenCase
-    nmTokenCache = new NMTokenCache();
-    
-    // start am rm client
-    rmClient =
-        (AMRMClientImpl<ContainerRequest>) AMRMClient
-          .<ContainerRequest> createAMRMClient();
-
-    //setting an instance NMTokenCase
+  private void startRMClient() {
+    rmClient = (AMRMClientImpl<ContainerRequest>) 
AMRMClient.createAMRMClient();
     rmClient.setNMTokenCache(nmTokenCache);
     rmClient.init(conf);
     rmClient.start();
-    assertNotNull(rmClient);
     assertEquals(STATE.STARTED, rmClient.getServiceState());
+  }
 
-    // start am nm client
+  private void startNMClient() {
     nmClient = (NMClientImpl) NMClient.createNMClient();
-    
-    //propagating the AMRMClient NMTokenCache instance
     nmClient.setNMTokenCache(rmClient.getNMTokenCache());
     nmClient.init(conf);
     nmClient.start();
-    assertNotNull(nmClient);
     assertEquals(STATE.STARTED, nmClient.getServiceState());
   }
 
-  @After
-  public void tearDown() {
+  public void tearDown() throws InterruptedException {
     rmClient.stop();
     yarnClient.stop();
-    yarnCluster.stop();
-  }
-
-  private void stopNmClient(boolean stopContainers) {
-    assertNotNull("Null nmClient", nmClient);
-    // leave one unclosed
-    assertEquals(1, nmClient.startedContainers.size());
-    // default true
-    assertTrue(nmClient.getCleanupRunningContainers().get());
-    nmClient.cleanupRunningContainersOnStop(stopContainers);
-    assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
-    nmClient.stop();
+    yarnCluster.asyncStop(this);
   }
 
-  @Test (timeout = 180000)
+  @Test (timeout = 180_000 * MAX_EARLY_FINISH)
   public void testNMClientNoCleanupOnStop()
-      throws YarnException, IOException {
-
-    rmClient.registerApplicationMaster("Host", 10000, "");
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      stopNmClient();
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainers();
+      assertEquals(0, nmClient.startedContainers.size());
+    });
+  }
 
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+  @Test (timeout = 200_000 * MAX_EARLY_FINISH)
+  public void testNMClient()
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      // stop the running containers on close
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainersOnStop(true);
+      assertTrue(nmClient.getCleanupRunningContainers().get());
+      nmClient.stop();
+    });
+  }
 
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // don't stop the running containers
-    stopNmClient(false);
-    assertFalse(nmClient.startedContainers.isEmpty());
-    //now cleanup
-    nmClient.cleanupRunningContainers();
-    assertEquals(0, nmClient.startedContainers.size());
+  public void runTest(
+      Runnable test
+  ) throws IOException, InterruptedException, YarnException, TimeoutException {
+    int earlyFinishCounter = MAX_EARLY_FINISH;
+    int earlyFinishCounterWhenTestWasStarted;
+    do {
+      earlyFinishCounterWhenTestWasStarted = earlyFinishCounter;
+      setup();
+      rmClient.registerApplicationMaster("Host", 10_000, "");
+      testContainerManagement(nmClient, allocateContainers(rmClient));
+      rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
null, null);
+      test.run();
+      tearDown();
+    } while (earlyFinishCounter != 0 && earlyFinishCounter != 
earlyFinishCounterWhenTestWasStarted);
+    if (earlyFinishCounter == 0) {
+      fail("Too many early finish exception happened");
+    }
   }
 
-  @Test (timeout = 200000)
-  public void testNMClient()
-      throws YarnException, IOException {
-    rmClient.registerApplicationMaster("Host", 10000, "");
-
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
-    
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // stop the running containers on close
-    assertFalse(nmClient.startedContainers.isEmpty());
-    nmClient.cleanupRunningContainersOnStop(true);
+  private void stopNmClient() {
+    assertNotNull("Null nmClient", nmClient);
+    // leave one unclosed
+    assertEquals(1, nmClient.startedContainers.size());
+    // default true
     assertTrue(nmClient.getCleanupRunningContainers().get());
+    nmClient.cleanupRunningContainersOnStop(false);
+    assertFalse(nmClient.getCleanupRunningContainers().get());
     nmClient.stop();
   }
 
   private Set<Container> allocateContainers(
-      AMRMClientImpl<ContainerRequest> rmClient, int num)
-      throws YarnException, IOException {
-    // setup container request
-    Resource capability = Resource.newInstance(1024, 0);
-    Priority priority = Priority.newInstance(0);
-    String node = nodeReports.get(0).getNodeId().getHost();
-    String rack = nodeReports.get(0).getRackName();
-    String[] nodes = new String[] {node};
-    String[] racks = new String[] {rack};
-
-    for (int i = 0; i < num; ++i) {
-      rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
-          racks, priority));
+      AMRMClientImpl<ContainerRequest> rmClient
+  ) throws YarnException, IOException {
+    for (int i = 0; i < NUMBER_OF_CONTAINERS; ++i) {
+      rmClient.addContainerRequest(new ContainerRequest(
+          Resource.newInstance(1024, 0),
+          new String[] {nodeReports.get(0).getNodeId().getHost()},
+          new String[] {nodeReports.get(0).getRackName()},
+          Priority.newInstance(0)
+      ));
     }
-
-    int containersRequestedAny = rmClient.getTable(0)
-        .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
-            capability).remoteRequest.getNumContainers();
-
-    // RM should allocate container within 2 calls to allocate()
-    int allocatedContainerCount = 0;
-    int iterationsLeft = 2;
-    Set<Container> containers = new TreeSet<Container>();
-    while (allocatedContainerCount < containersRequestedAny
-        && iterationsLeft > 0) {
+    Set<Container> allocatedContainers = new TreeSet<>();
+    while (allocatedContainers.size() < NUMBER_OF_CONTAINERS) {
       AllocateResponse allocResponse = rmClient.allocate(0.1f);
-
-      allocatedContainerCount += allocResponse.getAllocatedContainers().size();
-      for(Container container : allocResponse.getAllocatedContainers()) {
-        containers.add(container);
-      }
-      if (!allocResponse.getNMTokens().isEmpty()) {
-        for (NMToken token : allocResponse.getNMTokens()) {
-          rmClient.getNMTokenCache().setToken(token.getNodeId().toString(),
-              token.getToken());
-        }
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      for (NMToken token : allocResponse.getNMTokens()) {
+        rmClient.getNMTokenCache().setToken(token.getNodeId().toString(), 
token.getToken());
       }
-      if(allocatedContainerCount < containersRequestedAny) {
-        // sleep to let NM's heartbeat to RM and trigger allocations
-        sleep(1000);
+      if (allocatedContainers.size() < NUMBER_OF_CONTAINERS) {
+        sleep(100);
       }
-
-      --iterationsLeft;
     }
-    return containers;
+    return allocatedContainers;
   }
 
-  private void testContainerManagement(NMClientImpl client,
-      Set<Container> containers) throws YarnException, IOException {
+  private void testContainerManagement(
+      NMClientImpl client, Set<Container> containers
+  ) throws YarnException, IOException {
     int size = containers.size();
     int i = 0;
     for (Container container : containers) {
       // getContainerStatus shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.getContainerStatus(container.getId(), container.getNodeId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("is not handled by this NodeManager"));
-      }
+      assertYarnException(
+          () -> client.getContainerStatus(container.getId(), 
container.getNodeId()),
+          IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
       // upadateContainerResource shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.updateContainerResource(container);
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("is not handled by this NodeManager"));
-      }
-
+      assertYarnException(
+          () -> client.updateContainerResource(container),
+          IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
       // restart shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.restartContainer(container.getId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("Unknown container"));
-      }
-
+      assertYarnException(
+          () -> client.restartContainer(container.getId()),
+          UNKNOWN_NODEMANAGER);
       // rollback shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.rollbackLastReInitialization(container.getId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("Unknown container"));
-      }
-
+      assertYarnException(
+          () -> client.rollbackLastReInitialization(container.getId()),
+          UNKNOWN_NODEMANAGER);
       // commit shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.commitLastReInitialization(container.getId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("Unknown container"));
-      }
-
+      assertYarnException(
+          () -> client.commitLastReInitialization(container.getId()),
+          UNKNOWN_NODEMANAGER);
       // stopContainer shouldn't be called before startContainer,
       // otherwise, an exception will be thrown
-      try {
-        client.stopContainer(container.getId(), container.getNodeId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        if (!e.getMessage()
-              .contains("is not handled by this NodeManager")) {
-          throw new AssertionError("Exception is not expected: ", e);
-        }
-      }
+      assertYarnException(
+          () -> client.stopContainer(container.getId(), container.getNodeId()),
+          IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
 
       Credentials ts = new Credentials();
       DataOutputBuffer dob = new DataOutputBuffer();
       ts.writeTokenStorageToStream(dob);
-      ByteBuffer securityTokens =
-          ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-      ContainerLaunchContext clc =
-          Records.newRecord(ContainerLaunchContext.class);
-      if (Shell.WINDOWS) {
-        clc.setCommands(
-            Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul"));
-      } else {
-        clc.setCommands(Arrays.asList("sleep", "1000000"));
-      }
+      ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, 
dob.getLength());
+      ContainerLaunchContext clc = 
Records.newRecord(ContainerLaunchContext.class);
+      clc.setCommands(Shell.WINDOWS
+          ? Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul")
+          : Arrays.asList("sleep", "1000000")
+      );
       clc.setTokens(securityTokens);
-      try {
-        client.startContainer(container, clc);
-      } catch (YarnException e) {
-        throw new AssertionError("Exception is not expected ", e);
-      }
-      List<Integer> exitStatuses = Collections.singletonList(-1000);
+      client.startContainer(container, clc);
+      List<Integer> exitStatuses = Arrays.asList(-1000, -105);
 
       // leave one container unclosed
       if (++i < size) {
         testContainer(client, i, container, clc, exitStatuses);
-
       }
     }
   }
 
   private void testContainer(NMClientImpl client, int i, Container container,
                              ContainerLaunchContext clc, List<Integer> 
exitCode)
-      throws YarnException, IOException {
-    // NodeManager may still need some time to make the container started
+          throws YarnException, IOException {
     testGetContainerStatus(container, i, ContainerState.RUNNING, "",
-        exitCode);
-    waitForContainerTransitionCount(container,
-        org.apache.hadoop.yarn.server.nodemanager.
-            containermanager.container.ContainerState.RUNNING, 1);
-    // Test increase container API and make sure requests can reach NM
+            exitCode);
+    waitForContainerRunningTransitionCount(container, 1);
     testIncreaseContainerResource(container);
-
-    testRestartContainer(container.getId());
+    testRestartContainer(container);
     testGetContainerStatus(container, i, ContainerState.RUNNING,
-        "will be Restarted", exitCode);
-    waitForContainerTransitionCount(container,
-        org.apache.hadoop.yarn.server.nodemanager.
-            containermanager.container.ContainerState.RUNNING, 2);
-
+            "will be Restarted", exitCode);
+    waitForContainerRunningTransitionCount(container, 2);
     if (i % 2 == 0) {
-      testReInitializeContainer(container.getId(), clc, false);
+      testReInitializeContainer(container, clc, false);
       testGetContainerStatus(container, i, ContainerState.RUNNING,
-          "will be Re-initialized", exitCode);
-      waitForContainerTransitionCount(container,
-          org.apache.hadoop.yarn.server.nodemanager.
-              containermanager.container.ContainerState.RUNNING, 3);
-
-      testRollbackContainer(container.getId(), false);
+              "will be Re-initialized", exitCode);
+      waitForContainerRunningTransitionCount(container, 3);
+      testContainerRollback(container, true);
       testGetContainerStatus(container, i, ContainerState.RUNNING,
-          "will be Rolled-back", exitCode);
-      waitForContainerTransitionCount(container,
-          org.apache.hadoop.yarn.server.nodemanager.
-              containermanager.container.ContainerState.RUNNING, 4);
-
-      testCommitContainer(container.getId(), true);
-      testReInitializeContainer(container.getId(), clc, false);
+              "will be Rolled-back", exitCode);
+      waitForContainerRunningTransitionCount(container, 4);
+      testContainerCommit(container, false);
+      testReInitializeContainer(container, clc, false);
       testGetContainerStatus(container, i, ContainerState.RUNNING,
-          "will be Re-initialized", exitCode);
-      waitForContainerTransitionCount(container,
-          org.apache.hadoop.yarn.server.nodemanager.
-              containermanager.container.ContainerState.RUNNING, 5);
-      testCommitContainer(container.getId(), false);
+              "will be Re-initialized", exitCode);
+      waitForContainerRunningTransitionCount(container, 5);
+      testContainerCommit(container, true);
     } else {
-      testReInitializeContainer(container.getId(), clc, true);
+      testReInitializeContainer(container, clc, true);
       testGetContainerStatus(container, i, ContainerState.RUNNING,
-          "will be Re-initialized", exitCode);
-      waitForContainerTransitionCount(container,
-          org.apache.hadoop.yarn.server.nodemanager.
-              containermanager.container.ContainerState.RUNNING, 3);
-      testRollbackContainer(container.getId(), true);
-      testCommitContainer(container.getId(), true);
+              "will be Re-initialized", exitCode);
+      waitForContainerRunningTransitionCount(container, 3);
+      testContainerRollback(container, false);
+      testContainerCommit(container, false);
     }
-
-    try {
-      client.stopContainer(container.getId(), container.getNodeId());
-    } catch (YarnException e) {
-      throw (AssertionError)
-        (new AssertionError("Exception is not expected: " + e, e));
-    }
-
-    // getContainerStatus can be called after stopContainer
-    try {
-      // O is possible if CLEANUP_CONTAINER is executed too late
-      // -105 is possible if the container is not terminated but killed
-      testGetContainerStatus(container, i, ContainerState.COMPLETE,
-          "Container killed by the ApplicationMaster.",
-          Arrays.asList(
-              ContainerExitStatus.KILLED_BY_APPMASTER,
-              ContainerExitStatus.SUCCESS));
-    } catch (YarnException e) {
-      // The exception is possible because, after the container is stopped,
-      // it may be removed from NM's context.
-      if (!e.getMessage()
-            .contains("was recently stopped on node manager")) {
-        throw (AssertionError)
-          (new AssertionError("Exception is not expected: ", e));
-      }
-    }
-  }
-
-  /**
-   * Wait until the container reaches a state N times.
-   * @param container container to watch
-   * @param state state to test
-   * @param transitions the number N above
-   * @throws YarnException This happens if the test times out while waiting
-   */
-  private void waitForContainerTransitionCount(
-      Container container,
-      org.apache.hadoop.yarn.server.nodemanager.
-          containermanager.container.ContainerState state, long transitions)
-      throws YarnException {
-    long transitionCount = -1;
-    do {
-      if (transitionCount != -1) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException e) {
-          throw new YarnException(
-              "Timeout at transition count:" + transitionCount, e);
-        }
-      }
-      transitionCount = DebugSumContainerStateListener
-          .getTransitionCounter(container.getId(), state);
-    } while (transitionCount != transitions);
+    client.stopContainer(container.getId(), container.getNodeId());
+    testGetContainerStatus(container, i, ContainerState.COMPLETE,
+            "killed by the ApplicationMaster", exitCode);
   }
 
-  private void sleep(int sleepTime) {
-    try {
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+  private void waitForContainerRunningTransitionCount(Container container, 
long transitions) {
+    while (DebugSumContainerStateListener.RUNNING_TRANSITIONS

Review Comment:
   ```
   The TestNMClient test methods can stuck if the test container fails, while 
the test is expecting it running state. This can happen for example if the 
container fails due low memory. To fix this the test should tolerate some 
failure like this.
   ```
   
   This was the Jira description (I could not reproduce the issue in 4 runs). 
This loop can still run forever. That said, the test methods have a timeout anyway, 
so I'm not sure what "stuck" means here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to