goiri commented on code in PR #4543:
URL: https://github.com/apache/hadoop/pull/4543#discussion_r922581238
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -1366,4 +1386,49 @@ public void shutdown() {
threadpool.shutdown();
}
}
+
+ <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo>
clusterIds,
+ ClientMethod request, Class<R> clazz) throws YarnException, IOException
{
+ ArrayList<SubClusterInfo> clusterIdList = new ArrayList<>(clusterIds);
+ return invokeConcurrent(clusterIdList, request, clazz);
+ }
+
+ private <R> Map<SubClusterInfo, R>
invokeConcurrent(ArrayList<SubClusterInfo> subClusterInfo,
+ ClientMethod request, Class<R> clazz) {
+
+ Map<SubClusterInfo, R> results = new HashMap<>();
+
+ // Send the requests in parallel
+ CompletionService<R> compSvc = new
ExecutorCompletionService<>(this.threadpool);
+
+ for (final SubClusterInfo info : subClusterInfo) {
Review Comment:
You could pass directly:
```
Collection<SubClusterInfo> clusterIds
```
No need to make it an array list.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -1366,4 +1386,49 @@ public void shutdown() {
threadpool.shutdown();
}
}
+
+ <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo>
clusterIds,
+ ClientMethod request, Class<R> clazz) throws YarnException, IOException
{
+ ArrayList<SubClusterInfo> clusterIdList = new ArrayList<>(clusterIds);
+ return invokeConcurrent(clusterIdList, request, clazz);
+ }
+
+ private <R> Map<SubClusterInfo, R>
invokeConcurrent(ArrayList<SubClusterInfo> subClusterInfo,
+ ClientMethod request, Class<R> clazz) {
+
+ Map<SubClusterInfo, R> results = new HashMap<>();
+
+ // Send the requests in parallel
+ CompletionService<R> compSvc = new
ExecutorCompletionService<>(this.threadpool);
+
+ for (final SubClusterInfo info : subClusterInfo) {
+ compSvc.submit(() -> {
+ DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(
+ info.getSubClusterId(), info.getRMWebServiceAddress());
+ try {
+ Method method = DefaultRequestInterceptorREST.class.
+ getMethod(request.getMethodName(), request.getTypes());
+ return clazz.cast(method.invoke(interceptor, request.getParams()));
Review Comment:
Extract to make it clear what type is each thing.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -560,4 +563,49 @@ SubClusterState.SC_RUNNING, new MonotonicClock().getTime(),
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
+ @Test
+ public void testGetContainers()
+ throws YarnException, IOException, InterruptedException {
+
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ApplicationSubmissionContextInfo context =
+ new ApplicationSubmissionContextInfo();
+ context.setApplicationId(appId.toString());
+
+ // Submit the application we want the report later
+ Response response = interceptor.submitApplication(context, null);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ ApplicationAttemptId appAttempt =
+ ApplicationAttemptId.newInstance(appId, 1);
+
+ ContainersInfo responseGet = interceptor.getContainers(null, null,
+ appId.toString(), appAttempt.toString());
+
+ Assert.assertEquals(4, responseGet.getContainers().size());
+ }
+
+ @Test
+ public void testGetContainersNotExists() {
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ContainersInfo response =
+ interceptor.getContainers(null, null, appId.toString(), null);
+ Assert.assertTrue(response.getContainers().isEmpty());
+ }
+
+ @Test
+ public void testGetContainersWrongFormat() {
+ ContainersInfo response =
Review Comment:
Fits in one line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -560,4 +563,49 @@ SubClusterState.SC_RUNNING, new MonotonicClock().getTime(),
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
+ @Test
+ public void testGetContainers()
+ throws YarnException, IOException, InterruptedException {
+
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ApplicationSubmissionContextInfo context =
+ new ApplicationSubmissionContextInfo();
+ context.setApplicationId(appId.toString());
+
+ // Submit the application we want the report later
+ Response response = interceptor.submitApplication(context, null);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ ApplicationAttemptId appAttempt =
+ ApplicationAttemptId.newInstance(appId, 1);
+
+ ContainersInfo responseGet = interceptor.getContainers(null, null,
+ appId.toString(), appAttempt.toString());
+
+ Assert.assertEquals(4, responseGet.getContainers().size());
+ }
+
+ @Test
+ public void testGetContainersNotExists() {
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ContainersInfo response =
Review Comment:
One line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -19,15 +19,10 @@
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.security.Principal;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
Review Comment:
Leave expanded.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java:
##########
@@ -46,4 +47,7 @@ public ArrayList<ContainerInfo> getContainers() {
return container;
}
+ public void addAll(Collection<ContainerInfo> containersInfo) {
+ container.addAll(new ArrayList<>(containersInfo));
Review Comment:
You don't need to create the ArrayList.
```
public void addAll(Collection<ContainerInfo> containersInfo) {
container.addAll(containersInfo);
}
```
Should just work:
https://docs.oracle.com/javase/7/docs/api/java/util/ArrayList.html#addAll(java.util.Collection)
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -1336,7 +1331,32 @@ public AppAttemptInfo getAppAttempt(HttpServletRequest
req,
@Override
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
- throw new NotImplementedException("Code is not implemented");
+
+ ContainersInfo containersInfo = new ContainersInfo();
+
+ Map<SubClusterId, SubClusterInfo> subClustersActive;
+ try {
+ subClustersActive = getActiveSubclusters();
+ } catch (NotFoundException e) {
+ LOG.error("Get all active sub cluster(s) error.", e);
+ return containersInfo;
+ }
+
+ try {
+ ClientMethod remoteMethod = new ClientMethod("getContainers",
Review Comment:
The code would look more intuitive as:
```
Class[] argsClasses = new Class[]{
HttpServletRequest.class, HttpServletResponse.class, String.class,
String.class};
Object[] args = new Object[]{req, res, appId, appAttemptId}
ClientMethod remoteMethod = new ClientMethod("getContainers",
argsClasses, args);
```
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -560,4 +563,49 @@ SubClusterState.SC_RUNNING, new MonotonicClock().getTime(),
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
+ @Test
+ public void testGetContainers()
+ throws YarnException, IOException, InterruptedException {
+
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ApplicationSubmissionContextInfo context =
Review Comment:
Fits in 100 chars
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -1366,4 +1386,49 @@ public void shutdown() {
threadpool.shutdown();
}
}
+
+ <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo>
clusterIds,
+ ClientMethod request, Class<R> clazz) throws YarnException, IOException
{
+ ArrayList<SubClusterInfo> clusterIdList = new ArrayList<>(clusterIds);
+ return invokeConcurrent(clusterIdList, request, clazz);
+ }
+
+ private <R> Map<SubClusterInfo, R>
invokeConcurrent(ArrayList<SubClusterInfo> subClusterInfo,
+ ClientMethod request, Class<R> clazz) {
+
+ Map<SubClusterInfo, R> results = new HashMap<>();
+
+ // Send the requests in parallel
+ CompletionService<R> compSvc = new
ExecutorCompletionService<>(this.threadpool);
+
+ for (final SubClusterInfo info : subClusterInfo) {
+ compSvc.submit(() -> {
+ DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(
+ info.getSubClusterId(), info.getRMWebServiceAddress());
+ try {
+ Method method = DefaultRequestInterceptorREST.class.
+ getMethod(request.getMethodName(), request.getTypes());
+ return clazz.cast(method.invoke(interceptor, request.getParams()));
+ } catch (Exception e) {
+ LOG.error("SubCluster {} failed to Call {} Method}.",
info.getSubClusterId(),
+ request.getMethodName(), e);
+ return null;
+ }
+ });
+ }
+
+ for (int i = 0; i < subClusterInfo.size(); i++) {
Review Comment:
This would be cleaner as:
```
for (final subClusterInfo : subClusterInfos) {
```
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -560,4 +563,49 @@ SubClusterState.SC_RUNNING, new MonotonicClock().getTime(),
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
+ @Test
+ public void testGetContainers()
+ throws YarnException, IOException, InterruptedException {
+
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ApplicationSubmissionContextInfo context =
+ new ApplicationSubmissionContextInfo();
+ context.setApplicationId(appId.toString());
+
+ // Submit the application we want the report later
+ Response response = interceptor.submitApplication(context, null);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ ApplicationAttemptId appAttempt =
Review Comment:
One line
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -560,4 +563,49 @@ SubClusterState.SC_RUNNING, new MonotonicClock().getTime(),
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
+ @Test
+ public void testGetContainers()
+ throws YarnException, IOException, InterruptedException {
+
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ApplicationSubmissionContextInfo context =
+ new ApplicationSubmissionContextInfo();
+ context.setApplicationId(appId.toString());
+
+ // Submit the application we want the report later
+ Response response = interceptor.submitApplication(context, null);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ ApplicationAttemptId appAttempt =
+ ApplicationAttemptId.newInstance(appId, 1);
+
+ ContainersInfo responseGet = interceptor.getContainers(null, null,
Review Comment:
Move all the args to the second line:
```
ContainersInfo responseGet = interceptor.getContainers(
null, null, appId.toString(), appAttempt.toString());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]