goiri commented on code in PR #4904:
URL: https://github.com/apache/hadoop/pull/4904#discussion_r975598672
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -773,24 +761,19 @@ public GetClusterNodesResponse
getClusterNodes(GetClusterNodesRequest request)
RouterServerUtil.logAndThrowException("Missing getClusterNodes
request.", null);
}
long startTime = clock.getTime();
- Map<SubClusterId, SubClusterInfo> subClusters =
- federationFacade.getSubClusters(true);
- Map<SubClusterId, GetClusterNodesResponse> clusterNodes =
Maps.newHashMap();
- for (SubClusterId subClusterId : subClusters.keySet()) {
- ApplicationClientProtocol client;
- try {
- client = getClientRMProxyForSubCluster(subClusterId);
- GetClusterNodesResponse response = client.getClusterNodes(request);
- clusterNodes.put(subClusterId, response);
- } catch (Exception ex) {
- routerMetrics.incrClusterNodesFailedRetrieved();
- RouterServerUtil.logAndThrowException("Unable to get cluster nodes due
to exception.", ex);
- }
+ ClientMethod remoteMethod = new ClientMethod("getClusterNodes",
+ new Class[]{GetClusterNodesRequest.class}, new Object[]{request});
+ Collection<GetClusterNodesResponse> clusterNodes = null;
+ try {
+ clusterNodes = invokeConcurrent(remoteMethod,
GetClusterNodesResponse.class);
Review Comment:
Can we declare `clusterNodes` and return the result inside the try block?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java:
##########
@@ -728,6 +729,18 @@ public FederationStateStore getStateStore() {
}
/**
+ * Get Active's SubClusterIds{@link SubClusterId}.
+ *
+ * @return SubClusterId Collection.
+ * @throws YarnException if the call to get active subClusterIds is
unsuccessful
+ */
+ public Collection<SubClusterId> getActiveSubClusterIds() throws
YarnException {
+ Map<SubClusterId, SubClusterInfo> activeSubClusters =
+ getSubClusters(true);
Review Comment:
Can we put the `true` in a named variable to make it easier to understand what
it represents?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -702,67 +698,59 @@ public GetClusterMetricsResponse getClusterMetrics(
return RouterYarnClientUtils.merge(clusterMetrics);
}
- <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
- ClientMethod request, Class<R> clazz) throws YarnException, IOException {
- List<Callable<Object>> callables = new ArrayList<>();
- List<Future<Object>> futures = new ArrayList<>();
- Map<SubClusterId, IOException> exceptions = new TreeMap<>();
- for (SubClusterId subClusterId : clusterIds) {
- callables.add(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- ApplicationClientProtocol protocol =
- getClientRMProxyForSubCluster(subClusterId);
- Method method = ApplicationClientProtocol.class
- .getMethod(request.getMethodName(), request.getTypes());
- return method.invoke(protocol, request.getParams());
- }
+ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+ throws YarnException {
+
+ Collection<SubClusterId> subClusterIds =
federationFacade.getActiveSubClusterIds();
+
+ List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+ List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+ Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+ // Generate parallel Callable tasks
+ for (SubClusterId subClusterId : subClusterIds) {
+ callables.add(() -> {
+ ApplicationClientProtocol protocol =
Review Comment:
Put this statement on a single line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java:
##########
@@ -728,6 +729,18 @@ public FederationStateStore getStateStore() {
}
/**
+ * Get Active's SubClusterIds{@link SubClusterId}.
+ *
+ * @return SubClusterId Collection.
+ * @throws YarnException if the call to get active subClusterIds is
unsuccessful
+ */
+ public Collection<SubClusterId> getActiveSubClusterIds() throws
YarnException {
+ Map<SubClusterId, SubClusterInfo> activeSubClusters =
Review Comment:
Put this statement on a single line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -702,67 +698,59 @@ public GetClusterMetricsResponse getClusterMetrics(
return RouterYarnClientUtils.merge(clusterMetrics);
}
- <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
- ClientMethod request, Class<R> clazz) throws YarnException, IOException {
- List<Callable<Object>> callables = new ArrayList<>();
- List<Future<Object>> futures = new ArrayList<>();
- Map<SubClusterId, IOException> exceptions = new TreeMap<>();
- for (SubClusterId subClusterId : clusterIds) {
- callables.add(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- ApplicationClientProtocol protocol =
- getClientRMProxyForSubCluster(subClusterId);
- Method method = ApplicationClientProtocol.class
- .getMethod(request.getMethodName(), request.getTypes());
- return method.invoke(protocol, request.getParams());
- }
+ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+ throws YarnException {
+
+ Collection<SubClusterId> subClusterIds =
federationFacade.getActiveSubClusterIds();
+
+ List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+ List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+ Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+ // Generate parallel Callable tasks
+ for (SubClusterId subClusterId : subClusterIds) {
+ callables.add(() -> {
+ ApplicationClientProtocol protocol =
+ getClientRMProxyForSubCluster(subClusterId);
+ Method method = ApplicationClientProtocol.class
Review Comment:
Put this statement on a single line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -582,9 +583,9 @@ public void testGetClusterMetricsRequest() throws Exception
{
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class},
new Object[] {GetClusterMetricsRequest.newInstance()});
- Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
- invokeConcurrent(new ArrayList<>(), remoteMethod,
GetClusterMetricsResponse.class);
- Assert.assertTrue(clusterMetrics.isEmpty());
+ Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.
+ invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
+ Assert.assertTrue(!clusterMetrics.isEmpty());
Review Comment:
Now the expectation is the opposite? What has changed?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -582,9 +583,9 @@ public void testGetClusterMetricsRequest() throws Exception
{
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class},
new Object[] {GetClusterMetricsRequest.newInstance()});
- Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
- invokeConcurrent(new ArrayList<>(), remoteMethod,
GetClusterMetricsResponse.class);
- Assert.assertTrue(clusterMetrics.isEmpty());
+ Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.
+ invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
+ Assert.assertTrue(!clusterMetrics.isEmpty());
Review Comment:
Use Assert.assertFalse(clusterMetrics.isEmpty()) instead of Assert.assertTrue(!...).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]