This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new eb1d3ebe2fb YARN-11442. Refactor FederationInterceptorREST Code. (#5420) eb1d3ebe2fb is described below commit eb1d3ebe2fb12eb36f507b2fceaea724c0f863d9 Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Sat Apr 1 06:29:18 2023 +0800 YARN-11442. Refactor FederationInterceptorREST Code. (#5420) --- .../utils/FederationStateStoreFacade.java | 21 + .../router/webapp/FederationInterceptorREST.java | 683 +++++++-------------- .../server/router/webapp/RouterWebServiceUtil.java | 7 +- .../webapp/TestFederationInterceptorREST.java | 99 ++- 4 files changed, 303 insertions(+), 507 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index ebad527b6d4..e7cfb2e3112 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.Random; +import java.util.Collection; import javax.cache.Cache; import javax.cache.CacheManager; @@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.webapp.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1211,4 +1213,23 @@ public final class FederationStateStoreFacade { } return false; } + + /** + * Get active subclusters. + * + * @return We will return a list of active subclusters as a Collection. + */ + public Collection<SubClusterInfo> getActiveSubClusters() + throws NotFoundException { + try { + Map<SubClusterId, SubClusterInfo> subClusterMap = getSubClusters(true); + if (MapUtils.isEmpty(subClusterMap)) { + throw new NotFoundException("Not Found SubClusters."); + } + return subClusterMap.values(); + } catch (Exception e) { + LOG.error("getActiveSubClusters failed.", e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 9975823ec2b..5d73ef20e59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -29,12 +29,13 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequestWrapper; @@ -45,6 +46,7 @@ import javax.ws.rs.core.Response.Status; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.prefetch.Validate; import org.apache.hadoop.io.Text; @@ -148,6 +150,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade.getRandomActiveSubCluster; import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.extractToken; import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.getKerberosUserGroupInformation; @@ -159,8 +162,7 @@ import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.g */ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { - private static final Logger LOG = - LoggerFactory.getLogger(FederationInterceptorREST.class); + private static final Logger LOG = LoggerFactory.getLogger(FederationInterceptorREST.class); private int numSubmitRetries; private FederationStateStoreFacade federationFacade; @@ -205,10 +207,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { interceptors = new HashMap<>(); routerMetrics = RouterMetrics.getMetrics(); - threadpool = HadoopExecutors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat("FederationInterceptorREST #%d") - .build()); + threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder() + .setNameFormat("FederationInterceptorREST #%d") + .build()); returnPartialReport = conf.getBoolean( YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, @@ -235,13 +236,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } @VisibleForTesting - protected DefaultRequestInterceptorREST getInterceptorForSubCluster( - SubClusterId subClusterId) { + protected DefaultRequestInterceptorREST getInterceptorForSubCluster(SubClusterId subClusterId) { if (interceptors.containsKey(subClusterId)) { return interceptors.get(subClusterId); } else { - LOG.error( - "The interceptor for SubCluster {} does not exist in the cache.", + LOG.error("The interceptor for SubCluster {} does not exist in the cache.", subClusterId); return null; } @@ -255,44 +254,63 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String interceptorClassName = conf.get( YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); - DefaultRequestInterceptorREST interceptorInstance = null; + + DefaultRequestInterceptorREST interceptorInstance; try { Class<?> interceptorClass = conf.getClassByName(interceptorClassName); - if (DefaultRequestInterceptorREST.class - .isAssignableFrom(interceptorClass)) { - interceptorInstance = (DefaultRequestInterceptorREST) ReflectionUtils - .newInstance(interceptorClass, conf); + if (DefaultRequestInterceptorREST.class.isAssignableFrom(interceptorClass)) { + interceptorInstance = + (DefaultRequestInterceptorREST) ReflectionUtils.newInstance(interceptorClass, conf); String userName = getUser().getUserName(); interceptorInstance.init(userName); } else { - throw new YarnRuntimeException( - "Class: " + interceptorClassName + " not instance of " - + DefaultRequestInterceptorREST.class.getCanonicalName()); + throw new YarnRuntimeException("Class: " + interceptorClassName + " not instance of " + + DefaultRequestInterceptorREST.class.getCanonicalName()); } } catch (ClassNotFoundException e) { - throw new YarnRuntimeException( - "Could not instantiate ApplicationMasterRequestInterceptor: " - + interceptorClassName, - e); + throw new YarnRuntimeException("Could not instantiate ApplicationMasterRequestInterceptor: " + + interceptorClassName, e); } - String webAppAddresswithScheme = - WebAppUtils.getHttpSchemePrefix(this.getConf()) + webAppAddress; - interceptorInstance.setWebAppAddress(webAppAddresswithScheme); + String webAppAddressWithScheme = WebAppUtils.getHttpSchemePrefix(conf) + webAppAddress; + interceptorInstance.setWebAppAddress(webAppAddressWithScheme); interceptorInstance.setSubClusterId(subClusterId); interceptors.put(subClusterId, interceptorInstance); return interceptorInstance; } + protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster( + SubClusterInfo subClusterInfo) { + if (subClusterInfo != null) { + final SubClusterId subClusterId = subClusterInfo.getSubClusterId(); + final String webServiceAddress = subClusterInfo.getRMWebServiceAddress(); + return getOrCreateInterceptorForSubCluster(subClusterId, webServiceAddress); + } + return null; + } + + protected DefaultRequestInterceptorREST getOrCreateInterceptorByAppId(String appId) + throws YarnException { + // We first check the applicationId + RouterServerUtil.validateApplicationId(appId); + + // Get homeSubCluster By appId + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); + return getOrCreateInterceptorForSubCluster(subClusterInfo); + } + + protected DefaultRequestInterceptorREST getOrCreateInterceptorByNodeId(String nodeId) { + SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId); + return getOrCreateInterceptorForSubCluster(subClusterInfo); + } + @VisibleForTesting protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster( SubClusterId subClusterId, String webAppAddress) { - DefaultRequestInterceptorREST interceptor = - getInterceptorForSubCluster(subClusterId); - String webAppAddresswithScheme = WebAppUtils.getHttpSchemePrefix( - this.getConf()) + webAppAddress; - if (interceptor == null || !webAppAddresswithScheme.equals(interceptor. - getWebAppAddress())){ + DefaultRequestInterceptorREST interceptor = getInterceptorForSubCluster(subClusterId); + String webAppAddressWithScheme = + WebAppUtils.getHttpSchemePrefix(this.getConf()) + webAppAddress; + if (interceptor == null || !webAppAddressWithScheme.equals(interceptor.getWebAppAddress())) { interceptor = createInterceptorForSubCluster(subClusterId, webAppAddress); } return interceptor; @@ -372,8 +390,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount) throws YarnException, IOException, InterruptedException { - SubClusterId subClusterId = - federationFacade.getRandomActiveSubCluster(subClustersActive, blackList); + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive, blackList); LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId); @@ -462,8 +479,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { * Router submits the request to the selected SubCluster (e.g. SC2). */ @Override - public Response submitApplication(ApplicationSubmissionContextInfo newApp, - HttpServletRequest hsr) + public Response submitApplication(ApplicationSubmissionContextInfo newApp, HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { long startTime = clock.getTime(); @@ -548,6 +564,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Step3. We get subClusterInfo based on subClusterId. SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId); + if (subClusterInfo == null) { + throw new YarnException("Can't Find SubClusterId = " + subClusterId); + } // Step4. Submit the request, if the response is HttpServletResponse.SC_ACCEPTED, // We return the response, otherwise we throw an exception. @@ -587,43 +606,29 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { * operation. */ @Override - public AppInfo getApp(HttpServletRequest hsr, String appId, - Set<String> unselectedFields) { + public AppInfo getApp(HttpServletRequest hsr, String appId, Set<String> unselectedFields) { - long startTime = clock.getTime(); - - ApplicationId applicationId = null; try { - applicationId = ApplicationId.fromString(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrAppsFailedRetrieved(); - return null; - } + long startTime = clock.getTime(); - SubClusterInfo subClusterInfo = null; - SubClusterId subClusterId = null; - try { - subClusterId = - federationFacade.getApplicationHomeSubCluster(applicationId); - if (subClusterId == null) { + // Get SubClusterInfo according to applicationId + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); + if (interceptor == null) { routerMetrics.incrAppsFailedRetrieved(); return null; } - subClusterInfo = federationFacade.getSubCluster(subClusterId); + AppInfo response = interceptor.getApp(hsr, appId, unselectedFields); + long stopTime = clock.getTime(); + routerMetrics.succeededAppsRetrieved(stopTime - startTime); + return response; } catch (YarnException e) { routerMetrics.incrAppsFailedRetrieved(); + LOG.error("getApp Error, applicationId = {}.", appId, e); return null; + } catch (IllegalArgumentException e) { + routerMetrics.incrAppsFailedRetrieved(); + throw e; } - - DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster( - subClusterId, subClusterInfo.getRMWebServiceAddress()); - AppInfo response = interceptor.getApp(hsr, appId, unselectedFields); - - long stopTime = clock.getTime(); - routerMetrics.succeededAppsRetrieved(stopTime - startTime); - - return response; } /** @@ -643,13 +648,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { * operation. */ @Override - public Response updateAppState(AppState targetState, HttpServletRequest hsr, - String appId) throws AuthorizationException, YarnException, - InterruptedException, IOException { + public Response updateAppState(AppState targetState, HttpServletRequest hsr, String appId) + throws AuthorizationException, YarnException, InterruptedException, IOException { long startTime = clock.getTime(); - ApplicationId applicationId = null; + ApplicationId applicationId; try { applicationId = ApplicationId.fromString(appId); } catch (IllegalArgumentException e) { @@ -660,8 +664,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { .build(); } - SubClusterInfo subClusterInfo = null; - SubClusterId subClusterId = null; + SubClusterInfo subClusterInfo; + SubClusterId subClusterId; try { subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); @@ -724,60 +728,35 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { AppsInfo apps = new AppsInfo(); long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = null; - try { - subClustersActive = federationFacade.getSubClusters(true); - } catch (YarnException e) { - routerMetrics.incrMultipleAppsFailedRetrieved(); - return null; - } - - // Send the requests in parallel - CompletionService<AppsInfo> compSvc = - new ExecutorCompletionService<>(this.threadpool); - // HttpServletRequest does not work with ExecutorCompletionService. // Create a duplicate hsr. final HttpServletRequest hsrCopy = clone(hsr); - for (final SubClusterInfo info : subClustersActive.values()) { - compSvc.submit(new Callable<AppsInfo>() { - @Override - public AppsInfo call() { - DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster( - info.getSubClusterId(), info.getRMWebServiceAddress()); - AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery, - statesQuery, finalStatusQuery, userQuery, queueQuery, count, - startedBegin, startedEnd, finishBegin, finishEnd, - applicationTypes, applicationTags, name, unselectedFields); - - if (rmApps == null) { - routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.error("Subcluster {} failed to return appReport.", info.getSubClusterId()); - return null; - } - return rmApps; - } - }); - } + Collection<SubClusterInfo> subClusterInfos = federationFacade.getActiveSubClusters(); - // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.size(); i++) { + List<AppsInfo> appsInfos = subClusterInfos.parallelStream().map(subCluster -> { try { - Future<AppsInfo> future = compSvc.take(); - AppsInfo appsResponse = future.get(); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subCluster); + AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery, statesQuery, finalStatusQuery, + userQuery, queueQuery, count, startedBegin, startedEnd, finishBegin, finishEnd, + applicationTypes, applicationTags, name, unselectedFields); + if (rmApps != null) { + return rmApps; + } + } catch (Exception e) { + LOG.warn("Failed to get application report.", e); + } + routerMetrics.incrMultipleAppsFailedRetrieved(); + LOG.error("Subcluster {} failed to return appReport.", subCluster.getSubClusterId()); + return null; + }).collect(Collectors.toList()); + appsInfos.forEach(appsInfo -> { + if (appsInfo != null) { + apps.addAll(appsInfo.getApps()); long stopTime = clock.getTime(); routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime); - - if (appsResponse != null) { - apps.addAll(appsResponse.getApps()); - } - } catch (Throwable e) { - routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.warn("Failed to get application report", e); } - } + }); if (apps.getApps().isEmpty()) { return null; @@ -803,15 +782,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { if (hsr == null) { return null; } - @SuppressWarnings("unchecked") - final Map<String, String[]> parameterMap = - (Map<String, String[]>) hsr.getParameterMap(); + + final Map<String, String[]> parameterMap = hsr.getParameterMap(); final String pathInfo = hsr.getPathInfo(); final String user = hsr.getRemoteUser(); final Principal principal = hsr.getUserPrincipal(); - final String mediaType = - RouterWebServiceUtil.getMediaTypeFromHttpServletRequest( - hsr, AppsInfo.class); + final String mediaType = RouterWebServiceUtil.getMediaTypeFromHttpServletRequest( + hsr, AppsInfo.class); return new HttpServletRequestWrapper(hsr) { public Map<String, String[]> getParameterMap() { return parameterMap; @@ -835,20 +812,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { }; } - /** - * Get the active subclusters in the federation. - * @return Map from subcluster id to its info. - * @throws NotFoundException If the subclusters cannot be found. - */ - private Map<SubClusterId, SubClusterInfo> getActiveSubclusters() - throws NotFoundException { - try { - return federationFacade.getSubClusters(true); - } catch (YarnException e) { - throw new NotFoundException(e.getMessage()); - } - } - /** * Get the active subcluster in the federation. * @@ -860,13 +823,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { throws NotFoundException { try { SubClusterId pSubClusterId = SubClusterId.newInstance(subClusterId); - Map<SubClusterId, SubClusterInfo> subClusterInfoMap = - federationFacade.getSubClusters(true); - SubClusterInfo subClusterInfo = subClusterInfoMap.get(pSubClusterId); - if (subClusterInfo == null) { - throw new NotFoundException(subClusterId + " not found."); - } - return subClusterInfo; + return federationFacade.getSubCluster(pSubClusterId); } catch (YarnException e) { throw new NotFoundException(e.getMessage()); } @@ -890,14 +847,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { */ @Override public NodeInfo getNode(String nodeId) { - final Map<SubClusterId, SubClusterInfo> subClustersActive = - getActiveSubclusters(); + + final Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); + if (subClustersActive.isEmpty()) { - throw new NotFoundException( - FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); + throw new NotFoundException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); } - final Map<SubClusterInfo, NodeInfo> results = - getNode(subClustersActive.values(), nodeId); + + final Map<SubClusterInfo, NodeInfo> results = getNode(subClustersActive, nodeId); // Collect the responses NodeInfo nodeInfo = null; @@ -922,65 +879,53 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { /** * Get a node and the subcluster where it is. + * * @param subClusters Subclusters where to search. - * @param nodeId Identifier of the node we are looking for. + * @param nodeId Identifier of the node we are looking for. * @return Map between subcluster and node. */ - private Map<SubClusterInfo, NodeInfo> getNode( - Collection<SubClusterInfo> subClusters, String nodeId) { - - // Send the requests in parallel - CompletionService<NodeInfo> compSvc = - new ExecutorCompletionService<NodeInfo>(this.threadpool); - final Map<SubClusterInfo, Future<NodeInfo>> futures = new HashMap<>(); - for (final SubClusterInfo subcluster : subClusters) { - final SubClusterId subclusterId = subcluster.getSubClusterId(); - Future<NodeInfo> result = compSvc.submit(() -> { - try { - DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster( - subclusterId, subcluster.getRMWebServiceAddress()); - return interceptor.getNode(nodeId); - } catch (Exception e) { - LOG.error("Subcluster {} failed to return nodeInfo.", subclusterId, e); - return null; - } - }); - futures.put(subcluster, result); - } + private Map<SubClusterInfo, NodeInfo> getNode(Collection<SubClusterInfo> subClusters, + String nodeId) { + + // Parallel traversal of subClusters + Stream<Pair<SubClusterInfo, NodeInfo>> pairStream = subClusters.parallelStream().map( + subClusterInfo -> { + final SubClusterId subClusterId = subClusterInfo.getSubClusterId(); + try { + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster(subClusterInfo); + return Pair.of(subClusterInfo, interceptor.getNode(nodeId)); + } catch (Exception e) { + LOG.error("Subcluster {} failed to return nodeInfo.", subClusterId, e); + return null; + } + }); // Collect the results final Map<SubClusterInfo, NodeInfo> results = new HashMap<>(); - for (Entry<SubClusterInfo, Future<NodeInfo>> entry : futures.entrySet()) { - try { - final Future<NodeInfo> future = entry.getValue(); - final NodeInfo nodeInfo = future.get(); - // Check if the node was found in this SubCluster - if (nodeInfo != null) { - SubClusterInfo subcluster = entry.getKey(); - results.put(subcluster, nodeInfo); - } - } catch (Throwable e) { - LOG.warn("Failed to get node report ", e); + pairStream.forEach(pair -> { + if (pair != null) { + SubClusterInfo subCluster = pair.getKey(); + NodeInfo nodeInfo = pair.getValue(); + results.put(subCluster, nodeInfo); } - } + }); return results; } /** * Get the subcluster a node belongs to. + * * @param nodeId Identifier of the node we are looking for. * @return The subcluster containing the node. * @throws NotFoundException If the node cannot be found. */ - private SubClusterInfo getNodeSubcluster(String nodeId) - throws NotFoundException { + private SubClusterInfo getNodeSubcluster(String nodeId) throws NotFoundException { + + final Collection<SubClusterInfo> subClusters = federationFacade.getActiveSubClusters(); + final Map<SubClusterInfo, NodeInfo> results = getNode(subClusters, nodeId); - final Collection<SubClusterInfo> subClusters = - getActiveSubclusters().values(); - final Map<SubClusterInfo, NodeInfo> results = - getNode(subClusters, nodeId); SubClusterInfo subcluster = null; NodeInfo nodeInfo = null; for (Entry<SubClusterInfo, NodeInfo> entry : results.entrySet()) { @@ -992,8 +937,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } } if (subcluster == null) { - throw new NotFoundException( - "Cannot find " + nodeId + " in any subcluster"); + throw new NotFoundException("Cannot find " + nodeId + " in any subcluster"); } return subcluster; } @@ -1022,15 +966,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { NodesInfo nodes = new NodesInfo(); try { - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); Class[] argsClasses = new Class[]{String.class}; Object[] args = new Object[]{states}; ClientMethod remoteMethod = new ClientMethod("getNodes", argsClasses, args); Map<SubClusterInfo, NodesInfo> nodesMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, NodesInfo.class); - nodesMap.values().stream().forEach(nodesInfo -> { - nodes.addAll(nodesInfo.getNodes()); - }); + invokeConcurrent(subClustersActive, remoteMethod, NodesInfo.class); + nodesMap.values().forEach(nodesInfo -> nodes.addAll(nodesInfo.getNodes())); } catch (NotFoundException e) { LOG.error("get all active sub cluster(s) error.", e); throw e; @@ -1049,14 +991,20 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes()); } + /** + * This method changes the resources of a specific node, and it is reachable + * by using {@link RMWSConsts#NODE_RESOURCE}. + * + * @param hsr The servlet request. + * @param nodeId The node we want to retrieve the information for. + * It is a PathParam. + * @param resourceOption The resource change. + * @return the resources of a specific node. + */ @Override public ResourceInfo updateNodeResource(HttpServletRequest hsr, String nodeId, ResourceOptionInfo resourceOption) { - SubClusterInfo subcluster = getNodeSubcluster(nodeId); - DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster( - subcluster.getSubClusterId(), - subcluster.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByNodeId(nodeId); return interceptor.updateNodeResource(hsr, nodeId, resourceOption); } @@ -1064,50 +1012,30 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public ClusterMetricsInfo getClusterMetricsInfo() { ClusterMetricsInfo metrics = new ClusterMetricsInfo(); - final Map<SubClusterId, SubClusterInfo> subClustersActive; - try { - subClustersActive = getActiveSubclusters(); - } catch (Exception e) { - LOG.error(e.getLocalizedMessage()); - return metrics; - } - - // Send the requests in parallel - CompletionService<ClusterMetricsInfo> compSvc = - new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool); + Collection<SubClusterInfo> subClusterInfos = federationFacade.getActiveSubClusters(); - for (final SubClusterInfo info : subClustersActive.values()) { - compSvc.submit(new Callable<ClusterMetricsInfo>() { - @Override - public ClusterMetricsInfo call() { + Stream<ClusterMetricsInfo> clusterMetricsInfoStream = subClusterInfos.parallelStream() + .map(subClusterInfo -> { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster( - info.getSubClusterId(), info.getRMWebServiceAddress()); + getOrCreateInterceptorForSubCluster(subClusterInfo); try { - ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo(); - return metrics; + return interceptor.getClusterMetricsInfo(); } catch (Exception e) { LOG.error("Subcluster {} failed to return Cluster Metrics.", - info.getSubClusterId()); + subClusterInfo.getSubClusterId()); return null; } - } - }); - } + }); - // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.size(); i++) { + clusterMetricsInfoStream.forEach(clusterMetricsInfo -> { try { - Future<ClusterMetricsInfo> future = compSvc.take(); - ClusterMetricsInfo metricsResponse = future.get(); - - if (metricsResponse != null) { - RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse); + if (clusterMetricsInfo != null) { + RouterWebServiceUtil.mergeMetrics(metrics, clusterMetricsInfo); } } catch (Throwable e) { - LOG.warn("Failed to get nodes report ", e); + LOG.warn("Failed to get nodes report.", e); } - } + }); return metrics; } @@ -1131,31 +1059,15 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public AppState getAppState(HttpServletRequest hsr, String appId) throws AuthorizationException { - - ApplicationId applicationId = null; try { - applicationId = ApplicationId.fromString(appId); - } catch (IllegalArgumentException e) { - return null; - } - - SubClusterInfo subClusterInfo = null; - SubClusterId subClusterId = null; - try { - subClusterId = - federationFacade.getApplicationHomeSubCluster(applicationId); - if (subClusterId == null) { - return null; + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); + if (interceptor != null) { + return interceptor.getAppState(hsr, appId); } - subClusterInfo = federationFacade.getSubCluster(subClusterId); - } catch (YarnException e) { - return null; + } catch (YarnException | IllegalArgumentException e) { + LOG.error("getHomeSubClusterInfoByAppId error, applicationId = {}.", appId, e); } - - DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(subClusterId, - subClusterInfo.getRMWebServiceAddress()); - return interceptor.getAppState(hsr, appId); + return null; } @Override @@ -1176,12 +1088,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public ClusterInfo getClusterInfo() { try { long startTime = Time.now(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); Class[] argsClasses = new Class[]{}; Object[] args = new Object[]{}; ClientMethod remoteMethod = new ClientMethod("getClusterInfo", argsClasses, args); Map<SubClusterInfo, ClusterInfo> subClusterInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, ClusterInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, ClusterInfo.class); FederationClusterInfo federationClusterInfo = new FederationClusterInfo(); subClusterInfoMap.forEach((subClusterInfo, clusterInfo) -> { SubClusterId subClusterId = subClusterInfo.getSubClusterId(); @@ -1216,13 +1128,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public ClusterUserInfo getClusterUserInfo(HttpServletRequest hsr) { try { long startTime = Time.now(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{HttpServletRequest.class}; Object[] args = new Object[]{hsrCopy}; ClientMethod remoteMethod = new ClientMethod("getClusterUserInfo", argsClasses, args); Map<SubClusterInfo, ClusterUserInfo> subClusterInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, ClusterUserInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, ClusterUserInfo.class); FederationClusterUserInfo federationClusterUserInfo = new FederationClusterUserInfo(); subClusterInfoMap.forEach((subClusterInfo, clusterUserInfo) -> { SubClusterId subClusterId = subClusterInfo.getSubClusterId(); @@ -1246,7 +1158,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { /** * This method retrieves the current scheduler status, and it is reachable by * using {@link RMWSConsts#SCHEDULER}. - * * For the federation mode, the SchedulerType information of the cluster * cannot be integrated and displayed, and the specific cluster information needs to be marked. * @@ -1256,12 +1167,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public SchedulerTypeInfo getSchedulerInfo() { try { long startTime = Time.now(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); Class[] argsClasses = new Class[]{}; Object[] args = new Object[]{}; ClientMethod remoteMethod = new ClientMethod("getSchedulerInfo", argsClasses, args); Map<SubClusterInfo, SchedulerTypeInfo> subClusterInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, SchedulerTypeInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, SchedulerTypeInfo.class); FederationSchedulerTypeInfo federationSchedulerTypeInfo = new FederationSchedulerTypeInfo(); subClusterInfoMap.forEach((subClusterInfo, schedulerTypeInfo) -> { SubClusterId subClusterId = subClusterInfo.getSubClusterId(); @@ -1319,17 +1230,18 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Step2. Call dumpSchedulerLogs of each subcluster. try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{String.class, HttpServletRequest.class}; Object[] args = new Object[]{time, hsrCopy}; ClientMethod remoteMethod = new ClientMethod("dumpSchedulerLogs", argsClasses, args); Map<SubClusterInfo, String> dumpSchedulerLogsMap = invokeConcurrent( - subClustersActive.values(), remoteMethod, String.class); + subClustersActive, remoteMethod, String.class); StringBuilder stringBuilder = new StringBuilder(); dumpSchedulerLogsMap.forEach((subClusterInfo, msg) -> { SubClusterId subClusterId = subClusterInfo.getSubClusterId(); - stringBuilder.append("subClusterId" + subClusterId + " : " + msg + "; "); + stringBuilder.append("subClusterId") + .append(subClusterId).append(" : ").append(msg).append("; "); }); long stopTime = clock.getTime(); routerMetrics.succeededDumpSchedulerLogsRetrieved(stopTime - startTime); @@ -1369,12 +1281,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Query SubClusterInfo according to id, // if the nodeId cannot get SubClusterInfo, an exception will be thrown directly. - SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId); - // Call the corresponding subCluster to get ActivitiesInfo. long startTime = clock.getTime(); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByNodeId(nodeId); final HttpServletRequest hsrCopy = clone(hsr); ActivitiesInfo activitiesInfo = interceptor.getActivities(hsrCopy, nodeId, groupBy); if (activitiesInfo != null) { @@ -1382,10 +1291,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { routerMetrics.succeededGetActivitiesLatencyRetrieved(stopTime - startTime); return activitiesInfo; } - } catch (IllegalArgumentException e) { - routerMetrics.incrGetActivitiesFailedRetrieved(); - throw e; - } catch (NotFoundException e) { + } catch (IllegalArgumentException | NotFoundException e) { routerMetrics.incrGetActivitiesFailedRetrieved(); throw e; } @@ -1413,13 +1319,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { Validate.checkNotNegative(activitiesCount, "activitiesCount"); // Step2. Call the interface of subCluster concurrently and get the returned result. - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class, int.class}; Object[] args = new Object[]{hsrCopy, groupBy, activitiesCount}; ClientMethod remoteMethod = new ClientMethod("getBulkActivities", argsClasses, args); Map<SubClusterInfo, BulkActivitiesInfo> appStatisticsMap = invokeConcurrent( - subClustersActive.values(), remoteMethod, BulkActivitiesInfo.class); + subClustersActive, remoteMethod, BulkActivitiesInfo.class); // Step3. Generate Federation objects and set subCluster information. long startTime = clock.getTime(); @@ -1460,22 +1366,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { Set<String> allocationRequestIds, String groupBy, String limit, Set<String> actions, boolean summarize) { - // Only verify the app_id, - // because the specific subCluster needs to be found according to the app_id, - // and other verifications are directly handed over to the corresponding subCluster RM - // Check that the appId format is accurate - try { - RouterServerUtil.validateApplicationId(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrGetAppActivitiesFailedRetrieved(); - throw e; - } - try { long startTime = clock.getTime(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); final HttpServletRequest hsrCopy = clone(hsr); AppActivitiesInfo appActivitiesInfo = interceptor.getAppActivities(hsrCopy, appId, time, requestPriorities, allocationRequestIds, groupBy, limit, actions, summarize); @@ -1502,13 +1395,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { Set<String> stateQueries, Set<String> typeQueries) { try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{HttpServletRequest.class, Set.class, Set.class}; Object[] args = new Object[]{hsrCopy, stateQueries, typeQueries}; ClientMethod remoteMethod = new ClientMethod("getAppStatistics", argsClasses, args); Map<SubClusterInfo, ApplicationStatisticsInfo> appStatisticsMap = invokeConcurrent( - subClustersActive.values(), remoteMethod, ApplicationStatisticsInfo.class); + subClustersActive, remoteMethod, ApplicationStatisticsInfo.class); ApplicationStatisticsInfo applicationStatisticsInfo = RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values()); if (applicationStatisticsInfo != null) { @@ -1541,13 +1434,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { throws IOException { try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{HttpServletRequest.class}; Object[] args = new Object[]{hsrCopy}; ClientMethod remoteMethod = new ClientMethod("getNodeToLabels", argsClasses, args); Map<SubClusterInfo, NodeToLabelsInfo> nodeToLabelsInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, NodeToLabelsInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, NodeToLabelsInfo.class); NodeToLabelsInfo nodeToLabelsInfo = RouterWebServiceUtil.mergeNodeToLabels(nodeToLabelsInfoMap); if (nodeToLabelsInfo != null) { @@ -1570,13 +1463,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public NodeLabelsInfo getRMNodeLabels(HttpServletRequest hsr) throws IOException { try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{HttpServletRequest.class}; Object[] args = new Object[]{hsrCopy}; ClientMethod remoteMethod = new ClientMethod("getRMNodeLabels", argsClasses, args); Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, NodeLabelsInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, NodeLabelsInfo.class); NodeLabelsInfo nodeToLabelsInfo = RouterWebServiceUtil.mergeNodeLabelsInfo(nodeToLabelsInfoMap); if (nodeToLabelsInfo != null) { @@ -1600,12 +1493,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { throws IOException { try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); Class[] argsClasses = new Class[]{Set.class}; Object[] args = new Object[]{labels}; ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes", argsClasses, args); Map<SubClusterInfo, LabelsToNodesInfo> labelsToNodesInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, LabelsToNodesInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, LabelsToNodesInfo.class); Map<NodeLabelInfo, NodeIDsInfo> labelToNodesMap = new HashMap<>(); labelsToNodesInfoMap.values().forEach(labelsToNode -> { Map<NodeLabelInfo, NodeIDsInfo> values = labelsToNode.getLabelsToNodes(); @@ -1666,7 +1559,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Step2. We map the NodeId and NodeToLabelsEntry in the request. Map<String, NodeToLabelsEntry> nodeIdToLabels = new HashMap<>(); - newNodeToLabels.getNodeToLabels().stream().forEach(nodeIdToLabel -> { + newNodeToLabels.getNodeToLabels().forEach(nodeIdToLabel -> { String nodeId = nodeIdToLabel.getNodeId(); nodeIdToLabels.put(nodeId, nodeIdToLabel); }); @@ -1686,11 +1579,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { long startTime = clock.getTime(); final HttpServletRequest hsrCopy = clone(hsr); StringBuilder builder = new StringBuilder(); - subClusterToNodeToLabelsEntryList.forEach((subCluster, nodeToLabelsEntryList) -> { - SubClusterId subClusterId = subCluster.getSubClusterId(); + subClusterToNodeToLabelsEntryList.forEach((subClusterInfo, nodeToLabelsEntryList) -> { + SubClusterId subClusterId = subClusterInfo.getSubClusterId(); try { DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subCluster.getSubClusterId(), subCluster.getRMWebServiceAddress()); + subClusterInfo); interceptor.replaceLabelsOnNodes(nodeToLabelsEntryList, hsrCopy); builder.append("subCluster-").append(subClusterId.getId()).append(":Success,"); } catch (Exception e) { @@ -1703,9 +1596,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Step5. return call result. return Response.status(Status.OK).entity(builder.toString()).build(); - } catch (NotFoundException e) { - routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved(); - throw e; } catch (Exception e) { routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved(); throw e; @@ -1743,8 +1633,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // and then call the replaceLabelsOnNode of the subCluster. long startTime = clock.getTime(); SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByNodeId(nodeId); final HttpServletRequest hsrCopy = clone(hsr); interceptor.replaceLabelsOnNode(newNodeLabelsName, hsrCopy, nodeId); @@ -1753,10 +1642,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { routerMetrics.succeededReplaceLabelsOnNodeRetrieved(stopTime - startTime); String msg = "subCluster#" + subClusterInfo.getSubClusterId().getId() + ":Success;"; return Response.status(Status.OK).entity(msg).build(); - } catch (NotFoundException e) { - routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved(); - throw e; - } catch (Exception e){ + } catch (Exception e) { routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved(); throw e; } @@ -1767,13 +1653,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { throws IOException { try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{HttpServletRequest.class}; Object[] args = new Object[]{hsrCopy}; ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels", argsClasses, args); Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, NodeLabelsInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, NodeLabelsInfo.class); Set<NodeLabel> hashSets = Sets.newHashSet(); nodeToLabelsInfoMap.values().forEach(item -> hashSets.addAll(item.getNodeLabels())); NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo(hashSets); @@ -1820,18 +1706,17 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActives = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{NodeLabelsInfo.class, HttpServletRequest.class}; Object[] args = new Object[]{newNodeLabels, hsrCopy}; ClientMethod remoteMethod = new ClientMethod("addToClusterNodeLabels", argsClasses, args); Map<SubClusterInfo, Response> responseInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class); + invokeConcurrent(subClustersActives, remoteMethod, Response.class); StringBuffer buffer = new StringBuffer(); // SubCluster-0:SUCCESS,SubCluster-1:SUCCESS - responseInfoMap.forEach((subClusterInfo, response) -> { - buildAppendMsg(subClusterInfo, buffer, response); - }); + responseInfoMap.forEach((subClusterInfo, response) -> + buildAppendMsg(subClusterInfo, buffer, response)); long stopTime = clock.getTime(); routerMetrics.succeededAddToClusterNodeLabelsRetrieved((stopTime - startTime)); return Response.status(Status.OK).entity(buffer.toString()).build(); @@ -1868,19 +1753,18 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActives = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{Set.class, HttpServletRequest.class}; Object[] args = new Object[]{oldNodeLabels, hsrCopy}; ClientMethod remoteMethod = new ClientMethod("removeFromClusterNodeLabels", argsClasses, args); Map<SubClusterInfo, Response> responseInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class); + invokeConcurrent(subClustersActives, remoteMethod, Response.class); StringBuffer buffer = new StringBuffer(); // SubCluster-0:SUCCESS,SubCluster-1:SUCCESS - responseInfoMap.forEach((subClusterInfo, response) -> { - buildAppendMsg(subClusterInfo, buffer, response); - }); + responseInfoMap.forEach((subClusterInfo, response) -> + buildAppendMsg(subClusterInfo, buffer, response)); long stopTime = clock.getTime(); routerMetrics.succeededRemoveFromClusterNodeLabelsRetrieved(stopTime - startTime); return Response.status(Status.OK).entity(buffer.toString()).build(); @@ -1897,7 +1781,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } /** - * Bbulid Append information. + * Build Append information. * * @param subClusterInfo subCluster information. * @param buffer StringBuffer. @@ -1920,13 +1804,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { throws IOException { try { long startTime = clock.getTime(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class}; Object[] args = new Object[]{hsrCopy, nodeId}; ClientMethod remoteMethod = new ClientMethod("getLabelsOnNode", argsClasses, args); Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, NodeLabelsInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, NodeLabelsInfo.class); Set<NodeLabel> hashSets = Sets.newHashSet(); nodeToLabelsInfoMap.values().forEach(item -> hashSets.addAll(item.getNodeLabels())); NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo(hashSets); @@ -1952,19 +1836,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public AppPriority getAppPriority(HttpServletRequest hsr, String appId) throws AuthorizationException { - // Check that the appId format is accurate - try { - RouterServerUtil.validateApplicationId(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrGetAppPriorityFailedRetrieved(); - throw e; - } - try { long startTime = clock.getTime(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); AppPriority appPriority = interceptor.getAppPriority(hsr, appId); if (appPriority != null) { long stopTime = clock.getTime(); @@ -1988,14 +1862,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { HttpServletRequest hsr, String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { - // Check that the appId format is accurate - try { - RouterServerUtil.validateApplicationId(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrUpdateAppPriorityFailedRetrieved(); - throw e; - } - if (targetPriority == null) { routerMetrics.incrUpdateAppPriorityFailedRetrieved(); throw new IllegalArgumentException("Parameter error, the targetPriority is empty or null."); @@ -2003,9 +1869,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = clock.getTime(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); Response response = interceptor.updateApplicationPriority(targetPriority, hsr, appId); if (response != null) { long stopTime = clock.getTime(); @@ -2028,19 +1892,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public AppQueue getAppQueue(HttpServletRequest hsr, String appId) throws AuthorizationException { - // Check that the appId format is accurate - try { - RouterServerUtil.validateApplicationId(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrGetAppQueueFailedRetrieved(); - throw e; - } - try { long startTime = clock.getTime(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); AppQueue queue = interceptor.getAppQueue(hsr, appId); if (queue != null) { long stopTime = clock.getTime(); @@ -2063,14 +1917,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { - // Check that the appId format is accurate - try { - RouterServerUtil.validateApplicationId(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrUpdateAppQueueFailedRetrieved(); - throw e; - } - if (targetQueue == null) { routerMetrics.incrUpdateAppQueueFailedRetrieved(); throw new IllegalArgumentException("Parameter error, the targetQueue is null."); @@ -2078,9 +1924,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = clock.getTime(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); Response response = interceptor.updateAppQueue(targetQueue, hsr, appId); if (response != null) { long stopTime = clock.getTime(); @@ -2197,8 +2041,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { byte[] password = token.getPassword().array(); Text kind = new Text(token.getKind()); Text service = new Text(token.getService()); - Token<RMDelegationTokenIdentifier> tk = new Token<>(identifier, password, kind, service); - return tk; + return new Token<>(identifier, password, kind, service); } /** @@ -2342,9 +2185,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { private Response invokeCreateNewReservation(Map<SubClusterId, SubClusterInfo> subClustersActive, List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount) - throws YarnException, IOException, InterruptedException { - SubClusterId subClusterId = - federationFacade.getRandomActiveSubCluster(subClustersActive, blackList); + throws YarnException { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive, blackList); LOG.info("createNewReservation try #{} on SubCluster {}.", retryCount, subClusterId); SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId); DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( @@ -2591,19 +2433,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId, String type) throws AuthorizationException { - if (appId == null || appId.isEmpty()) { - routerMetrics.incrGetAppTimeoutFailedRetrieved(); - throw new IllegalArgumentException("Parameter error, the appId is empty or null."); - } - - // Check that the appId format is accurate - try { - ApplicationId.fromString(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrGetAppTimeoutFailedRetrieved(); - throw e; - } - if (type == null || type.isEmpty()) { routerMetrics.incrGetAppTimeoutFailedRetrieved(); throw new IllegalArgumentException("Parameter error, the type is empty or null."); @@ -2611,9 +2440,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = clock.getTime(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); AppTimeoutInfo appTimeoutInfo = interceptor.getAppTimeout(hsr, appId, type); if (appTimeoutInfo != null) { long stopTime = clock.getTime(); @@ -2636,19 +2463,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId) throws AuthorizationException { - // Check that the appId format is accurate - try { - RouterServerUtil.validateApplicationId(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrGetAppTimeoutsFailedRetrieved(); - throw e; - } - try { long startTime = clock.getTime(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); AppTimeoutsInfo appTimeoutsInfo = interceptor.getAppTimeouts(hsr, appId); if (appTimeoutsInfo != null) { long stopTime = clock.getTime(); @@ -2673,14 +2490,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { HttpServletRequest hsr, String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { - // Check that the appId format is accurate - try { - RouterServerUtil.validateApplicationId(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrUpdateApplicationTimeoutsRetrieved(); - throw e; - } - if (appTimeout == null) { routerMetrics.incrUpdateApplicationTimeoutsRetrieved(); throw new IllegalArgumentException("Parameter error, the appTimeout is null."); @@ -2688,9 +2497,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = Time.now(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); Response response = interceptor.updateApplicationTimeout(appTimeout, hsr, appId); if (response != null) { long stopTime = clock.getTime(); @@ -2713,19 +2520,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { - // Check that the appId format is accurate - try { - RouterServerUtil.validateApplicationId(appId); - } catch (IllegalArgumentException e) { - routerMetrics.incrAppAttemptsFailedRetrieved(); - throw e; - } - try { long startTime = Time.now(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(hsr, appId); if (appAttemptsInfo != null) { long stopTime = Time.now(); @@ -2768,14 +2565,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Traverse SubCluster and call checkUserAccessToQueue Api try { long startTime = Time.now(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{String.class, String.class, String.class, HttpServletRequest.class}; Object[] args = new Object[]{queue, username, queueAclType, hsrCopy}; ClientMethod remoteMethod = new ClientMethod("checkUserAccessToQueue", argsClasses, args); Map<SubClusterInfo, RMQueueAclInfo> rmQueueAclInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, RMQueueAclInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, RMQueueAclInfo.class); FederationRMQueueAclInfo aclInfo = new FederationRMQueueAclInfo(); rmQueueAclInfoMap.forEach((subClusterInfo, rMQueueAclInfo) -> { SubClusterId subClusterId = subClusterInfo.getSubClusterId(); @@ -2803,7 +2600,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Check that the appId/appAttemptId format is accurate try { - RouterServerUtil.validateApplicationId(appId); RouterServerUtil.validateApplicationAttemptId(appAttemptId); } catch (IllegalArgumentException e) { routerMetrics.incrAppAttemptReportFailedRetrieved(); @@ -2813,9 +2609,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Call the getAppAttempt method try { long startTime = Time.now(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); AppAttemptInfo appAttemptInfo = interceptor.getAppAttempt(req, res, appId, appAttemptId); if (appAttemptInfo != null) { long stopTime = Time.now(); @@ -2853,13 +2647,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = clock.getTime(); ContainersInfo containersInfo = new ContainersInfo(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); Class[] argsClasses = new Class[]{ HttpServletRequest.class, HttpServletResponse.class, String.class, String.class}; Object[] args = new Object[]{req, res, appId, appAttemptId}; ClientMethod remoteMethod = new ClientMethod("getContainers", argsClasses, args); Map<SubClusterInfo, ContainersInfo> containersInfoMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, ContainersInfo.class); + invokeConcurrent(subClustersActive, remoteMethod, ContainersInfo.class); if (containersInfoMap != null && !containersInfoMap.isEmpty()) { containersInfoMap.values().forEach(containers -> containersInfo.addAll(containers.getContainers())); @@ -2895,7 +2689,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Check that the appId/appAttemptId/containerId format is accurate try { - RouterServerUtil.validateApplicationId(appId); RouterServerUtil.validateApplicationAttemptId(appAttemptId); RouterServerUtil.validateContainerId(containerId); } catch (IllegalArgumentException e) { @@ -2905,9 +2698,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = Time.now(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); - DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( - subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorByAppId(appId); ContainerInfo containerInfo = interceptor.getContainer(req, res, appId, appAttemptId, containerId); if (containerInfo != null) { @@ -3006,13 +2797,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { long startTime = clock.getTime(); FederationConfInfo federationConfInfo = new FederationConfInfo(); - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); final HttpServletRequest hsrCopy = clone(hsr); Class[] argsClasses = new Class[]{HttpServletRequest.class}; Object[] args = new Object[]{hsrCopy}; ClientMethod remoteMethod = new ClientMethod("getSchedulerConfiguration", argsClasses, args); Map<SubClusterInfo, Response> responseMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class); + invokeConcurrent(subClustersActive, remoteMethod, Response.class); responseMap.forEach((subClusterInfo, response) -> { SubClusterId subClusterId = subClusterInfo.getSubClusterId(); if (response == null) { @@ -3022,7 +2813,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String errorMsg = String.valueOf(response.getEntity()); federationConfInfo.getErrorMsgs().add(errorMsg); } else if (response.getStatus() == Status.OK.getStatusCode()) { - ConfInfo fedConfInfo = ConfInfo.class.cast(response.getEntity()); + ConfInfo fedConfInfo = (ConfInfo) response.getEntity(); fedConfInfo.setSubClusterId(subClusterId.getId()); federationConfInfo.getList().add(fedConfInfo); } @@ -3175,7 +2966,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { */ private SubClusterInfo getHomeSubClusterInfoByAppId(String appId) throws YarnException { - SubClusterInfo subClusterInfo = null; + + if (StringUtils.isBlank(appId)) { + throw new IllegalArgumentException("applicationId can't null or empty."); + } + try { ApplicationId applicationId = ApplicationId.fromString(appId); SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); @@ -3183,8 +2978,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { RouterServerUtil.logAndThrowException(null, "Can't get HomeSubCluster by applicationId %s", applicationId); } - subClusterInfo = federationFacade.getSubCluster(subClusterId); - return subClusterInfo; + return federationFacade.getSubCluster(subClusterId); } catch (IllegalArgumentException e){ throw new IllegalArgumentException(e); } catch (YarnException e) { @@ -3210,8 +3004,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { RouterServerUtil.logAndThrowException(null, "Can't get HomeSubCluster by reservationId %s", resId); } - SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId); - return subClusterInfo; + return federationFacade.getSubCluster(subClusterId); } catch (YarnException | IOException e) { RouterServerUtil.logAndThrowException(e, "Get HomeSubClusterInfo by reservationId %s failed.", resId); @@ -3236,12 +3029,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @VisibleForTesting public Map<SubClusterInfo, NodesInfo> invokeConcurrentGetNodeLabel() throws IOException, YarnException { - Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + Collection<SubClusterInfo> subClustersActive = federationFacade.getActiveSubClusters(); Class[] argsClasses = new Class[]{String.class}; Object[] args = new Object[]{null}; ClientMethod remoteMethod = new ClientMethod("getNodes", argsClasses, args); - Map<SubClusterInfo, NodesInfo> nodesMap = - invokeConcurrent(subClustersActive.values(), remoteMethod, NodesInfo.class); - return nodesMap; + return invokeConcurrent(subClustersActive, remoteMethod, NodesInfo.class); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index 7af470dc583..07afc9180ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -111,8 +111,8 @@ public final class RouterWebServiceUtil { * @param formParam the form parameters as input for a specific REST call * @param additionalParam the query parameters as input for a specific REST * call in case the call has no servlet request + * @param conf configuration. * @param client same client used to reduce number of clients created - * @param conf configuration * @return the retrieved entity from the REST call */ protected static <T> T genericForward(final String webApp, @@ -510,6 +510,11 @@ public final class RouterWebServiceUtil { /** * Extract from HttpServletRequest the MediaType in output. + * + * @param request the servlet request. + * @param returnType the return type of the REST call. + * @param <T> Generic Type T. + * @return MediaType. */ protected static <T> String getMediaTypeFromHttpServletRequest( HttpServletRequest request, final Class<T> returnType) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 19bba51e270..5279902b58a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -145,8 +145,6 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.junit.Assert; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT; import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY; @@ -170,11 +168,11 @@ import static org.mockito.Mockito.when; * reused to validate different request interceptor chains. */ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { - private static final Logger LOG = - LoggerFactory.getLogger(TestFederationInterceptorREST.class); + private final static int NUM_SUBCLUSTER = 4; private static final int BAD_REQUEST = 400; private static final int ACCEPTED = 202; + private static final String TEST_USER = "test-user"; private static final int OK = 200; private static String user = "test-user"; private TestableFederationInterceptorREST interceptor; @@ -195,7 +193,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { stateStoreUtil = new FederationStateStoreTestUtil(stateStore); interceptor.setConf(this.getConf()); - interceptor.init(user); + interceptor.init(TEST_USER); subClusters = new ArrayList<>(); @@ -282,8 +280,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { * ApplicationId has to belong to one of the SubCluster in the cluster. */ @Test - public void testGetNewApplication() - throws YarnException, IOException, InterruptedException { + public void testGetNewApplication() throws IOException, InterruptedException { Response response = interceptor.createNewApplication(null); @@ -359,8 +356,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { * request. */ @Test - public void testSubmitApplicationEmptyRequest() - throws YarnException, IOException, InterruptedException { + public void testSubmitApplicationEmptyRequest() throws IOException, InterruptedException { // ApplicationSubmissionContextInfo null Response response = interceptor.submitApplication(null, null); @@ -384,8 +380,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { * application in wrong format. */ @Test - public void testSubmitApplicationWrongFormat() - throws YarnException, IOException, InterruptedException { + public void testSubmitApplicationWrongFormat() throws IOException, InterruptedException { ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); @@ -506,8 +501,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { * application does not exist in StateStore. */ @Test - public void testGetApplicationNotExists() - throws YarnException, IOException, InterruptedException { + public void testGetApplicationNotExists() { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); @@ -522,8 +516,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { * application in wrong format. */ @Test - public void testGetApplicationWrongFormat() - throws YarnException, IOException, InterruptedException { + public void testGetApplicationWrongFormat() { AppInfo response = interceptor.getApp(null, "Application_wrong_id", null); @@ -535,8 +528,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { * subcluster provided one application. */ @Test - public void testGetApplicationsReport() - throws YarnException, IOException, InterruptedException { + public void testGetApplicationsReport() { AppsInfo responseGet = interceptor.getApps(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); @@ -645,8 +637,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { * application does not exist in StateStore. */ @Test - public void testGetApplicationStateNotExists() - throws YarnException, IOException, InterruptedException { + public void testGetApplicationStateNotExists() throws IOException { ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); @@ -662,7 +653,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { */ @Test public void testGetApplicationStateWrongFormat() - throws YarnException, IOException, InterruptedException { + throws IOException { AppState response = interceptor.getAppState(null, "Application_wrong_id"); @@ -865,8 +856,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { } @Test - public void testGetAppAttempts() - throws IOException, InterruptedException, YarnException { + public void testGetAppAttempts() throws IOException, InterruptedException { // Submit application to multiSubCluster ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); @@ -897,8 +887,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { } @Test - public void testGetAppAttempt() - throws IOException, InterruptedException, YarnException { + public void testGetAppAttempt() throws IOException, InterruptedException { // Generate ApplicationId information ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); @@ -922,7 +911,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { } @Test - public void testGetAppTimeout() throws IOException, InterruptedException, YarnException { + public void testGetAppTimeout() throws IOException, InterruptedException { // Generate ApplicationId information ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); @@ -942,7 +931,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { } @Test - public void testGetAppTimeouts() throws IOException, InterruptedException, YarnException { + public void testGetAppTimeouts() throws IOException, InterruptedException { // Generate ApplicationId information ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); @@ -1022,8 +1011,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { } @Test - public void testGetAppPriority() throws IOException, InterruptedException, - YarnException { + public void testGetAppPriority() throws IOException, InterruptedException { // Submit application to multiSubCluster ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); @@ -1072,7 +1060,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { } @Test - public void testGetAppQueue() throws IOException, InterruptedException, YarnException { + public void testGetAppQueue() throws IOException, InterruptedException { String queueName = "queueName"; // Submit application to multiSubCluster @@ -1090,7 +1078,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { } @Test - public void testGetAppsInfoCache() throws IOException, InterruptedException, YarnException { + public void testGetAppsInfoCache() { AppsInfo responseGet = interceptor.getApps( null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); @@ -1102,7 +1090,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appsInfoCache = interceptor.getAppInfosCaches(); Assert.assertNotNull(appsInfoCache); - Assert.assertTrue(!appsInfoCache.isEmpty()); + Assert.assertFalse(appsInfoCache.isEmpty()); Assert.assertEquals(1, appsInfoCache.size()); Assert.assertTrue(appsInfoCache.containsKey(cacheKey)); @@ -1113,7 +1101,6 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { @Test public void testGetAppStatistics() throws IOException, InterruptedException, YarnException { - AppState appStateRUNNING = new AppState(YarnApplicationState.RUNNING.name()); // Submit application to multiSubCluster ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); @@ -1200,6 +1187,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Assert.assertNotNull(entity); Assert.assertNotNull(entity instanceof ReservationListInfo); + Assert.assertTrue(entity instanceof ReservationListInfo); ReservationListInfo listInfo = (ReservationListInfo) entity; Assert.assertNotNull(listInfo); @@ -1267,6 +1255,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Assert.assertNotNull(entity); Assert.assertNotNull(entity instanceof ReservationListInfo); + Assert.assertTrue(entity instanceof ReservationListInfo); ReservationListInfo listInfo = (ReservationListInfo) entity; Assert.assertNotNull(listInfo); @@ -1310,6 +1299,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Assert.assertNotNull(entity); Assert.assertNotNull(entity instanceof ReservationListInfo); + Assert.assertTrue(entity instanceof ReservationListInfo); ReservationListInfo listInfo = (ReservationListInfo) entity; Assert.assertNotNull(listInfo); @@ -1373,8 +1363,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { throws IOException, InterruptedException { ReservationSubmissionRequestInfo resSubmissionRequestInfo = getReservationSubmissionRequestInfo(reservationId); - Response response = interceptor.submitReservation(resSubmissionRequestInfo, null); - return response; + return interceptor.submitReservation(resSubmissionRequestInfo, null); } public static ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo( @@ -1402,15 +1391,13 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { long arrival = Time.now(); // deadline by when the resource(s) must be allocated. - // The reason for choosing 1.05 is because this gives an integer + // The reason for choosing 1.05 is that this gives an integer // DURATION * 0.05 = 3000(ms) // deadline = arrival + 3000ms long deadline = (long) (arrival + 1.05 * DURATION); - ReservationSubmissionRequest submissionRequest = createSimpleReservationRequest( + return createSimpleReservationRequest( reservationId, numContainers, arrival, deadline, DURATION, memory, vcore); - - return submissionRequest; } public static ReservationSubmissionRequest createSimpleReservationRequest( @@ -1423,9 +1410,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Collections.singletonList(r), ReservationRequestInterpreter.R_ALL); ReservationDefinition rDef = ReservationDefinition.newInstance( arrival, deadline, reqs, "testClientRMService#reservation", "0", Priority.UNDEFINED); - ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance( - rDef, QUEUE_DEDICATED_FULL, reservationId); - return request; + return ReservationSubmissionRequest.newInstance(rDef, QUEUE_DEDICATED_FULL, reservationId); } @Test @@ -1497,7 +1482,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr); Assert.assertNotNull(aclInfo); Assert.assertTrue(aclInfo instanceof FederationRMQueueAclInfo); - FederationRMQueueAclInfo fedAclInfo = FederationRMQueueAclInfo.class.cast(aclInfo); + FederationRMQueueAclInfo fedAclInfo = (FederationRMQueueAclInfo) aclInfo; List<RMQueueAclInfo> aclInfos = fedAclInfo.getList(); Assert.assertNotNull(aclInfos); Assert.assertEquals(4, aclInfos.size()); @@ -1513,7 +1498,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr); Assert.assertNotNull(aclInfo); Assert.assertTrue(aclInfo instanceof FederationRMQueueAclInfo); - FederationRMQueueAclInfo fedAclInfo = FederationRMQueueAclInfo.class.cast(aclInfo); + FederationRMQueueAclInfo fedAclInfo = (FederationRMQueueAclInfo) aclInfo; List<RMQueueAclInfo> aclInfos = fedAclInfo.getList(); Assert.assertNotNull(aclInfos); Assert.assertEquals(4, aclInfos.size()); @@ -1589,13 +1574,12 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Assert.assertTrue(typeInfo instanceof FederationSchedulerTypeInfo); FederationSchedulerTypeInfo federationSchedulerTypeInfo = - FederationSchedulerTypeInfo.class.cast(typeInfo); + (FederationSchedulerTypeInfo) typeInfo; Assert.assertNotNull(federationSchedulerTypeInfo); List<SchedulerTypeInfo> schedulerTypeInfos = federationSchedulerTypeInfo.getList(); Assert.assertNotNull(schedulerTypeInfos); Assert.assertEquals(4, schedulerTypeInfos.size()); - List<String> subClusterIds = - subClusters.stream().map(subClusterId -> subClusterId.getId()). + List<String> subClusterIds = subClusters.stream().map(SubClusterId::getId). collect(Collectors.toList()); for (SchedulerTypeInfo schedulerTypeInfo : schedulerTypeInfos) { @@ -1609,8 +1593,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { SchedulerInfo schedulerInfo = schedulerTypeInfo.getSchedulerInfo(); Assert.assertNotNull(schedulerInfo); Assert.assertTrue(schedulerInfo instanceof CapacitySchedulerInfo); - CapacitySchedulerInfo capacitySchedulerInfo = - CapacitySchedulerInfo.class.cast(schedulerInfo); + CapacitySchedulerInfo capacitySchedulerInfo = (CapacitySchedulerInfo) schedulerInfo; Assert.assertNotNull(capacitySchedulerInfo); // 3. The parent queue name should be root @@ -1702,7 +1685,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Assert.assertNotNull(entity); Assert.assertTrue(entity instanceof DelegationToken); - DelegationToken dtoken = DelegationToken.class.cast(entity); + DelegationToken dtoken = (DelegationToken) entity; Assert.assertEquals(TEST_RENEWER, dtoken.getRenewer()); Assert.assertEquals(TEST_RENEWER, dtoken.getOwner()); Assert.assertEquals("RM_DELEGATION_TOKEN", dtoken.getKind()); @@ -1751,7 +1734,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Object entity = response.getEntity(); Assert.assertNotNull(entity); Assert.assertTrue(entity instanceof DelegationToken); - DelegationToken dtoken = DelegationToken.class.cast(entity); + DelegationToken dtoken = (DelegationToken) entity; final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token"; when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken()); @@ -1764,7 +1747,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Assert.assertTrue(renewEntity instanceof DelegationToken); // renewDelegation, we only return renewDate, other values are NULL. - DelegationToken renewDToken = DelegationToken.class.cast(renewEntity); + DelegationToken renewDToken = (DelegationToken) renewEntity; Assert.assertNull(renewDToken.getRenewer()); Assert.assertNull(renewDToken.getOwner()); Assert.assertNull(renewDToken.getKind()); @@ -1789,7 +1772,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Object entity = response.getEntity(); Assert.assertNotNull(entity); Assert.assertTrue(entity instanceof DelegationToken); - DelegationToken dtoken = DelegationToken.class.cast(entity); + DelegationToken dtoken = (DelegationToken) entity; final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token"; when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken()); @@ -1903,7 +1886,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { // We cannot guarantee the calling order of the sub-clusters, // We guarantee that the returned result contains the information of each subCluster. Assert.assertNotNull(dumpSchedulerLogsMsg); - subClusters.stream().forEach(subClusterId -> { + subClusters.forEach(subClusterId -> { String subClusterMsg = "subClusterId" + subClusterId + " : Capacity scheduler logs are being created.; "; Assert.assertTrue(dumpSchedulerLogsMsg.contains(subClusterMsg)); @@ -1978,7 +1961,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { Assert.assertTrue(bulkActivitiesInfo instanceof FederationBulkActivitiesInfo); FederationBulkActivitiesInfo federationBulkActivitiesInfo = - FederationBulkActivitiesInfo.class.cast(bulkActivitiesInfo); + (FederationBulkActivitiesInfo) bulkActivitiesInfo; Assert.assertNotNull(federationBulkActivitiesInfo); List<BulkActivitiesInfo> activitiesInfos = federationBulkActivitiesInfo.getList(); @@ -2033,9 +2016,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { // we confirm the result by contains String expectedMsg = "SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS"; - Arrays.stream(entities).forEach(item -> { - Assert.assertTrue(expectedMsg.contains(item)); - }); + Arrays.stream(entities).forEach(item -> Assert.assertTrue(expectedMsg.contains(item))); } @Test @@ -2098,9 +2079,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { // we confirm the result by contains String expectedMsg = "SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS"; - Arrays.stream(entities).forEach(item -> { - Assert.assertTrue(expectedMsg.contains(item)); - }); + Arrays.stream(entities).forEach(item -> Assert.assertTrue(expectedMsg.contains(item))); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org