This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 6be04633b55 YARN-11711. Clean Up ServiceScheduler Code. (#6977) Contributed by Shilun Fan. 6be04633b55 is described below commit 6be04633b55bbd67c2875e39977cd9d2308dc1d1 Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Thu Aug 22 17:49:42 2024 +0800 YARN-11711. Clean Up ServiceScheduler Code. (#6977) Contributed by Shilun Fan. Reviewed-by: Steve Loughran <ste...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../hadoop-yarn/dev-support/findbugs-exclude.xml | 9 ++ .../hadoop/yarn/service/ServiceScheduler.java | 106 ++++++++++----------- 2 files changed, 58 insertions(+), 57 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 309c0285800..bad2bacb1a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -725,4 +725,13 @@ <Match> <Package name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema" /> </Match> + + <!-- The ServiceScheduler#createConfigFileCache method uses the `load` method, + which is not allowed to return null; we can ignore it here. --> + <Match> + <Class name="org.apache.hadoop.yarn.service.ServiceScheduler"/> + <Method name="$1.load(ConfigFile)" /> + <Bug pattern="NP_NONNULL_RETURN_VIOLATION"/> + </Match> + </FindBugsFilter> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 9da8f31fe4b..b95e2c31849 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -123,6 +123,8 @@ import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes .EXIT_FALSE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes .EXIT_SUCCESS; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTPS_PREFIX; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTP_PREFIX; /** * @@ -153,10 +155,10 @@ public class ServiceScheduler extends CompositeService { private boolean timelineServiceEnabled; - // Global diagnostics that will be reported to RM on eRxit. + // Global diagnostics that will be reported to RM on exit. // The unit the number of characters. This will be limited to 64 * 1024 // characters. - private BoundedAppender diagnostics = new BoundedAppender(64 * 1024); + private final BoundedAppender diagnostics = new BoundedAppender(64 * 1024); // A cache for loading config files from remote such as hdfs public LoadingCache<ConfigFile, Object> configFileCache = null; @@ -168,7 +170,7 @@ public class ServiceScheduler extends CompositeService { private NMClientAsync nmClient; private AsyncDispatcher dispatcher; private YarnRegistryViewForProviders yarnRegistryOperations; - private ServiceContext context; + private final ServiceContext context; private ContainerLaunchService containerLaunchService; private final Map<ContainerId, ComponentInstance> unRecoveredInstances = new ConcurrentHashMap<>(); @@ -185,10 +187,10 @@ public class ServiceScheduler extends CompositeService { private volatile FinalApplicationStatus finalApplicationStatus = FinalApplicationStatus.ENDED; - private Clock systemClock; + private final Clock systemClock; // For unit test override since we don't want to terminate UT process. - private ServiceUtils.ProcessTerminationHandler + private final ServiceUtils.ProcessTerminationHandler terminationHandler = new ServiceUtils.ProcessTerminationHandler(); public ServiceScheduler(ServiceContext context) { @@ -199,10 +201,10 @@ public class ServiceScheduler extends CompositeService { } public void buildInstance(ServiceContext context, Configuration configuration) - throws YarnException, IOException { + throws YarnException { app = context.service; executorService = Executors.newScheduledThreadPool(10); - RegistryOperations registryClient = null; + RegistryOperations registryClient; if (UserGroupInformation.isSecurityEnabled() && !StringUtils.isEmpty(context.principal) && !StringUtils.isEmpty(context.keytab)) { @@ -480,7 +482,7 @@ public class ServiceScheduler extends CompositeService { } }); - if (unRecoveredInstances.size() > 0) { + if (!unRecoveredInstances.isEmpty()) { executorService.schedule(() -> { synchronized (unRecoveredInstances) { // after containerRecoveryTimeout, all the containers that haven't be @@ -532,7 +534,8 @@ public class ServiceScheduler extends CompositeService { this.configFileCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES) .build(new CacheLoader<ConfigFile, Object>() { - @Override public Object load(ConfigFile key) throws Exception { + @Override + public Object load(ConfigFile key) throws Exception { switch (key.getType()) { case HADOOP_XML: try (FSDataInputStream input = fileSystem @@ -560,9 +563,8 @@ public class ServiceScheduler extends CompositeService { } private void registerServiceInstance(ApplicationAttemptId attemptId, - Service service) throws IOException { - LOG.info("Registering " + attemptId + ", " + service.getName() - + " into registry"); + Service service) { + LOG.info("Registering {}, {} into registry.", attemptId, service.getName()); ServiceRecord serviceRecord = new ServiceRecord(); serviceRecord.set(YarnRegistryAttributes.YARN_ID, attemptId.getApplicationId().toString()); @@ -570,24 +572,21 @@ public class ServiceScheduler extends CompositeService { PersistencePolicies.APPLICATION); serviceRecord.description = "YarnServiceMaster"; - executorService.submit(new Runnable() { - @Override public void run() { - try { - yarnRegistryOperations.registerSelf(serviceRecord, false); - LOG.info("Registered service under {}; absolute path {}", - yarnRegistryOperations.getSelfRegistrationPath(), - yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); - boolean isFirstAttempt = 1 == attemptId.getAttemptId(); - // delete the children in case there are any and this is an AM startup. - // just to make sure everything underneath is purged - if (isFirstAttempt) { - yarnRegistryOperations.deleteChildren( - yarnRegistryOperations.getSelfRegistrationPath(), true); - } - } catch (IOException e) { - LOG.error( - "Failed to register app " + app.getName() + " in registry", e); + executorService.submit(() -> { + try { + yarnRegistryOperations.registerSelf(serviceRecord, false); + LOG.info("Registered service under {}; absolute path {}", + yarnRegistryOperations.getSelfRegistrationPath(), + yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); + boolean isFirstAttempt = 1 == attemptId.getAttemptId(); + // delete the children in case there are any and this is an AM startup. + // just to make sure everything underneath is purged + if (isFirstAttempt) { + yarnRegistryOperations.deleteChildren( + yarnRegistryOperations.getSelfRegistrationPath(), true); } + } catch (IOException e) { + LOG.error("Failed to register app {} in registry.", app.getName(), e); } }); if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { @@ -637,7 +636,7 @@ public class ServiceScheduler extends CompositeService { Component component = componentsByName.get(event.getName()); if (component == null) { - LOG.error("No component exists for " + event.getName()); + LOG.error("No component exists for {}.", event.getName()); return; } try { @@ -657,14 +656,14 @@ public class ServiceScheduler extends CompositeService { ComponentInstance instance = liveInstances.get(event.getContainerId()); if (instance == null) { - LOG.error("No component instance exists for " + event.getContainerId()); + LOG.error("No component instance exists for {}.", event.getContainerId()); return; } try { instance.handle(event); } catch (Throwable t) { - LOG.error(instance.getCompInstanceId() + - ": Error in handling event type " + event.getType(), t); + LOG.error("{} : Error in handling event type {}.", + instance.getCompInstanceId(), event.getType(), t); } } } @@ -673,7 +672,7 @@ public class ServiceScheduler extends CompositeService { @Override public void onContainersAllocated(List<Container> containers) { - LOG.info(containers.size() + " containers allocated. "); + LOG.info("{} containers allocated. ", containers.size()); for (Container container : containers) { Component comp = componentsById.get(container.getAllocationRequestId()); ComponentEvent event = @@ -684,8 +683,8 @@ public class ServiceScheduler extends CompositeService { Collection<AMRMClient.ContainerRequest> requests = amRMClient .getMatchingRequests(container.getAllocationRequestId()); LOG.info("[COMPONENT {}]: remove {} outstanding container requests " + - "for allocateId " + container.getAllocationRequestId(), - comp.getName(), requests.size()); + "for allocateId {}.", comp.getName(), requests.size(), + container.getAllocationRequestId()); // remove the corresponding request if (requests.iterator().hasNext()) { AMRMClient.ContainerRequest request = requests.iterator().next(); @@ -799,7 +798,7 @@ public class ServiceScheduler extends CompositeService { Map<String, ByteBuffer> allServiceResponse) { ComponentInstance instance = liveInstances.get(containerId); if (instance == null) { - LOG.error("No component instance exists for " + containerId); + LOG.error("No component instance exists for {}.", containerId); return; } ComponentEvent event = @@ -821,10 +820,10 @@ public class ServiceScheduler extends CompositeService { public void onStartContainerError(ContainerId containerId, Throwable t) { ComponentInstance instance = liveInstances.get(containerId); if (instance == null) { - LOG.error("No component instance exists for " + containerId); + LOG.error("No component instance exists for {}.", containerId); return; } - LOG.error("Failed to start " + containerId, t); + LOG.error("Failed to start {}.", containerId, t); amRMClient.releaseAssignedContainer(containerId); // After container released, it'll get CONTAINER_COMPLETED event from RM // automatically which will trigger stopping COMPONENT INSTANCE @@ -950,15 +949,14 @@ public class ServiceScheduler extends CompositeService { } public boolean terminateServiceIfNeeded(Component component) { - boolean serviceIsTerminated = + return terminateServiceIfDominantComponentFinished(component) || terminateServiceIfAllComponentsFinished(); - return serviceIsTerminated; } /** * If the service state component is finished, the service is also terminated. - * @param component + * @param component service component. */ private boolean terminateServiceIfDominantComponentFinished(Component component) { @@ -981,8 +979,7 @@ public class ServiceScheduler extends CompositeService { state); component.getComponentSpec().setState(state); LOG.info("Dominate component {} finished, exiting Service Master... " + - ", final status=" + (isSucceeded ? "Succeeded" : "Failed"), - component.getName()); + ", final status={}.", component.getName(), (isSucceeded ? "Succeeded" : "Failed")); terminateService(isSucceeded); } } @@ -1042,14 +1039,10 @@ public class ServiceScheduler extends CompositeService { } if (shouldTerminate) { - LOG.info("All component finished, exiting Service Master... " - + ", final status=" + (failedComponents.isEmpty() ? - "Succeeded" : - "Failed")); - LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils - .join(succeededComponents, ",") + "]"); - LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils - .join(failedComponents, ",") + "]"); + LOG.info("All component finished, exiting Service Master... " + + ", final status={}", (failedComponents.isEmpty() ? "Succeeded" : "Failed")); + LOG.info("Succeeded components: [" + StringUtils.join(succeededComponents, ",") + "]"); + LOG.info("Failed components: [" + StringUtils.join(failedComponents, ",") + "]"); terminateService(failedComponents.isEmpty()); } @@ -1093,7 +1086,7 @@ public class ServiceScheduler extends CompositeService { spec = ServiceApiUtil.jsonSerDeser.toJson(yarnApp); for (org.apache.hadoop.yarn.service.api.records.Component c : yarnApp.getComponents()) { - Set<String> nodes = new HashSet<String>(); + Set<String> nodes = new HashSet<>(); boolean update = Boolean.parseBoolean(c.getConfiguration() .getEnv(ApplicationConstants.Environment .YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name())); @@ -1109,9 +1102,9 @@ public class ServiceScheduler extends CompositeService { for (String bareHost : nodes) { StringBuilder requestPath = new StringBuilder(); if (YarnConfiguration.useHttps(conf)) { - requestPath.append("https://"); + requestPath.append(HTTPS_PREFIX); } else { - requestPath.append("http://"); + requestPath.append(HTTP_PREFIX); } requestPath.append(bareHost) .append(":") @@ -1129,8 +1122,7 @@ public class ServiceScheduler extends CompositeService { Builder builder = HttpUtil.connect(requestPath.toString()); ClientResponse response = builder.put(ClientResponse.class, spec); if (response.getStatus()!=ClientResponse.Status.OK.getStatusCode()) { - LOG.warn("Error synchronize YARN sysfs: " + - response.getEntity(String.class)); + LOG.warn("Error synchronize YARN sysfs: {}.", response.getEntity(String.class)); success = false; } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org