slfan1989 commented on code in PR #4792:
URL: https://github.com/apache/hadoop/pull/4792#discussion_r954433018


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java:
##########
@@ -450,4 +452,52 @@ public void drainUAMHeartbeats() {
       uam.drainHeartbeatThread();
     }
   }
+
+  /**
+   * Complete FinishApplicationMaster interface calls in batches.
+   *
+   * @param request FinishApplicationMasterRequest
+   * @param appId application Id
+   * @return Returns the Map map,
+   *         the key is subClusterId, the value is FinishApplicationMasterResponse
+   */
+  public Map<String, FinishApplicationMasterResponse> batchFinishApplicationMaster(
+      FinishApplicationMasterRequest request, String appId) {
+
+    Map<String, FinishApplicationMasterResponse> responseMap = new HashMap<>();
+    Set<String> subClusterIds = this.unmanagedAppMasterMap.keySet();
+
+    if (subClusterIds != null && !subClusterIds.isEmpty()) {
+      ExecutorCompletionService<Map<String, FinishApplicationMasterResponse>> finishAppService =
+          new ExecutorCompletionService<>(this.threadpool);
+      LOG.info("Sending finish application request to {} sub-cluster RMs", subClusterIds.size());
+
+      for (final String subClusterId : subClusterIds) {
+        finishAppService.submit(() -> {
+          LOG.info("Sending finish application request to RM {}", subClusterId);
+          FinishApplicationMasterResponse uamResponse = null;
+          try {
+            uamResponse = finishApplicationMaster(subClusterId, request);

Review Comment:
   Thanks for your suggestion. The code looks very good; I will modify it.
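
   For readers following the truncated hunk above, here is a minimal, standalone sketch of the fan-out pattern it starts to show: submit one task per sub-cluster to an ExecutorCompletionService, then collect the results into a map keyed by sub-cluster id. This is not the PR's actual implementation. The class name BatchFinishSketch, the generic batchFinish helper, and the Function parameter standing in for the per-sub-cluster finishApplicationMaster call are all assumptions made for illustration, and skipping failed calls is one possible policy, not necessarily the one the PR uses.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch; names here are illustrative and not taken from Hadoop.
public class BatchFinishSketch {

  // Runs finishCall once per sub-cluster id on the given pool and returns
  // the results keyed by sub-cluster id. Calls that throw are left out of the map.
  public static <R> Map<String, R> batchFinish(
      Set<String> subClusterIds,
      Function<String, R> finishCall,
      ExecutorService pool) throws InterruptedException {

    Map<String, R> responseMap = new HashMap<>();
    ExecutorCompletionService<Map.Entry<String, R>> service =
        new ExecutorCompletionService<>(pool);

    // Fan out: one task per sub-cluster.
    int submitted = 0;
    for (final String id : subClusterIds) {
      service.submit(() ->
          new AbstractMap.SimpleImmutableEntry<String, R>(id, finishCall.apply(id)));
      submitted++;
    }

    // Fan in: take() blocks until the next task completes, in completion order.
    for (int i = 0; i < submitted; i++) {
      try {
        Map.Entry<String, R> entry = service.take().get();
        responseMap.put(entry.getKey(), entry.getValue());
      } catch (ExecutionException e) {
        // Assumed policy for this sketch: skip the failed sub-cluster.
      }
    }
    return responseMap;
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Set<String> ids = new HashSet<>(Arrays.asList("SC-1", "SC-2"));
      // The lambda stands in for the real per-sub-cluster RPC.
      Map<String, String> responses = batchFinish(ids, id -> "finished:" + id, pool);
      System.out.println(responses);
    } finally {
      pool.shutdown();
    }
  }
}
```

   Counting the submitted tasks and calling take() exactly that many times keeps the collection loop from blocking forever when a task fails, since a failed task still completes its Future.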



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -969,4 +969,58 @@ private PreemptionMessage createDummyPreemptionMessage(
     preemptionMessage.setContract(contract);
     return preemptionMessage;
   }
+
+  @Test
+  public void testBatchFinishApplicationMaster() throws IOException, InterruptedException {
+
+    final RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Register the application
+      RegisterApplicationMasterRequest registerReq1 =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq1.setHost(Integer.toString(testAppId));
+      registerReq1.setRpcPort(0);
+      registerReq1.setTrackingUrl("");
+
+      // Register ApplicationMaster
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq1);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 and sc2 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+      registerSubCluster(SubClusterId.newInstance("SC-2"));
+
+      int numberOfContainers = 3;
+      List<Container> containers =
+          getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+      Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+      Assert.assertEquals(numberOfContainers * 2, containers.size());
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+      FinishApplicationMasterResponse finshResponse =

Review Comment:
   I will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

