zhengchenyu commented on code in PR #5975:
URL: https://github.com/apache/hadoop/pull/5975#discussion_r1314400717
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java:
##########
@@ -142,14 +156,124 @@ protected void testUAMRestart(boolean keepContainers)
throws Exception {
numContainers = 1;
am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
- conts = am.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers();
+ allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts = allocateResponse.getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
- conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers());
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
Thread.sleep(100);
}
+ checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+ rm.stop();
+ }
+
+ protected void testUAMRestartWithoutTransferContainer(boolean
keepContainers) throws Exception {
+ // start RM
+ MockRM rm = new MockRM();
+ rm.start();
+ MockNM nm =
+ new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+ nm.registerNode();
+ Set<NodeId> tokenCacheClientSide = new HashSet();
+
+ // create app and launch the UAM
+ boolean unamanged = true;
+ int maxAttempts = 1;
+ boolean waitForAccepted = true;
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
+ .withAppName("")
+ .withUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ .withAcls(null)
+ .withUnmanagedAM(unamanged)
+ .withQueue(null)
+ .withMaxAppAttempts(maxAttempts)
+ .withCredentials(null)
+ .withAppType(null)
+ .withWaitForAppAcceptedState(waitForAccepted)
+ .withKeepContainers(keepContainers)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+
+ MockAM am = MockRM.launchUAM(app, rm, nm);
+
+ // Register for the first time
+ am.registerAppAttempt();
+
+ // Allocate two containers to UAM
+ int numContainers = 3;
+ AllocateResponse allocateResponse =
+ am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ List<Container> conts = allocateResponse.getAllocatedContainers();
+ while (conts.size() < numContainers) {
+ nm.nodeHeartbeat(true);
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
+ Thread.sleep(100);
+ }
+ checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+ // Release all containers, then there are no transfer containfer app
attempt
+ List<ContainerId> releaseList = new ArrayList();
+ releaseList.add(conts.get(0).getId());
+ releaseList.add(conts.get(1).getId());
+ releaseList.add(conts.get(2).getId());
+ List<ContainerStatus> finishedConts =
+ am.allocate(new ArrayList<ResourceRequest>(), releaseList)
+ .getCompletedContainersStatuses();
+ while (finishedConts.size() < releaseList.size()) {
+ nm.nodeHeartbeat(true);
+ finishedConts
+ .addAll(am
+ .allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>())
+ .getCompletedContainersStatuses());
+ Thread.sleep(100);
+ }
+
+ // Register for the second time
+ RegisterApplicationMasterResponse response = null;
+ try {
+ response = am.registerAppAttempt(false);
+ // When AM restart, it means nmToken in client side should be missing
+ tokenCacheClientSide.clear();
+ response.getNMTokensFromPreviousAttempts()
+ .forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+ } catch (InvalidApplicationMasterRequestException e) {
+ Assert.assertEquals(false, keepContainers);
+ return;
+ }
+ Assert.assertEquals("RM should not allow second register"
+ + " for UAM without keep container flag ", true, keepContainers);
+
+ // Expecting the zero running containers previously
+ Assert.assertEquals(0,
response.getContainersFromPreviousAttempts().size());
+ Assert.assertEquals(0, response.getNMTokensFromPreviousAttempts().size());
+
+ // Allocate one more containers to UAM, just to be safe
+ numContainers = 1;
+ am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
+ nm.nodeHeartbeat(true);
+ allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts = allocateResponse.getAllocatedContainers();
+ while (conts.size() < numContainers) {
+ nm.nodeHeartbeat(true);
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
+ Thread.sleep(100);
+ }
Review Comment:
I don't think there is any need to close it, because no resources are allocated;
it is just a mock object in memory.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java:
##########
@@ -142,14 +156,124 @@ protected void testUAMRestart(boolean keepContainers)
throws Exception {
numContainers = 1;
am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
- conts = am.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers();
+ allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts = allocateResponse.getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
- conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers());
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
Thread.sleep(100);
}
+ checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+ rm.stop();
+ }
+
+ protected void testUAMRestartWithoutTransferContainer(boolean
keepContainers) throws Exception {
+ // start RM
+ MockRM rm = new MockRM();
+ rm.start();
+ MockNM nm =
+ new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+ nm.registerNode();
+ Set<NodeId> tokenCacheClientSide = new HashSet();
+
+ // create app and launch the UAM
+ boolean unamanged = true;
+ int maxAttempts = 1;
+ boolean waitForAccepted = true;
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
+ .withAppName("")
+ .withUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ .withAcls(null)
+ .withUnmanagedAM(unamanged)
+ .withQueue(null)
+ .withMaxAppAttempts(maxAttempts)
+ .withCredentials(null)
+ .withAppType(null)
+ .withWaitForAppAcceptedState(waitForAccepted)
+ .withKeepContainers(keepContainers)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+
+ MockAM am = MockRM.launchUAM(app, rm, nm);
+
+ // Register for the first time
+ am.registerAppAttempt();
+
+ // Allocate two containers to UAM
+ int numContainers = 3;
+ AllocateResponse allocateResponse =
+ am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ List<Container> conts = allocateResponse.getAllocatedContainers();
+ while (conts.size() < numContainers) {
+ nm.nodeHeartbeat(true);
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
+ Thread.sleep(100);
+ }
+ checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+ // Release all containers, then there are no transfer containfer app
attempt
+ List<ContainerId> releaseList = new ArrayList();
+ releaseList.add(conts.get(0).getId());
+ releaseList.add(conts.get(1).getId());
+ releaseList.add(conts.get(2).getId());
+ List<ContainerStatus> finishedConts =
+ am.allocate(new ArrayList<ResourceRequest>(), releaseList)
+ .getCompletedContainersStatuses();
+ while (finishedConts.size() < releaseList.size()) {
+ nm.nodeHeartbeat(true);
+ finishedConts
+ .addAll(am
+ .allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>())
+ .getCompletedContainersStatuses());
+ Thread.sleep(100);
+ }
+
+ // Register for the second time
+ RegisterApplicationMasterResponse response = null;
+ try {
+ response = am.registerAppAttempt(false);
+ // When AM restart, it means nmToken in client side should be missing
+ tokenCacheClientSide.clear();
+ response.getNMTokensFromPreviousAttempts()
+ .forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+ } catch (InvalidApplicationMasterRequestException e) {
+ Assert.assertEquals(false, keepContainers);
+ return;
+ }
+ Assert.assertEquals("RM should not allow second register"
+ + " for UAM without keep container flag ", true, keepContainers);
+
+ // Expecting the zero running containers previously
+ Assert.assertEquals(0,
response.getContainersFromPreviousAttempts().size());
+ Assert.assertEquals(0, response.getNMTokensFromPreviousAttempts().size());
+
+ // Allocate one more containers to UAM, just to be safe
+ numContainers = 1;
+ am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
+ nm.nodeHeartbeat(true);
+ allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts = allocateResponse.getAllocatedContainers();
+ while (conts.size() < numContainers) {
+ nm.nodeHeartbeat(true);
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
+ Thread.sleep(100);
+ }
Review Comment:
@slfan1989
I don't think there is any need to close it, because no resources are allocated;
it is just a mock object in memory.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]